Skip to content

Commit

Permalink
Merge pull request #16065 from Dr15Jones/betterExceptionHandling
Browse files Browse the repository at this point in the history
Handle cross Path exceptions
  • Loading branch information
smuzaffar committed Oct 2, 2016
2 parents d1be7a4 + 575bdf6 commit 031cee8
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 16 deletions.
29 changes: 28 additions & 1 deletion FWCore/Framework/src/ProductResolvers.cc
Expand Up @@ -277,20 +277,47 @@ namespace edm {
ModuleCallingContext const* mcc) const {
if(not skipCurrentProcess) {
m_waitingTasks.add(waitTask);

bool expected = false;
if(worker_ and prefetchRequested_.compare_exchange_strong(expected,true)) {
//using a waiting task to do a callback guarantees that
// the m_waitingTasks list will be released from waiting even
// if the module does not put this data product or the
// module has an exception while running

auto waiting = make_waiting_task(tbb::task::allocate_root(),
[this](std::exception_ptr const * iException) {
if(nullptr != iException) {
m_waitingTasks.doneWaiting(*iException);
} else {
m_waitingTasks.doneWaiting(std::exception_ptr());
}
});
worker_->callWhenDoneAsync(waiting);
}
}
}

void
PuttableProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
ProducedProductResolver::putProduct_(std::move(edp));
m_waitingTasks.doneWaiting(std::exception_ptr());
bool expected = false;
if(prefetchRequested_.compare_exchange_strong(expected,true)) {
m_waitingTasks.doneWaiting(std::exception_ptr());
}
}


void
PuttableProductResolver::resetProductData_(bool deleteEarly) {
m_waitingTasks.reset();
DataManagingProductResolver::resetProductData_(deleteEarly);
prefetchRequested_ = false;
}

void
PuttableProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
}


Expand Down
7 changes: 6 additions & 1 deletion FWCore/Framework/src/ProductResolvers.h
Expand Up @@ -145,7 +145,9 @@ namespace edm {

class PuttableProductResolver : public ProducedProductResolver {
public:
explicit PuttableProductResolver(std::shared_ptr<BranchDescription const> bd) : ProducedProductResolver(bd, ProductStatus::NotPut) {}
explicit PuttableProductResolver(std::shared_ptr<BranchDescription const> bd) : ProducedProductResolver(bd, ProductStatus::NotPut), worker_(nullptr), prefetchRequested_(false) {}

virtual void setupUnscheduled(UnscheduledConfigurator const&) override final;

private:
virtual Resolution resolveProduct_(Principal const& principal,
Expand All @@ -163,6 +165,9 @@ namespace edm {
virtual void resetProductData_(bool deleteEarly) override;

mutable WaitingTaskList m_waitingTasks;
Worker* worker_;
mutable std::atomic<bool> prefetchRequested_;

};

class UnscheduledProductResolver : public ProducedProductResolver {
Expand Down
12 changes: 2 additions & 10 deletions FWCore/Framework/src/Worker.cc
Expand Up @@ -308,20 +308,12 @@ namespace edm {
}
}

void Worker::skipOnPath(EventPrincipal const& iPrincipal) {
void Worker::skipOnPath() {
if( 0 == --numberOfPathsLeftToRun_) {
for(auto index : itemsShouldPutInEvent()) {
auto resolver = iPrincipal.getProductResolverByIndex(index);
resolver->putProduct(std::unique_ptr<WrapperBase>());
}
waitingTasks_.doneWaiting(cached_exception_);
}
}

void Worker::pathFinished(EventPrincipal const& iEvent) {
if(earlyDeleteHelper_) {
earlyDeleteHelper_->pathFinished(iEvent);
}
}
void Worker::postDoEvent(EventPrincipal const& iEvent) {
if(earlyDeleteHelper_) {
earlyDeleteHelper_->moduleRan(iEvent);
Expand Down
7 changes: 5 additions & 2 deletions FWCore/Framework/src/Worker.h
Expand Up @@ -95,7 +95,10 @@ namespace edm {
ParentContext const& parentContext,
typename T::Context const* context);

void skipOnPath(EventPrincipal const& );
void callWhenDoneAsync(WaitingTask* task) {
waitingTasks_.add(task);
}
void skipOnPath();
void beginJob() ;
void endJob();
void beginStream(StreamID id, StreamContext& streamContext);
Expand All @@ -115,7 +118,6 @@ namespace edm {
numberOfPathsLeftToRun_ = numberOfPathsOn_;
}

void pathFinished(EventPrincipal const&);
void postDoEvent(EventPrincipal const&);

ModuleDescription const& description() const {return *(moduleCallingContext_.moduleDescription());}
Expand Down Expand Up @@ -651,6 +653,7 @@ namespace edm {
iException.addContext(iost.str());
setException<T::isEvent_>(std::current_exception());
waitingTasks_.doneWaiting(cached_exception_);
return;
} else {
setPassed<T::isEvent_>();
}
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/src/WorkerInPath.h
Expand Up @@ -42,7 +42,7 @@ namespace edm {
bool checkResultsOfRunWorker(bool wasEvent);

void skipWorker(EventPrincipal const& iPrincipal) {
worker_->skipOnPath(iPrincipal);
worker_->skipOnPath();
}
void skipWorker(RunPrincipal const&) {}
void skipWorker(LuminosityBlockPrincipal const&) {}
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/src/WorkerManager.cc
Expand Up @@ -149,7 +149,7 @@ namespace edm {
WorkerManager::setupOnDemandSystem(EventPrincipal& ep, EventSetup const& es) {
unscheduled_.setEventSetup(es);
if(&ep != lastSetupEventPrincipal_) {
UnscheduledConfigurator config( unscheduled_.begin(), unscheduled_.end(), &(unscheduled_.auxiliary()));
UnscheduledConfigurator config( allWorkers_.begin(), allWorkers_.end(), &(unscheduled_.auxiliary()));
ep.setupUnscheduled(config);
lastSetupEventPrincipal_ = &ep;
}
Expand Down
16 changes: 16 additions & 0 deletions FWCore/Framework/test/test_dependentPathsAndExceptions_cfg.py
@@ -0,0 +1,16 @@
import FWCore.ParameterSet.Config as cms

process = cms.Process("Test")

process.source = cms.Source("EmptySource")

process.fail = cms.EDProducer("FailingProducer")

process.readFail = cms.EDProducer("AddIntsProducer", labels = cms.vstring("fail"))

process.a = cms.EDProducer("BusyWaitIntProducer", ivalue = cms.int32(5), iterations = cms.uint32(10000))

process.p2 = cms.Path(process.fail)
process.p1 = cms.Path(process.readFail+process.a)

process.add_(cms.Service("Tracer"))
4 changes: 4 additions & 0 deletions FWCore/Framework/test/test_earlyTerminationSignal.sh
Expand Up @@ -4,4 +4,8 @@

function die { echo $1: status $2 ; exit $2; }

echo "running cmsRun testEarlyTerminationSignal_cfg.py"
(cmsRun ${LOCAL_TEST_DIR}/testEarlyTerminationSignal_cfg.py 2>&1 | grep -q 'early termination of event: stream = 0 run = 1 lumi = 1 event = 10 : time = 50000001') || die "Early termination signal failed" $?

echo "runnig cmsRun test_dependentPathsAndExceptions_cfg.py"
(cmsRun ${LOCAL_TEST_DIR}/test_dependentPathsAndExceptions_cfg.py 2>&1 | grep -q "Intentional 'NotFound' exception for testing purposes") || die "dependent Paths and Exceptions failed" $?

0 comments on commit 031cee8

Please sign in to comment.