-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
thread_pool.h
143 lines (107 loc) · 4.01 KB
/
thread_pool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
#ifndef RUNTIME_VM_THREAD_POOL_H_
#define RUNTIME_VM_THREAD_POOL_H_
#include <memory>
#include <utility>
#include "vm/allocation.h"
#include "vm/globals.h"
#include "vm/intrusive_dlist.h"
#include "vm/os_thread.h"
namespace dart {
class MonitorLocker;
class ThreadPool {
public:
// Subclasses of Task are able to run on a ThreadPool.
class Task : public IntrusiveDListEntry<Task> {
protected:
Task() {}
public:
virtual ~Task() {}
// Override this to provide task-specific behavior.
virtual void Run() = 0;
private:
DISALLOW_COPY_AND_ASSIGN(Task);
};
explicit ThreadPool(uintptr_t max_pool_size = 0);
// Prevent scheduling of new tasks, wait until all pending tasks are done
// and join worker threads.
virtual ~ThreadPool();
// Runs a task on the thread pool.
template <typename T, typename... Args>
bool Run(Args&&... args) {
return RunImpl(std::unique_ptr<Task>(new T(std::forward<Args>(args)...)));
}
// Returns `true` if the current thread is runing on the [this] thread pool.
bool CurrentThreadIsWorker();
// Mark the current thread as being blocked (e.g. in native code). This might
// temporarily increase the max thread pool size.
void MarkCurrentWorkerAsBlocked();
// Mark the current thread as being unblocked. Must be called iff
// [MarkCurrentWorkerAsBlocked] was called before and the thread is now ready
// to coninue executing.
void MarkCurrentWorkerAsUnBlocked();
// Triggers shutdown, prevents scheduling of new tasks.
void Shutdown();
// Exposed for unit test in thread_pool_test.cc
uint64_t workers_started() const { return count_idle_ + count_running_; }
// Exposed for unit test in thread_pool_test.cc
uint64_t workers_stopped() const { return count_dead_; }
private:
class Worker : public IntrusiveDListEntry<Worker> {
public:
explicit Worker(ThreadPool* pool);
// Starts the thread for the worker. This should only be called
// after a task has been set by the initial call to SetTask().
void StartThread();
private:
friend class ThreadPool;
// The main entry point for new worker threads.
static void Main(uword args);
// Fields initialized during construction or in start of main function of
// thread.
ThreadPool* pool_;
ThreadJoinId join_id_;
OSThread* os_thread_ = nullptr;
bool is_blocked_ = false;
DISALLOW_COPY_AND_ASSIGN(Worker);
};
protected:
// Called when the thread pool turns idle.
//
// Subclasses can override this to perform some action.
// NOTE: While this function is running the thread pool will be locked.
virtual void OnEnterIdleLocked(MonitorLocker* ml) {}
// Whether a shutdown was requested.
bool ShuttingDownLocked() { return shutting_down_; }
// Whether new tasks are ready to be run.
bool TasksWaitingToRunLocked() { return !tasks_.IsEmpty(); }
private:
using TaskList = IntrusiveDList<Task>;
using WorkerList = IntrusiveDList<Worker>;
bool RunImpl(std::unique_ptr<Task> task);
void WorkerLoop(Worker* worker);
Worker* ScheduleTaskLocked(MonitorLocker* ml, std::unique_ptr<Task> task);
void IdleToRunningLocked(Worker* worker);
void RunningToIdleLocked(Worker* worker);
void IdleToDeadLocked(Worker* worker);
void ObtainDeadWorkersLocked(WorkerList* dead_workers_to_join);
void JoinDeadWorkersLocked(WorkerList* dead_workers_to_join);
Monitor pool_monitor_;
bool shutting_down_ = false;
uint64_t count_running_ = 0;
uint64_t count_idle_ = 0;
uint64_t count_dead_ = 0;
WorkerList running_workers_;
WorkerList idle_workers_;
WorkerList dead_workers_;
uint64_t pending_tasks_ = 0;
TaskList tasks_;
Monitor exit_monitor_;
std::atomic<bool> all_workers_dead_;
uintptr_t max_pool_size_ = 0;
DISALLOW_COPY_AND_ASSIGN(ThreadPool);
};
} // namespace dart
#endif // RUNTIME_VM_THREAD_POOL_H_