From 78708bf747ad110831066dd5a618f639ee10d8f7 Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Sat, 2 Jun 2018 13:06:15 -0500 Subject: [PATCH 1/3] Handle non-copyable lambdas in FunctorWaitingTask Use std::move to allow use of a lambda that only has a move constructor and no copy constructor. --- FWCore/Concurrency/interface/WaitingTask.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/FWCore/Concurrency/interface/WaitingTask.h b/FWCore/Concurrency/interface/WaitingTask.h index 46961cbb61fa5..bf5c189755c1e 100644 --- a/FWCore/Concurrency/interface/WaitingTask.h +++ b/FWCore/Concurrency/interface/WaitingTask.h @@ -77,7 +77,7 @@ namespace edm { template class FunctorWaitingTask : public WaitingTask { public: - explicit FunctorWaitingTask( F f): func_(f) {} + explicit FunctorWaitingTask( F f): func_(std::move(f)) {} task* execute() override { func_(exceptionPtr()); @@ -90,7 +90,7 @@ namespace edm { template< typename ALLOC, typename F> FunctorWaitingTask* make_waiting_task( ALLOC&& iAlloc, F f) { - return new (iAlloc) FunctorWaitingTask(f); + return new (iAlloc) FunctorWaitingTask(std::move(f)); } } From bae6614f7aaa7976a2fd33b01a9d666b19efcd94 Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Sat, 2 Jun 2018 13:08:26 -0500 Subject: [PATCH 2/3] Prioritize exceptions from Paths When reporting an exception from processing an event, prioritize exceptions originating from a Path over those from EndPaths or from calling accumulators. --- FWCore/Framework/src/StreamSchedule.cc | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/FWCore/Framework/src/StreamSchedule.cc b/FWCore/Framework/src/StreamSchedule.cc index a510dc5c7400e..6309caad9e608 100644 --- a/FWCore/Framework/src/StreamSchedule.cc +++ b/FWCore/Framework/src/StreamSchedule.cc @@ -576,13 +576,20 @@ namespace edm { workerManager_.setupOnDemandSystem(ep,es); ++total_events_; + + //use to give priorities on an error to ones from Paths + auto pathErrorHolder = std::make_unique>(nullptr); + auto pathErrorPtr = pathErrorHolder.get(); auto allPathsDone = make_waiting_task(tbb::task::allocate_root(), - [iTask,this,serviceToken](std::exception_ptr const* iPtr) mutable + [iTask,this,serviceToken,pathError=std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable { ServiceRegistry::Operate operate(serviceToken); std::exception_ptr ptr; - if(iPtr) { + if(pathError->load()) { + ptr = *pathError->load(); + delete pathError->load(); + } else if(iPtr) { ptr = *iPtr; } iTask.doneWaiting(finishProcessOneEvent(ptr)); @@ -593,13 +600,16 @@ namespace edm { WaitingTaskHolder allPathsHolder(allPathsDone); auto pathsDone = make_waiting_task(tbb::task::allocate_root(), - [allPathsHolder,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable + [allPathsHolder,pathErrorPtr,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable { ServiceRegistry::Operate operate(serviceToken); std::exception_ptr ptr; if(iPtr) { ptr = *iPtr; + //this is used to prioritize this error over one + // that happens in EndPath or Accumulate + pathErrorPtr->store( new std::exception_ptr(ptr) ); } finishedPaths(ptr, std::move(allPathsHolder), ep, es); }); From afa8f9dbc685947f6ef1c2dc411b05876c8c323b Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Mon, 4 Jun 2018 10:55:42 -0500 Subject: [PATCH 3/3] Make sure to support SkipEvent option Need to wait to set Path exception information until we are sure that the exception is not being swallowed by SkipEvent. --- FWCore/Framework/src/StreamSchedule.cc | 31 ++++++++++++++------------ FWCore/Framework/src/StreamSchedule.h | 2 +- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/FWCore/Framework/src/StreamSchedule.cc b/FWCore/Framework/src/StreamSchedule.cc index 6309caad9e608..bade3b7b7c244 100644 --- a/FWCore/Framework/src/StreamSchedule.cc +++ b/FWCore/Framework/src/StreamSchedule.cc @@ -578,7 +578,7 @@ namespace edm { ++total_events_; //use to give priorities on an error to ones from Paths - auto pathErrorHolder = std::make_unique>(nullptr); + auto pathErrorHolder = std::make_unique>(nullptr); auto pathErrorPtr = pathErrorHolder.get(); auto allPathsDone = make_waiting_task(tbb::task::allocate_root(), [iTask,this,serviceToken,pathError=std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable @@ -589,7 +589,8 @@ namespace edm { if(pathError->load()) { ptr = *pathError->load(); delete pathError->load(); - } else if(iPtr) { + } + if( (not ptr) and iPtr) { ptr = *iPtr; } iTask.doneWaiting(finishProcessOneEvent(ptr)); @@ -604,14 +605,12 @@ namespace edm { { ServiceRegistry::Operate operate(serviceToken); - std::exception_ptr ptr; if(iPtr) { - ptr = *iPtr; //this is used to prioritize this error over one // that happens in EndPath or Accumulate - pathErrorPtr->store( new std::exception_ptr(ptr) ); + pathErrorPtr->store( new std::exception_ptr(*iPtr) ); } - finishedPaths(ptr, std::move(allPathsHolder), ep, es); + finishedPaths(*pathErrorPtr, std::move(allPathsHolder), ep, es); }); //The holder guarantees that if the paths finish before the loop ends @@ -639,12 +638,12 @@ namespace edm { } void - StreamSchedule::finishedPaths(std::exception_ptr iExcept, WaitingTaskHolder iWait, EventPrincipal& ep, + StreamSchedule::finishedPaths(std::atomic& iExcept, WaitingTaskHolder iWait, EventPrincipal& ep, EventSetup const& es) { if(iExcept) { try { - std::rethrow_exception(iExcept); + std::rethrow_exception(*(iExcept.load())); } catch(cms::Exception& e) { exception_actions::ActionCodes action = actionTable().find(e.category()); @@ -652,13 +651,13 @@ namespace edm { assert (action != exception_actions::FailPath); if (action == exception_actions::SkipEvent) { edm::printCmsExceptionWarning("SkipEvent", e); - iExcept = std::exception_ptr(); + *(iExcept.load()) = std::exception_ptr(); } else { - iExcept = std::current_exception(); + *(iExcept.load()) = std::current_exception(); } } catch(...) { - iExcept = std::current_exception(); + *(iExcept.load()) = std::current_exception(); } } @@ -683,16 +682,20 @@ namespace edm { ost << "Processing Event " << ep.id(); ex.addContext(ost.str()); } - iExcept = std::current_exception(); + iExcept.store( new std::exception_ptr(std::current_exception())); } } catch(...) { if (not iExcept) { - iExcept = std::current_exception(); + iExcept.store(new std::exception_ptr(std::current_exception())); } } } - iWait.doneWaiting(iExcept); + std::exception_ptr ptr; + if(iExcept) { + ptr = *iExcept.load(); + } + iWait.doneWaiting(ptr); } diff --git a/FWCore/Framework/src/StreamSchedule.h b/FWCore/Framework/src/StreamSchedule.h index 6765388507377..1c61fa2e2d7f9 100644 --- a/FWCore/Framework/src/StreamSchedule.h +++ b/FWCore/Framework/src/StreamSchedule.h @@ -295,7 +295,7 @@ namespace edm { void resetAll(); - void finishedPaths(std::exception_ptr, WaitingTaskHolder, + void finishedPaths(std::atomic&, WaitingTaskHolder, EventPrincipal& ep, EventSetup const& es); std::exception_ptr finishProcessOneEvent(std::exception_ptr);