From 1013bbcaf0f65e18cdff761b4826e17f78901456 Mon Sep 17 00:00:00 2001 From: skyjake Date: Thu, 2 May 2013 15:08:54 +0300 Subject: [PATCH] libdeng2: Added concurrent tasks (Task and TaskPool) --- doomsday/libdeng2/concurrency.pri | 12 +- doomsday/libdeng2/include/de/Task | 1 + doomsday/libdeng2/include/de/TaskPool | 1 + .../libdeng2/include/de/concurrency/task.h | 56 +++++++++ .../include/de/concurrency/taskpool.h | 80 +++++++++++++ doomsday/libdeng2/src/concurrency/task.cpp | 43 +++++++ .../libdeng2/src/concurrency/taskpool.cpp | 110 ++++++++++++++++++ 7 files changed, 300 insertions(+), 3 deletions(-) create mode 100644 doomsday/libdeng2/include/de/Task create mode 100644 doomsday/libdeng2/include/de/TaskPool create mode 100644 doomsday/libdeng2/include/de/concurrency/task.h create mode 100644 doomsday/libdeng2/include/de/concurrency/taskpool.h create mode 100644 doomsday/libdeng2/src/concurrency/task.cpp create mode 100644 doomsday/libdeng2/src/concurrency/taskpool.cpp diff --git a/doomsday/libdeng2/concurrency.pri b/doomsday/libdeng2/concurrency.pri index 0e7fbac5e2..217f2875bc 100644 --- a/doomsday/libdeng2/concurrency.pri +++ b/doomsday/libdeng2/concurrency.pri @@ -2,16 +2,22 @@ HEADERS += \ include/de/Guard \ include/de/Lockable \ include/de/ReadWriteLockable \ - include/de/Waitable + include/de/Task \ + include/de/TaskPool \ + include/de/Waitable HEADERS += \ include/de/concurrency/guard.h \ include/de/concurrency/lockable.h \ include/de/concurrency/readwritelockable.h \ - include/de/concurrency/waitable.h + include/de/concurrency/task.h \ + include/de/concurrency/taskpool.h \ + include/de/concurrency/waitable.h SOURCES += \ src/concurrency/guard.cpp \ src/concurrency/lockable.cpp \ src/concurrency/readwritelockable.cpp \ - src/concurrency/waitable.cpp + src/concurrency/task.cpp \ + src/concurrency/taskpool.cpp \ + src/concurrency/waitable.cpp diff --git a/doomsday/libdeng2/include/de/Task b/doomsday/libdeng2/include/de/Task new file mode 100644 index 0000000000..2e78474fb6 --- /dev/null +++ b/doomsday/libdeng2/include/de/Task @@ -0,0 +1 @@ +#include "concurrency/task.h" diff --git a/doomsday/libdeng2/include/de/TaskPool b/doomsday/libdeng2/include/de/TaskPool new file mode 100644 index 0000000000..f7928b8f55 --- /dev/null +++ b/doomsday/libdeng2/include/de/TaskPool @@ -0,0 +1 @@ +#include "concurrency/taskpool.h" diff --git a/doomsday/libdeng2/include/de/concurrency/task.h b/doomsday/libdeng2/include/de/concurrency/task.h new file mode 100644 index 0000000000..7e6e7f1654 --- /dev/null +++ b/doomsday/libdeng2/include/de/concurrency/task.h @@ -0,0 +1,56 @@ +/** @file task.h Concurrent task. + * + * @authors Copyright (c) 2013 Jaakko Keränen + * + * @par License + * GPL: http://www.gnu.org/licenses/gpl.html + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. This program is distributed in the hope that it + * will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General + * Public License for more details. You should have received a copy of the GNU + * General Public License along with this program; if not, see: + * http://www.gnu.org/licenses + */ + +#ifndef LIBDENG2_TASK_H +#define LIBDENG2_TASK_H + +#include + +#include "../libdeng2.h" + +namespace de { + +class TaskPool; + +/** + * Concurrent task that will be executed by a TaskPool asynchronously. Override + * runTask() in a derived class. + */ +class DENG2_PUBLIC Task : public QRunnable +{ +public: + Task(); + virtual ~Task() {} + + TaskPool &pool() const; + void run(); + + /** + * Task classes must override this. + */ + virtual void runTask() = 0; + +private: + friend class TaskPool; + + TaskPool *_pool; +}; + +} // namespace de + +#endif // LIBDENG2_TASK_H diff --git a/doomsday/libdeng2/include/de/concurrency/taskpool.h b/doomsday/libdeng2/include/de/concurrency/taskpool.h new file mode 100644 index 0000000000..5aa00ed891 --- /dev/null +++ b/doomsday/libdeng2/include/de/concurrency/taskpool.h @@ -0,0 +1,80 @@ +/** @file taskpool.h Pool of tasks. + * + * @authors Copyright (c) 2013 Jaakko Keränen + * + * @par License + * GPL: http://www.gnu.org/licenses/gpl.html + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. This program is distributed in the hope that it + * will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General + * Public License for more details. You should have received a copy of the GNU + * General Public License along with this program; if not, see: + * http://www.gnu.org/licenses + */ + +#ifndef LIBDENG2_TASKPOOL_H +#define LIBDENG2_TASKPOOL_H + +#include + +#include "../libdeng2.h" + +namespace de { + +class Task; + +/** + * Pool of concurrent tasks. + */ +class DENG2_PUBLIC TaskPool : public QObject +{ + Q_OBJECT + +public: + enum Priority + { + LowPriority = 0, + MediumPriority = 1, + HighPriority = 2 + }; + +public: + TaskPool(); + + virtual ~TaskPool(); + + /** + * Starts a new concurrent task. Ownership of the task is given to the + * pool. + * + * @param task Task instance. Ownership given. + */ + void start(Task *task, Priority priority = LowPriority); + + /** + * Blocks execution until all running tasks have finished. + */ + void waitForDone(); + + /** + * Determines if all started tasks have finished. + */ + bool isDone() const; + +signals: + void allTasksDone(); + +private: + friend class Task; + void taskFinished(Task &task); + + DENG2_PRIVATE(d) +}; + +} // namespace de + +#endif // LIBDENG2_TASKPOOL_H diff --git a/doomsday/libdeng2/src/concurrency/task.cpp b/doomsday/libdeng2/src/concurrency/task.cpp new file mode 100644 index 0000000000..99e1a5120e --- /dev/null +++ b/doomsday/libdeng2/src/concurrency/task.cpp @@ -0,0 +1,43 @@ +/** @file task.cpp Concurrent task. + * + * @authors Copyright (c) 2013 Jaakko Keränen + * + * @par License + * GPL: http://www.gnu.org/licenses/gpl.html + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. This program is distributed in the hope that it + * will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General + * Public License for more details. You should have received a copy of the GNU + * General Public License along with this program; if not, see: + * http://www.gnu.org/licenses + */ + +#include "de/Task" +#include "de/TaskPool" +#include "de/Log" + +namespace de { + +Task::Task() : _pool(0) +{} + +TaskPool &Task::pool() const +{ + DENG2_ASSERT(_pool != 0); + return *_pool; +} + +void Task::run() +{ + runTask(); + + // Cleanup. + pool().taskFinished(*this); + Log::disposeThreadLog(); +} + +} // namespace de diff --git a/doomsday/libdeng2/src/concurrency/taskpool.cpp b/doomsday/libdeng2/src/concurrency/taskpool.cpp new file mode 100644 index 0000000000..38f4f41cad --- /dev/null +++ b/doomsday/libdeng2/src/concurrency/taskpool.cpp @@ -0,0 +1,110 @@ +/** @file taskpool.cpp + * + * @authors Copyright (c) 2013 Jaakko Keränen + * + * @par License + * GPL: http://www.gnu.org/licenses/gpl.html + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. This program is distributed in the hope that it + * will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General + * Public License for more details. You should have received a copy of the GNU + * General Public License along with this program; if not, see: + * http://www.gnu.org/licenses + */ + +#include "de/TaskPool" +#include "de/Task" +#include "de/Guard" + +#include +#include +#include + +namespace de { + +DENG2_PIMPL(TaskPool), public Lockable, public Waitable +{ + // Set of running tasks. + typedef QSet Tasks; + Tasks tasks; + + Instance(Public *i) : Base(i) + { + // When empty, the semaphore is available. + post(); + } + + ~Instance() + { + waitForEmpty(); + } + + void add(Task *t) + { + DENG2_GUARD(this); + t->_pool = &self; + if(tasks.isEmpty()) + { + wait(); // Semaphore now unavailable. + } + tasks.insert(t); + } + + void remove(Task *t) + { + DENG2_GUARD(this); + tasks.remove(t); + if(tasks.isEmpty()) + { + post(); + } + } + + void waitForEmpty() const + { + wait(); + DENG2_GUARD(this); + DENG2_ASSERT(tasks.isEmpty()); + post(); + } + + bool isEmpty() const + { + DENG2_GUARD(this); + return tasks.isEmpty(); + } +}; + +TaskPool::TaskPool() : d(new Instance(this)) +{} + +TaskPool::~TaskPool() +{} + +void TaskPool::start(Task *task, Priority priority) +{ + d->add(task); + QThreadPool::globalInstance()->start(task, int(priority)); +} + +void TaskPool::waitForDone() +{ + d->waitForEmpty(); +} + +void TaskPool::taskFinished(Task &task) +{ + d->remove(&task); + + if(d->isEmpty()) + { + allTasksDone(); + } +} + +} // namespace de +