Skip to content

Commit

Permalink
Merge pull request #14153 from Dr15Jones/updateWaitingTaskList
Browse files Browse the repository at this point in the history
Updated WaitingTaskList to handle exception
  • Loading branch information
cmsbuild committed Apr 21, 2016
2 parents 9d0f95d + 3e5abfa commit 9837ca8
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 36 deletions.
75 changes: 75 additions & 0 deletions FWCore/Concurrency/interface/WaitingTask.h
@@ -0,0 +1,75 @@
#ifndef FWCore_Concurrency_WaitingTask_h
#define FWCore_Concurrency_WaitingTask_h
// -*- C++ -*-
//
// Package: Concurrency
// Class : WaitingTask
//
/**\class WaitingTask WaitingTask.h FWCore/Concurrency/interface/WaitingTask.h
Description: Task used by WaitingTaskList.
Usage:
Used as a callback to happen after a task has been completed. Includes the ability to hold an exception which has occurred while waiting.
*/
//
// Original Author: Chris Jones
// Created: Thu Feb 21 13:46:31 CST 2013
// $Id$
//

// system include files
#include <atomic>
#include <exception>
#include <memory>
#include "tbb/task.h"

// user include files

// forward declarations

namespace edm {
class WaitingTaskList;

class WaitingTask : public tbb::task {

public:
friend class WaitingTaskList;

///Constructor
WaitingTask() : m_ptr{nullptr} {}
~WaitingTask() override {
delete m_ptr.load();
};

// ---------- const member functions ---------------------------

///Returns exception thrown by dependent task
/** If the value is non-null then the dependent task failed.
*/
std::exception_ptr const * exceptionPtr() const {
return m_ptr.load();
}
private:

///Called if waited for task failed
/**Allows transfer of the exception caused by the dependent task to be
* moved to another thread.
* This method should only be called by WaitingTaskList
*/
void dependentTaskFailed(std::exception_ptr iPtr) {
if (iPtr and not m_ptr) {
auto temp = std::make_unique<std::exception_ptr>(iPtr);
std::exception_ptr* expected = nullptr;
if( m_ptr.compare_exchange_strong(expected, temp.get()) ) {
temp.release();
}
}
}

std::atomic<std::exception_ptr*> m_ptr;
};

}

#endif
34 changes: 23 additions & 11 deletions FWCore/Concurrency/interface/WaitingTaskList.h
Expand Up @@ -22,14 +22,19 @@
then several other tasks have been created in a different thread and before running those
new tasks you need the result of the long calculation.
\code
class CalcTask : public tbb::task {
class CalcTask : public edm::WaitingTask {
public:
CalcTask(edm::WaitingTaskList* iWL, Value* v):
m_waitList(iWL), m_output(v) {}
tbb::task* execute() {
*m_output = doCalculation();
m_waitList.doneWaiting();
std::exception_ptr ptr;
try {
*m_output = doCalculation();
catch(...) {
ptr = std::current_exception();
}
m_waitList.doneWaiting(ptr);
return nullptr;
}
private:
Expand All @@ -50,7 +55,7 @@
tbb::task::spawn(calc);
\endcode
Finally in some unrelated part of the code we can create tasks needed the calculation
Finally in some unrelated part of the code we can create tasks that need the calculation
\code
tbb::task* t1 = makeTask1(v);
waitList.add(t1);
Expand All @@ -69,13 +74,18 @@
#include <atomic>

// user include files
#include "FWCore/Concurrency/interface/WaitingTask.h"

// forward declarations
namespace tbb {
class task;
}

namespace edm {
class EmptyWaitingTask : public WaitingTask {
public:
EmptyWaitingTask() = default;

tbb::task* execute() override { return nullptr;}
};

class WaitingTaskList
{

Expand All @@ -96,15 +106,16 @@ namespace edm {
* then be spawned.
* Calls to add() and doneWaiting() can safely be done concurrently.
*/
void add(tbb::task*);
void add(WaitingTask*);

///Signals that the resource is now available and tasks should be spawned
/**The owner of the resource calls this function to allow the waiting tasks to
* start accessing it.
* If the task fails, a non 'null' std::exception_ptr should be used.
* To have tasks wait again one must call reset().
* Calls to add() and doneWaiting() can safely be done concurrently.
*/
void doneWaiting();
void doneWaiting(std::exception_ptr iPtr);

///Resets access to the resource so that added tasks will wait.
/**The owner of the resouce calls reset() to make tasks wait.
Expand All @@ -124,7 +135,7 @@ namespace edm {
void announce();

struct WaitNode {
tbb::task* m_task;
WaitingTask* m_task;
std::atomic<WaitNode*> m_next;
bool m_fromCache;

Expand All @@ -137,12 +148,13 @@ namespace edm {
}
};

WaitNode* createNode(tbb::task* iTask);
WaitNode* createNode(WaitingTask* iTask);


// ---------- member data --------------------------------
std::atomic<WaitNode*> m_head;
WaitNode* m_nodeCache;
std::exception_ptr m_exceptionPtr; //guarded by m_waiting
unsigned int m_nodeCacheSize;
std::atomic<unsigned int> m_lastAssignedCacheIndex;
std::atomic<bool> m_waiting;
Expand Down
14 changes: 11 additions & 3 deletions FWCore/Concurrency/src/WaitingTaskList.cc
Expand Up @@ -72,6 +72,7 @@ WaitingTaskList::~WaitingTaskList()
void
WaitingTaskList::reset()
{
m_exceptionPtr = std::exception_ptr{};
m_waiting = true;
unsigned int nSeenTasks = m_lastAssignedCacheIndex;
m_lastAssignedCacheIndex = 0;
Expand All @@ -89,7 +90,7 @@ WaitingTaskList::reset()
}

WaitingTaskList::WaitNode*
WaitingTaskList::createNode(tbb::task* iTask)
WaitingTaskList::createNode(WaitingTask* iTask)
{
unsigned int index = m_lastAssignedCacheIndex++;

Expand All @@ -108,9 +109,12 @@ WaitingTaskList::createNode(tbb::task* iTask)


void
WaitingTaskList::add(tbb::task* iTask) {
WaitingTaskList::add(WaitingTask* iTask) {
iTask->increment_ref_count();
if(!m_waiting) {
if(m_exceptionPtr) {
iTask->dependentTaskFailed(m_exceptionPtr);
}
if(0==iTask->decrement_ref_count()) {
tbb::task::spawn(*iTask);
}
Expand Down Expand Up @@ -155,6 +159,9 @@ WaitingTaskList::announce()
hardware_pause();
}
auto t = n->m_task;
if(m_exceptionPtr) {
t->dependentTaskFailed(m_exceptionPtr);
}
if(0==t->decrement_ref_count()){
tbb::task::spawn(*t);
}
Expand All @@ -166,8 +173,9 @@ WaitingTaskList::announce()
}

void
WaitingTaskList::doneWaiting()
WaitingTaskList::doneWaiting(std::exception_ptr iPtr)
{
m_exceptionPtr = iPtr;
m_waiting=false;
announce();
}

0 comments on commit 9837ca8

Please sign in to comment.