diff --git a/FWCore/Concurrency/interface/WaitingTask.h b/FWCore/Concurrency/interface/WaitingTask.h index 46961cbb61fa5..bf5c189755c1e 100644 --- a/FWCore/Concurrency/interface/WaitingTask.h +++ b/FWCore/Concurrency/interface/WaitingTask.h @@ -77,7 +77,7 @@ namespace edm { template class FunctorWaitingTask : public WaitingTask { public: - explicit FunctorWaitingTask( F f): func_(f) {} + explicit FunctorWaitingTask( F f): func_(std::move(f)) {} task* execute() override { func_(exceptionPtr()); @@ -90,7 +90,7 @@ namespace edm { template< typename ALLOC, typename F> FunctorWaitingTask* make_waiting_task( ALLOC&& iAlloc, F f) { - return new (iAlloc) FunctorWaitingTask(f); + return new (iAlloc) FunctorWaitingTask(std::move(f)); } } diff --git a/FWCore/Framework/src/StreamSchedule.cc b/FWCore/Framework/src/StreamSchedule.cc index a510dc5c7400e..bade3b7b7c244 100644 --- a/FWCore/Framework/src/StreamSchedule.cc +++ b/FWCore/Framework/src/StreamSchedule.cc @@ -576,13 +576,21 @@ namespace edm { workerManager_.setupOnDemandSystem(ep,es); ++total_events_; + + //use to give priorities on an error to ones from Paths + auto pathErrorHolder = std::make_unique>(nullptr); + auto pathErrorPtr = pathErrorHolder.get(); auto allPathsDone = make_waiting_task(tbb::task::allocate_root(), - [iTask,this,serviceToken](std::exception_ptr const* iPtr) mutable + [iTask,this,serviceToken,pathError=std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable { ServiceRegistry::Operate operate(serviceToken); std::exception_ptr ptr; - if(iPtr) { + if(pathError->load()) { + ptr = *pathError->load(); + delete pathError->load(); + } + if( (not ptr) and iPtr) { ptr = *iPtr; } iTask.doneWaiting(finishProcessOneEvent(ptr)); @@ -593,15 +601,16 @@ namespace edm { WaitingTaskHolder allPathsHolder(allPathsDone); auto pathsDone = make_waiting_task(tbb::task::allocate_root(), - [allPathsHolder,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable + [allPathsHolder,pathErrorPtr,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable { ServiceRegistry::Operate operate(serviceToken); - std::exception_ptr ptr; if(iPtr) { - ptr = *iPtr; + //this is used to prioritize this error over one + // that happens in EndPath or Accumulate + pathErrorPtr->store( new std::exception_ptr(*iPtr) ); } - finishedPaths(ptr, std::move(allPathsHolder), ep, es); + finishedPaths(*pathErrorPtr, std::move(allPathsHolder), ep, es); }); //The holder guarantees that if the paths finish before the loop ends @@ -629,12 +638,12 @@ namespace edm { } void - StreamSchedule::finishedPaths(std::exception_ptr iExcept, WaitingTaskHolder iWait, EventPrincipal& ep, + StreamSchedule::finishedPaths(std::atomic& iExcept, WaitingTaskHolder iWait, EventPrincipal& ep, EventSetup const& es) { if(iExcept) { try { - std::rethrow_exception(iExcept); + std::rethrow_exception(*(iExcept.load())); } catch(cms::Exception& e) { exception_actions::ActionCodes action = actionTable().find(e.category()); @@ -642,13 +651,13 @@ namespace edm { assert (action != exception_actions::FailPath); if (action == exception_actions::SkipEvent) { edm::printCmsExceptionWarning("SkipEvent", e); - iExcept = std::exception_ptr(); + *(iExcept.load()) = std::exception_ptr(); } else { - iExcept = std::current_exception(); + *(iExcept.load()) = std::current_exception(); } } catch(...) { - iExcept = std::current_exception(); + *(iExcept.load()) = std::current_exception(); } } @@ -673,16 +682,20 @@ namespace edm { ost << "Processing Event " << ep.id(); ex.addContext(ost.str()); } - iExcept = std::current_exception(); + iExcept.store( new std::exception_ptr(std::current_exception())); } } catch(...) { if (not iExcept) { - iExcept = std::current_exception(); + iExcept.store(new std::exception_ptr(std::current_exception())); } } } - iWait.doneWaiting(iExcept); + std::exception_ptr ptr; + if(iExcept) { + ptr = *iExcept.load(); + } + iWait.doneWaiting(ptr); } diff --git a/FWCore/Framework/src/StreamSchedule.h b/FWCore/Framework/src/StreamSchedule.h index 6765388507377..1c61fa2e2d7f9 100644 --- a/FWCore/Framework/src/StreamSchedule.h +++ b/FWCore/Framework/src/StreamSchedule.h @@ -295,7 +295,7 @@ namespace edm { void resetAll(); - void finishedPaths(std::exception_ptr, WaitingTaskHolder, + void finishedPaths(std::atomic&, WaitingTaskHolder, EventPrincipal& ep, EventSetup const& es); std::exception_ptr finishProcessOneEvent(std::exception_ptr);