Skip to content

Commit

Permalink
Merge pull request #15178 from unknownbrackets/threads-atomic
Browse files Browse the repository at this point in the history
Tweaks to thread manager enqueue and shutdown
  • Loading branch information
hrydgard committed Nov 30, 2021
2 parents 3824329 + 12b790b commit cfe2716
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 18 deletions.
106 changes: 88 additions & 18 deletions Common/Thread/ThreadManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,65 +26,116 @@ const int EXTRA_THREADS = 4; // For I/O limited tasks
struct GlobalThreadContext {
std::mutex mutex; // associated with each respective condition variable
std::deque<Task *> queue;
std::atomic<int> queue_size;
std::vector<ThreadContext *> threads_;

int roundRobin = 0;
std::atomic<int> roundRobin;
};

struct ThreadContext {
std::thread thread; // the worker thread
std::condition_variable cond; // used to signal new work
std::mutex mutex; // protects the local queue.
std::atomic<int> queueSize;
std::atomic<int> queue_size;
int index;
std::atomic<bool> cancelled;
std::atomic<Task *> private_single;
std::deque<Task *> private_queue;
};

ThreadManager::ThreadManager() : global_(new GlobalThreadContext()) {

global_->queue_size = 0;
global_->roundRobin = 0;
}

ThreadManager::~ThreadManager() {
delete global_;
}

void ThreadManager::Teardown() {
for (size_t i = 0; i < global_->threads_.size(); i++) {
global_->threads_[i]->cancelled = true;
global_->threads_[i]->cond.notify_one();
for (ThreadContext *&threadCtx : global_->threads_) {
threadCtx->cancelled = true;
std::unique_lock<std::mutex> lock(threadCtx->mutex);
threadCtx->cond.notify_one();
}

// Purge any cancellable tasks while the threads shut down.
bool done = false;
while (!done) {
done = true;

std::unique_lock<std::mutex> lock(global_->mutex);
for (auto it = global_->queue.begin(); it != global_->queue.end(); ++it) {
if (TeardownTask(*it, false)) {
global_->queue.erase(it);
global_->queue_size--;
done = false;
break;
}
}
}
for (size_t i = 0; i < global_->threads_.size(); i++) {
global_->threads_[i]->thread.join();
delete global_->threads_[i];

for (ThreadContext *&threadCtx : global_->threads_) {
threadCtx->thread.join();
// TODO: Is it better to just delete these?
TeardownTask(threadCtx->private_single, true);
for (Task *task : threadCtx->private_queue) {
TeardownTask(threadCtx->private_single, true);
}
delete threadCtx;
}
global_->threads_.clear();

if (global_->queue_size > 0) {
WARN_LOG(SYSTEM, "ThreadManager::Teardown() with tasks still enqueued");
}
}

bool ThreadManager::TeardownTask(Task *task, bool enqueue) {
if (!task)
return true;

if (task->Cancellable()) {
task->Cancel();
delete task;
return true;
}

if (enqueue) {
global_->queue.push_back(task);
global_->queue_size++;
}
return false;
}

static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread) {
char threadName[16];
snprintf(threadName, sizeof(threadName), "PoolWorker %d", thread->index);
SetCurrentThreadName(threadName);
while (!thread->cancelled) {
Task *task = nullptr;
Task *task = thread->private_single.exchange(nullptr);

// Check the global queue first, then check the private queue and wait if there's nothing to do.
{
if (!task && global->queue_size.load() > 0) {
// Grab one from the global queue if there is any.
std::unique_lock<std::mutex> lock(global->mutex);
if (!global->queue.empty()) {
task = global->queue.front();
global->queue.pop_front();
global->queue_size--;

// We are processing one now, so mark that.
thread->queue_size++;
}
}

if (!task) {
std::unique_lock<std::mutex> lock(thread->mutex);
// We must check both queue and single again, while locked.
if (!thread->private_queue.empty()) {
task = thread->private_queue.front();
thread->private_queue.pop_front();
thread->queueSize.store((int)thread->private_queue.size());
} else {
} else if (!thread->private_single && !thread->cancelled && global->queue_size.load() == 0) {
thread->cond.wait(lock);
}
}
Expand All @@ -93,6 +144,9 @@ static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread)
if (task) {
task->Run();
delete task;

// Reduce the queue size once complete.
thread->queue_size--;
}
}
}
Expand All @@ -111,6 +165,7 @@ void ThreadManager::Init(int numRealCores, int numLogicalCoresPerCpu) {
for (int i = 0; i < numThreads; i++) {
ThreadContext *thread = new ThreadContext();
thread->cancelled.store(false);
thread->private_single.store(nullptr);
thread->thread = std::thread(&WorkerThreadFunc, global_, thread);
thread->index = i;
global_->threads_.push_back(thread);
Expand Down Expand Up @@ -139,10 +194,10 @@ void ThreadManager::EnqueueTask(Task *task, TaskType taskType) {
threadNum = 0;
}
ThreadContext *thread = global_->threads_[threadNum];
if (thread->queueSize.load() == 0) {
if (thread->queue_size.load() == 0) {
std::unique_lock<std::mutex> lock(thread->mutex);
thread->private_queue.push_back(task);
thread->queueSize.store((int)thread->private_queue.size());
thread->queue_size++;
thread->cond.notify_one();
// Found it - done.
return;
Expand All @@ -154,15 +209,30 @@ void ThreadManager::EnqueueTask(Task *task, TaskType taskType) {
{
std::unique_lock<std::mutex> lock(global_->mutex);
global_->queue.push_back(task);
global_->threads_[global_->roundRobin % maxThread]->cond.notify_one();
global_->roundRobin++;
global_->queue_size++;
}

// Lock the thread to ensure it gets the message.
int chosenIndex = global_->roundRobin++;
ThreadContext *&chosenThread = global_->threads_[chosenIndex % maxThread];
std::unique_lock<std::mutex> lock(chosenThread->mutex);
chosenThread->cond.notify_one();
}

void ThreadManager::EnqueueTaskOnThread(int threadNum, Task *task, TaskType taskType) {
_assert_msg_(threadNum >= 0 && threadNum < (int)global_->threads_.size(), "Bad threadnum or not initialized");
ThreadContext *thread = global_->threads_[threadNum];
{

// Try first atomically, as highest priority.
Task *expected = nullptr;
thread->private_single.compare_exchange_weak(expected, task);
// Whether we got that or will have to wait, increase the queue counter.
thread->queue_size++;

if (expected == nullptr) {
std::unique_lock<std::mutex> lock(thread->mutex);
thread->cond.notify_one();
} else {
std::unique_lock<std::mutex> lock(thread->mutex);
thread->private_queue.push_back(task);
thread->cond.notify_one();
Expand Down
2 changes: 2 additions & 0 deletions Common/Thread/ThreadManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class ThreadManager {
int GetNumLooperThreads() const;

private:
bool TeardownTask(Task *task, bool enqueue);

// This is always pointing to a context, initialized in the constructor.
GlobalThreadContext *global_;

Expand Down

0 comments on commit cfe2716

Please sign in to comment.