Skip to content

Commit

Permalink
libcore|Concurrency: Async work using TaskPool
Browse files Browse the repository at this point in the history
TaskPool now supports performing async work and then making a completion callback in the main thread. Data can be passed between the worker and the completion using a Variant.

Fixed a memory leak in TaskPool where the completed tasks were never deleted. (Hopefully just a goof during refactoring!)
  • Loading branch information
skyjake committed Sep 1, 2019
1 parent d282a6c commit eef8cc2
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 14 deletions.
28 changes: 27 additions & 1 deletion doomsday/libs/core/include/de/concurrency/taskpool.h
Expand Up @@ -20,6 +20,9 @@
#define LIBCORE_TASKPOOL_H

#include "../Observers"
#include "../Time"
#include "../Variant"

#include <functional>

namespace de {
Expand Down Expand Up @@ -52,7 +55,7 @@ class DE_PUBLIC TaskPool
HighPriority = 2
};

class IPool
class IPool : public Lockable
{
public:
virtual void taskFinishedRunning(Task &) = 0;
Expand All @@ -70,6 +73,7 @@ class DE_PUBLIC TaskPool
* Destroys the task pool when all running tasks have finished. This method will
* always return immediately and the public-facing TaskPool object will be deleted,
* but the private instance will exist until all the tasks have finished running.
* Completion callbacks will not be called for any pending async tasks.
*/
virtual ~TaskPool();

Expand All @@ -84,6 +88,17 @@ class DE_PUBLIC TaskPool

void start(TaskFunction taskFunction, Priority priority = LowPriority);

/**
* Starts an asynchronous operation in a background thread and calls a completion
* callback in the main thread once the operation is complete.
*
* @param asyncWork Work to do.
* @param completionInMainThread Completion callback. Receives the Value returned from the
* the async work function. Always runs in the main thread.
*/
void async(const std::function<Variant()> &asyncWork,
const std::function<void(const Variant &)> &completionInMainThread);

/**
* Blocks execution until all running tasks have finished. A Task is considered
* finished when it has exited its Task::runTask() method.
Expand All @@ -95,6 +110,17 @@ class DE_PUBLIC TaskPool
*/
bool isDone() const;

/**
* Use the calling thread to perform queued tasks in any task pool.
*
* You should call this instead of sleeping in tasks, so that global thread pool doesn't get
* blocked by sleeping workers.
*
* @param timeout How long to wait until queued tasks are available. Zero means to
* wait indefinitely.
*/
static void yield(const TimeSpan timeout);

/**
* Called by de::App at shutdown.
*/
Expand Down
98 changes: 85 additions & 13 deletions doomsday/libs/core/src/concurrency/taskpool.cpp
Expand Up @@ -21,6 +21,8 @@
#include "de/Guard"
#include "de/Set"

#include <de/App>
#include <de/Garbage>
#include <de/Lockable>
#include <de/Loop>
#include <de/Waitable>
Expand Down Expand Up @@ -60,22 +62,66 @@ static void deleteThreadPool()

class CallbackTask : public Task
{
TaskPool::TaskFunction _func;

public:
CallbackTask(TaskPool::TaskFunction func) : _func(func) {}
CallbackTask(TaskPool::TaskFunction func)
: _func(func)
{}

void runTask() override { _func(); }
private:
TaskPool::TaskFunction _func;
};

} // namespace internal

DE_PIMPL(TaskPool), public Lockable, public Waitable, public TaskPool::IPool
DE_PIMPL(TaskPool), public Waitable, public TaskPool::IPool
{
/// Private instance will be deleted when pool is empty.
bool deleteWhenDone = false;
/// Runs async worker + main-thread-completion tasks.
class CompletionTask : public Task
{
std::function<Variant()> _task;
struct Epilogue
{
std::function<void(const Variant &)> completion;
Variant result;
};
Epilogue *_epilogue;

public:
CompletionTask(const std::function<Variant()> &task,
const std::function<void(const Variant &)> &completion)
: _task(task)
, _epilogue(new Epilogue{completion, {}})
{}

~CompletionTask() override
{
// Check that the pool is still valid.
{
TaskPool::Impl *d = static_cast<TaskPool::Impl *>(_pool);
DE_GUARD(d);
if (d->poolDestroyed)
{
// Completion cannot be called now.
delete _epilogue;
return;
}
}
Epilogue *e = _epilogue; // deleted after the callback is done
Loop::mainCall([e]() {
e->completion(e->result);
delete e;
});
}

void runTask() override
{
_epilogue->result = _task();
}
};

/// Set of running tasks.
Set<Task *> tasks;
bool poolDestroyed = false; // Impl will be deleted when pool is empty.
Set<Task *> tasks; // Set of running tasks.

Impl(Public *i) : Base(i)
{
Expand Down Expand Up @@ -128,12 +174,14 @@ DE_PIMPL(TaskPool), public Lockable, public Waitable, public TaskPool::IPool
return tasks.isEmpty();
}

void taskFinishedRunning(Task &task)
void taskFinishedRunning(Task &finishedTask)
{
std::unique_ptr<Task> task(&finishedTask); // We have ownership of this.

lock();
if (remove(&task))
if (remove(&finishedTask))
{
if (deleteWhenDone)
if (poolDestroyed)
{
// All done, clean up!
unlock();
Expand Down Expand Up @@ -174,7 +222,7 @@ TaskPool::~TaskPool()
{
// Detach the private instance and make it auto-delete itself when done.
// The ongoing tasks will report themselves finished to the private instance.
d.release()->deleteWhenDone = true;
d.release()->poolDestroyed = true;
}
}

Expand Down Expand Up @@ -204,7 +252,20 @@ void TaskPool::start(TaskFunction taskFunction, Priority priority)

void TaskPool::waitForDone()
{
d->waitForEmpty();
if (App::inMainThread())
{
// Main thread can sleep while waiting for tasks.
d->waitForEmpty();
}
else
{
// Non-main threads should not block -- this is likely a worker, so we'll yield to
// other tasks while waiting.
while (!isDone())
{
yield(250_ms);
}
}
}

bool TaskPool::isDone() const
Expand All @@ -217,4 +278,15 @@ void TaskPool::deleteThreadPool() // static
internal::deleteThreadPool();
}

void TaskPool::yield(const TimeSpan timeout) // static
{
yield_ThreadPool(internal::globalThreadPool(), timeout);
}

void TaskPool::async(const std::function<Variant()> &work,
const std::function<void (const Variant &)> &completion)
{
start(new Impl::CompletionTask(work, completion));
}

} // namespace de

0 comments on commit eef8cc2

Please sign in to comment.