Skip to content

Commit

Permalink
Merge pull request #23432 from Dr15Jones/prioritizeExceptionFromPaths
Browse files Browse the repository at this point in the history
Prioritize exception from Paths
  • Loading branch information
cmsbuild committed Jun 6, 2018
2 parents 59293bb + afa8f9d commit 3f93e81
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 17 deletions.
4 changes: 2 additions & 2 deletions FWCore/Concurrency/interface/WaitingTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ namespace edm {
template<typename F>
class FunctorWaitingTask : public WaitingTask {
public:
explicit FunctorWaitingTask( F f): func_(f) {}
explicit FunctorWaitingTask( F f): func_(std::move(f)) {}

task* execute() override {
func_(exceptionPtr());
Expand All @@ -90,7 +90,7 @@ namespace edm {

template< typename ALLOC, typename F>
FunctorWaitingTask<F>* make_waiting_task( ALLOC&& iAlloc, F f) {
return new (iAlloc) FunctorWaitingTask<F>(f);
return new (iAlloc) FunctorWaitingTask<F>(std::move(f));
}

}
Expand Down
41 changes: 27 additions & 14 deletions FWCore/Framework/src/StreamSchedule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -576,13 +576,21 @@ namespace edm {
workerManager_.setupOnDemandSystem(ep,es);

++total_events_;

//use to give priorities on an error to ones from Paths
auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
auto pathErrorPtr = pathErrorHolder.get();
auto allPathsDone = make_waiting_task(tbb::task::allocate_root(),
[iTask,this,serviceToken](std::exception_ptr const* iPtr) mutable
[iTask,this,serviceToken,pathError=std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable
{
ServiceRegistry::Operate operate(serviceToken);

std::exception_ptr ptr;
if(iPtr) {
if(pathError->load()) {
ptr = *pathError->load();
delete pathError->load();
}
if( (not ptr) and iPtr) {
ptr = *iPtr;
}
iTask.doneWaiting(finishProcessOneEvent(ptr));
Expand All @@ -593,15 +601,16 @@ namespace edm {
WaitingTaskHolder allPathsHolder(allPathsDone);

auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
[allPathsHolder,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
[allPathsHolder,pathErrorPtr,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
{
ServiceRegistry::Operate operate(serviceToken);

std::exception_ptr ptr;
if(iPtr) {
ptr = *iPtr;
//this is used to prioritize this error over one
// that happens in EndPath or Accumulate
pathErrorPtr->store( new std::exception_ptr(*iPtr) );
}
finishedPaths(ptr, std::move(allPathsHolder), ep, es);
finishedPaths(*pathErrorPtr, std::move(allPathsHolder), ep, es);
});

//The holder guarantees that if the paths finish before the loop ends
Expand Down Expand Up @@ -629,26 +638,26 @@ namespace edm {
}

void
StreamSchedule::finishedPaths(std::exception_ptr iExcept, WaitingTaskHolder iWait, EventPrincipal& ep,
StreamSchedule::finishedPaths(std::atomic<std::exception_ptr*>& iExcept, WaitingTaskHolder iWait, EventPrincipal& ep,
EventSetup const& es) {

if(iExcept) {
try {
std::rethrow_exception(iExcept);
std::rethrow_exception(*(iExcept.load()));
}
catch(cms::Exception& e) {
exception_actions::ActionCodes action = actionTable().find(e.category());
assert (action != exception_actions::IgnoreCompletely);
assert (action != exception_actions::FailPath);
if (action == exception_actions::SkipEvent) {
edm::printCmsExceptionWarning("SkipEvent", e);
iExcept = std::exception_ptr();
*(iExcept.load()) = std::exception_ptr();
} else {
iExcept = std::current_exception();
*(iExcept.load()) = std::current_exception();
}
}
catch(...) {
iExcept = std::current_exception();
*(iExcept.load()) = std::current_exception();
}
}

Expand All @@ -673,16 +682,20 @@ namespace edm {
ost << "Processing Event " << ep.id();
ex.addContext(ost.str());
}
iExcept = std::current_exception();
iExcept.store( new std::exception_ptr(std::current_exception()));
}
}
catch(...) {
if (not iExcept) {
iExcept = std::current_exception();
iExcept.store(new std::exception_ptr(std::current_exception()));
}
}
}
iWait.doneWaiting(iExcept);
std::exception_ptr ptr;
if(iExcept) {
ptr = *iExcept.load();
}
iWait.doneWaiting(ptr);
}


Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/src/StreamSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ namespace edm {

void resetAll();

void finishedPaths(std::exception_ptr, WaitingTaskHolder,
void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder,
EventPrincipal& ep, EventSetup const& es);
std::exception_ptr finishProcessOneEvent(std::exception_ptr);

Expand Down

0 comments on commit 3f93e81

Please sign in to comment.