From eef8cc2951df174604fa645c56d43a2207305e1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaakko=20Ker=C3=A4nen?= Date: Fri, 23 Aug 2019 19:14:54 +0300 Subject: [PATCH] libcore|Concurrency: Async work using TaskPool TaskPool now supports performing async work and then making a completion callback in the main thread. Data can be passed between the worker and the completion using a Variant. Fixed a memory leak in TaskPool where the completed tasks were never deleted. (Hopefully just a goof during refactoring!) --- .../core/include/de/concurrency/taskpool.h | 28 +++++- .../libs/core/src/concurrency/taskpool.cpp | 98 ++++++++++++++++--- 2 files changed, 112 insertions(+), 14 deletions(-) diff --git a/doomsday/libs/core/include/de/concurrency/taskpool.h b/doomsday/libs/core/include/de/concurrency/taskpool.h index 9cfedad9c5..210d851da3 100644 --- a/doomsday/libs/core/include/de/concurrency/taskpool.h +++ b/doomsday/libs/core/include/de/concurrency/taskpool.h @@ -20,6 +20,9 @@ #define LIBCORE_TASKPOOL_H #include "../Observers" +#include "../Time" +#include "../Variant" + #include namespace de { @@ -52,7 +55,7 @@ class DE_PUBLIC TaskPool HighPriority = 2 }; - class IPool + class IPool : public Lockable { public: virtual void taskFinishedRunning(Task &) = 0; @@ -70,6 +73,7 @@ class DE_PUBLIC TaskPool * Destroys the task pool when all running tasks have finished. This method will * always return immediately and the public-facing TaskPool object will be deleted, * but the private instance will exist until all the tasks have finished running. + * Completion callbacks will not be called for any pending async tasks. */ virtual ~TaskPool(); @@ -84,6 +88,17 @@ class DE_PUBLIC TaskPool void start(TaskFunction taskFunction, Priority priority = LowPriority); + /** + * Starts an asynchronous operation in a background thread and calls a completion + * callback in the main thread once the operation is complete. + * + * @param asyncWork Work to do. + * @param completionInMainThread Completion callback. Receives the Value returned from the + * the async work function. Always runs in the main thread. + */ + void async(const std::function &asyncWork, + const std::function &completionInMainThread); + /** * Blocks execution until all running tasks have finished. A Task is considered * finished when it has exited its Task::runTask() method. @@ -95,6 +110,17 @@ class DE_PUBLIC TaskPool */ bool isDone() const; + /** + * Use the calling thread to perform queued tasks in any task pool. + * + * You should call this instead of sleeping in tasks, so that global thread pool doesn't get + * blocked by sleeping workers. + * + * @param timeout How long to wait until queued tasks are available. Zero means to + * wait indefinitely. + */ + static void yield(const TimeSpan timeout); + /** * Called by de::App at shutdown. */ diff --git a/doomsday/libs/core/src/concurrency/taskpool.cpp b/doomsday/libs/core/src/concurrency/taskpool.cpp index 07c4c22862..f90a4ef882 100644 --- a/doomsday/libs/core/src/concurrency/taskpool.cpp +++ b/doomsday/libs/core/src/concurrency/taskpool.cpp @@ -21,6 +21,8 @@ #include "de/Guard" #include "de/Set" +#include +#include #include #include #include @@ -60,22 +62,66 @@ static void deleteThreadPool() class CallbackTask : public Task { + TaskPool::TaskFunction _func; + public: - CallbackTask(TaskPool::TaskFunction func) : _func(func) {} + CallbackTask(TaskPool::TaskFunction func) + : _func(func) + {} + void runTask() override { _func(); } -private: - TaskPool::TaskFunction _func; }; } // namespace internal -DE_PIMPL(TaskPool), public Lockable, public Waitable, public TaskPool::IPool +DE_PIMPL(TaskPool), public Waitable, public TaskPool::IPool { - /// Private instance will be deleted when pool is empty. - bool deleteWhenDone = false; + /// Runs async worker + main-thread-completion tasks. + class CompletionTask : public Task + { + std::function _task; + struct Epilogue + { + std::function completion; + Variant result; + }; + Epilogue *_epilogue; + + public: + CompletionTask(const std::function &task, + const std::function &completion) + : _task(task) + , _epilogue(new Epilogue{completion, {}}) + {} + + ~CompletionTask() override + { + // Check that the pool is still valid. + { + TaskPool::Impl *d = static_cast(_pool); + DE_GUARD(d); + if (d->poolDestroyed) + { + // Completion cannot be called now. + delete _epilogue; + return; + } + } + Epilogue *e = _epilogue; // deleted after the callback is done + Loop::mainCall([e]() { + e->completion(e->result); + delete e; + }); + } + + void runTask() override + { + _epilogue->result = _task(); + } + }; - /// Set of running tasks. - Set tasks; + bool poolDestroyed = false; // Impl will be deleted when pool is empty. + Set tasks; // Set of running tasks. Impl(Public *i) : Base(i) { @@ -128,12 +174,14 @@ DE_PIMPL(TaskPool), public Lockable, public Waitable, public TaskPool::IPool return tasks.isEmpty(); } - void taskFinishedRunning(Task &task) + void taskFinishedRunning(Task &finishedTask) { + std::unique_ptr task(&finishedTask); // We have ownership of this. + lock(); - if (remove(&task)) + if (remove(&finishedTask)) { - if (deleteWhenDone) + if (poolDestroyed) { // All done, clean up! unlock(); @@ -174,7 +222,7 @@ TaskPool::~TaskPool() { // Detach the private instance and make it auto-delete itself when done. // The ongoing tasks will report themselves finished to the private instance. - d.release()->deleteWhenDone = true; + d.release()->poolDestroyed = true; } } @@ -204,7 +252,20 @@ void TaskPool::start(TaskFunction taskFunction, Priority priority) void TaskPool::waitForDone() { - d->waitForEmpty(); + if (App::inMainThread()) + { + // Main thread can sleep while waiting for tasks. + d->waitForEmpty(); + } + else + { + // Non-main threads should not block -- this is likely a worker, so we'll yield to + // other tasks while waiting. + while (!isDone()) + { + yield(250_ms); + } + } } bool TaskPool::isDone() const @@ -217,4 +278,15 @@ void TaskPool::deleteThreadPool() // static internal::deleteThreadPool(); } +void TaskPool::yield(const TimeSpan timeout) // static +{ + yield_ThreadPool(internal::globalThreadPool(), timeout); +} + +void TaskPool::async(const std::function &work, + const std::function &completion) +{ + start(new Impl::CompletionTask(work, completion)); +} + } // namespace de