-
Notifications
You must be signed in to change notification settings - Fork 5.5k
/
ThreadPoolExecutor.h
415 lines (343 loc) · 12.8 KB
/
ThreadPoolExecutor.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <mutex>
#include <queue>
#include <glog/logging.h>
#include <folly/DefaultKeepAliveExecutor.h>
#include <folly/Memory.h>
#include <folly/SharedMutex.h>
#include <folly/executors/GlobalThreadPoolList.h>
#include <folly/executors/task_queue/LifoSemMPMCQueue.h>
#include <folly/executors/thread_factory/NamedThreadFactory.h>
#include <folly/io/async/Request.h>
#include <folly/portability/GFlags.h>
#include <folly/synchronization/AtomicStruct.h>
#include <folly/synchronization/Baton.h>
namespace folly {
/* Base class for implementing thread-pool-based executors.
 *
 * Dynamic thread behavior:
 *
 * ThreadPoolExecutors may vary their actual running number of threads
 * between minThreads_ and maxThreads_, tracked by activeThreads_.
 * The actual implementation of joining an idle thread is left to the
 * ThreadPoolExecutor subclass (typically via a LifoSem try_take_for
 * timing out). Idle threads should be removed from threadList_,
 * threadsToJoin_ incremented, and activeThreads_ decremented.
 *
 * On task add(), if an executor can guarantee there is an active
 * thread that will handle the task, then nothing needs to be done.
 * If not, then ensureActiveThreads() should be called to possibly
 * start another pool thread, up to maxThreads_.
 *
 * ensureJoined() is called on add(), so that we can join idle
 * threads that were destroyed (which cannot be joined from
 * themselves).
 *
 * Thread pool stats accounting:
 *
 * Derived classes must register instances to keep stats on all thread
 * pools by calling registerThreadPoolExecutor(this) on construction
 * and deregisterThreadPoolExecutor(this) on destruction.
 *
 * Registration must be done wherever getPendingTaskCountImpl is implemented,
 * and getPendingTaskCountImpl should be marked 'final' to avoid data races.
 */
class ThreadPoolExecutor : public DefaultKeepAliveExecutor {
public:
explicit ThreadPoolExecutor(
size_t maxThreads,
size_t minThreads,
std::shared_ptr<ThreadFactory> threadFactory);
~ThreadPoolExecutor() override;
void add(Func func) override = 0;
/**
* If func doesn't get started within expiration time after its enqueued,
* expireCallback will be run
*
* @param func Main function to be executed
* @param expiration Maximum time to wait for func to start execution
* @param expireCallback If expiration limit is reached, execute this callback
*/
virtual void add(
Func func, std::chrono::milliseconds expiration, Func expireCallback);
void setThreadFactory(std::shared_ptr<ThreadFactory> threadFactory) {
CHECK(numThreads() == 0);
threadFactory_ = std::move(threadFactory);
}
std::shared_ptr<ThreadFactory> getThreadFactory() const {
return threadFactory_;
}
size_t numThreads() const;
void setNumThreads(size_t numThreads);
// Return actual number of active threads -- this could be different from
// numThreads() due to ThreadPoolExecutor's dynamic behavior.
size_t numActiveThreads() const;
/*
* stop() is best effort - there is no guarantee that unexecuted tasks won't
* be executed before it returns. Specifically, IOThreadPoolExecutor's stop()
* behaves like join().
*/
virtual void stop();
virtual void join();
/**
* Execute f against all ThreadPoolExecutors, primarily for retrieving and
* exporting stats.
*/
static void withAll(FunctionRef<void(ThreadPoolExecutor&)> f);
struct PoolStats {
PoolStats()
: threadCount(0),
idleThreadCount(0),
activeThreadCount(0),
pendingTaskCount(0),
totalTaskCount(0),
maxIdleTime(0) {}
size_t threadCount, idleThreadCount, activeThreadCount;
uint64_t pendingTaskCount, totalTaskCount;
std::chrono::nanoseconds maxIdleTime;
};
PoolStats getPoolStats() const;
size_t getPendingTaskCount() const;
const std::string& getName() const;
/**
* Return the cumulative CPU time used by all threads in the pool, including
* those that are no longer alive. Requires system support for per-thread CPU
* clocks. If not available, the function returns 0. This operation can be
* expensive.
*/
std::chrono::nanoseconds getUsedCpuTime() const {
std::shared_lock r{threadListLock_};
return threadList_.getUsedCpuTime();
}
/**
* Base class for threads created with ThreadPoolExecutor.
* Some subclasses have methods that operate on these
* handles.
*/
class ThreadHandle {
public:
virtual ~ThreadHandle() = default;
};
/**
* Observer interface for thread start/stop.
* Provides hooks so actions can be taken when
* threads are created
*/
class Observer {
public:
virtual ~Observer() = default;
virtual void threadStarted(ThreadHandle*) {}
virtual void threadStopped(ThreadHandle*) {}
virtual void threadPreviouslyStarted(ThreadHandle* h) { threadStarted(h); }
virtual void threadNotYetStopped(ThreadHandle* h) { threadStopped(h); }
};
virtual void addObserver(std::shared_ptr<Observer>);
virtual void removeObserver(std::shared_ptr<Observer>);
struct TaskInfo {
int8_t priority;
uint64_t requestId = 0;
std::chrono::steady_clock::time_point enqueueTime;
uint64_t taskId;
};
struct DequeuedTaskInfo : TaskInfo {
std::chrono::nanoseconds waitTime{0}; // Dequeue time - enqueueTime.
};
struct ProcessedTaskInfo : DequeuedTaskInfo {
bool expired = false;
std::chrono::nanoseconds runTime{0};
};
class TaskObserver {
public:
virtual ~TaskObserver() = default;
virtual void taskEnqueued(const TaskInfo& /* info */) noexcept {}
virtual void taskDequeued(const DequeuedTaskInfo& /* info */) noexcept {}
virtual void taskProcessed(const ProcessedTaskInfo& /* info */) noexcept {}
private:
friend class ThreadPoolExecutor;
TaskObserver* next_ = nullptr;
};
// For performance reasons, TaskObservers can be added but not removed. All
// added observers will be destroyed on executor destruction.
void addTaskObserver(std::unique_ptr<TaskObserver> taskObserver);
// TODO(ott): Migrate call sites to the TaskObserver interface.
using TaskStats = ProcessedTaskInfo;
using TaskStatsCallback = std::function<void(const TaskStats&)>;
[[deprecated("Use addTaskObserver()")]] void subscribeToTaskStats(
TaskStatsCallback cb);
void setThreadDeathTimeout(std::chrono::milliseconds timeout) {
threadTimeout_ = timeout;
}
protected:
// Prerequisite: threadListLock_ writelocked
void addThreads(size_t n);
// Prerequisite: threadListLock_ writelocked
void removeThreads(size_t n, bool isJoin);
struct //
alignas(folly::cacheline_align_v) //
alignas(folly::AtomicStruct<std::chrono::steady_clock::time_point>) //
Thread : public ThreadHandle {
explicit Thread()
: id(nextId++),
handle(),
idle(true),
lastActiveTime(std::chrono::steady_clock::now()) {}
~Thread() override = default;
std::chrono::nanoseconds usedCpuTime() const;
static std::atomic<uint64_t> nextId;
uint64_t id;
std::thread handle;
std::atomic<bool> idle;
folly::AtomicStruct<std::chrono::steady_clock::time_point> lastActiveTime;
folly::Baton<> startupBaton;
};
using ThreadPtr = std::shared_ptr<Thread>;
struct Task {
struct Expiration {
std::chrono::milliseconds expiration;
Func expireCallback;
};
Task(
Func&& func,
std::chrono::milliseconds expiration,
Func&& expireCallback,
int8_t pri = 0);
int8_t priority() const { return priority_; }
Func func_;
std::chrono::steady_clock::time_point enqueueTime_;
std::shared_ptr<folly::RequestContext> context_;
std::unique_ptr<Expiration> expiration_;
private:
friend class ThreadPoolExecutor;
int8_t priority_;
uint64_t taskId_;
};
static void fillTaskInfo(const Task& task, TaskInfo& info);
void registerTaskEnqueue(const Task& task);
template <class F>
void forEachTaskObserver(F&& f) const {
auto* taskObserver = taskObservers_.load(std::memory_order_acquire);
while (taskObserver != nullptr) {
f(*taskObserver);
taskObserver = taskObserver->next_;
}
}
void runTask(const ThreadPtr& thread, Task&& task);
virtual void validateNumThreads(size_t /* numThreads */) {}
// The function that will be bound to pool threads. It must call
// thread->startupBaton.post() when it's ready to consume work.
virtual void threadRun(ThreadPtr thread) = 0;
// Stop n threads and put their ThreadPtrs in the stoppedThreads_ queue
// and remove them from threadList_, either synchronize or asynchronize
// Prerequisite: threadListLock_ writelocked
virtual void stopThreads(size_t n) = 0;
// Join n stopped threads and remove them from waitingForJoinThreads_ queue.
// Should not hold a lock because joining thread operation may invoke some
// cleanup operations on the thread, and those cleanup operations may
// require a lock on ThreadPoolExecutor.
void joinStoppedThreads(size_t n);
// To implement shutdown.
void stopAndJoinAllThreads(bool isJoin);
// Create a suitable Thread struct
virtual ThreadPtr makeThread() { return std::make_shared<Thread>(); }
static void registerThreadPoolExecutor(ThreadPoolExecutor* tpe);
static void deregisterThreadPoolExecutor(ThreadPoolExecutor* tpe);
// Prerequisite: threadListLock_ readlocked or writelocked
virtual size_t getPendingTaskCountImpl() const = 0;
// Called with threadListLock_ readlocked or writelocked.
virtual void handleObserverRegisterThread(ThreadHandle*, Observer&) {}
virtual void handleObserverUnregisterThread(ThreadHandle*, Observer&) {}
class ThreadList {
public:
void add(const ThreadPtr& state) {
auto it = std::lower_bound(vec_.begin(), vec_.end(), state, Compare{});
vec_.insert(it, state);
}
void remove(const ThreadPtr& state) {
auto itPair =
std::equal_range(vec_.begin(), vec_.end(), state, Compare{});
CHECK(itPair.first != vec_.end());
CHECK(std::next(itPair.first) == itPair.second);
vec_.erase(itPair.first);
pastCpuUsed_ += state->usedCpuTime();
}
bool contains(const ThreadPtr& ts) const {
return std::binary_search(vec_.cbegin(), vec_.cend(), ts, Compare{});
}
const std::vector<ThreadPtr>& get() const { return vec_; }
std::chrono::nanoseconds getUsedCpuTime() const {
auto acc{pastCpuUsed_};
for (const auto& thread : vec_) {
acc += thread->usedCpuTime();
}
return acc;
}
private:
struct Compare {
bool operator()(const ThreadPtr& ts1, const ThreadPtr& ts2) const {
return ts1->id < ts2->id;
}
};
std::vector<ThreadPtr> vec_;
// cpu time used by threads that are no longer alive
std::chrono::nanoseconds pastCpuUsed_{0};
};
class StoppedThreadQueue : public BlockingQueue<ThreadPtr> {
public:
BlockingQueueAddResult add(ThreadPtr item) override;
ThreadPtr take() override;
size_t size() override;
folly::Optional<ThreadPtr> try_take_for(
std::chrono::milliseconds /*timeout */) override;
private:
folly::LifoSem sem_;
std::mutex mutex_;
std::queue<ThreadPtr> queue_;
};
std::shared_ptr<ThreadFactory> threadFactory_;
ThreadList threadList_;
mutable SharedMutex threadListLock_;
StoppedThreadQueue stoppedThreads_;
std::atomic<bool> isJoin_{false}; // whether the current downsizing is a join
std::vector<std::shared_ptr<Observer>> observers_;
folly::ThreadPoolListHook threadPoolHook_;
// Dynamic thread sizing functions and variables
void ensureMaxActiveThreads();
void ensureActiveThreads();
void ensureJoined();
bool minActive();
bool tryTimeoutThread();
// These are only modified while holding threadListLock_, but
// are read without holding the lock.
std::atomic<size_t> maxThreads_{0};
std::atomic<size_t> minThreads_{0};
std::atomic<size_t> activeThreads_{0};
std::atomic<size_t> threadsToJoin_{0};
std::atomic<std::chrono::milliseconds> threadTimeout_;
bool joinKeepAliveOnce() {
if (!std::exchange(keepAliveJoined_, true)) {
joinKeepAlive();
return true;
}
return false;
}
bool keepAliveJoined_{false};
private:
std::atomic<TaskObserver*> taskObservers_{nullptr};
};
} // namespace folly