Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle cross Path exceptions #16065

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 28 additions & 1 deletion FWCore/Framework/src/ProductResolvers.cc
Expand Up @@ -277,20 +277,47 @@ namespace edm {
ModuleCallingContext const* mcc) const {
if(not skipCurrentProcess) {
m_waitingTasks.add(waitTask);

bool expected = false;
if(worker_ and prefetchRequested_.compare_exchange_strong(expected,true)) {
//using a waiting task to do a callback guarantees that
// the m_waitingTasks list will be released from waiting even
// if the module does not put this data product or the
// module has an exception while running

auto waiting = make_waiting_task(tbb::task::allocate_root(),
[this](std::exception_ptr const * iException) {
if(nullptr != iException) {
m_waitingTasks.doneWaiting(*iException);
} else {
m_waitingTasks.doneWaiting(std::exception_ptr());
}
});
worker_->callWhenDoneAsync(waiting);
}
}
}

void
PuttableProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
ProducedProductResolver::putProduct_(std::move(edp));
m_waitingTasks.doneWaiting(std::exception_ptr());
bool expected = false;
if(prefetchRequested_.compare_exchange_strong(expected,true)) {
m_waitingTasks.doneWaiting(std::exception_ptr());
}
}


void
PuttableProductResolver::resetProductData_(bool deleteEarly) {
m_waitingTasks.reset();
DataManagingProductResolver::resetProductData_(deleteEarly);
prefetchRequested_ = false;
}

void
PuttableProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
}


Expand Down
7 changes: 6 additions & 1 deletion FWCore/Framework/src/ProductResolvers.h
Expand Up @@ -145,7 +145,9 @@ namespace edm {

class PuttableProductResolver : public ProducedProductResolver {
public:
explicit PuttableProductResolver(std::shared_ptr<BranchDescription const> bd) : ProducedProductResolver(bd, ProductStatus::NotPut) {}
explicit PuttableProductResolver(std::shared_ptr<BranchDescription const> bd) : ProducedProductResolver(bd, ProductStatus::NotPut), worker_(nullptr), prefetchRequested_(false) {}

virtual void setupUnscheduled(UnscheduledConfigurator const&) override final;

private:
virtual Resolution resolveProduct_(Principal const& principal,
Expand All @@ -163,6 +165,9 @@ namespace edm {
virtual void resetProductData_(bool deleteEarly) override;

mutable WaitingTaskList m_waitingTasks;
Worker* worker_;
mutable std::atomic<bool> prefetchRequested_;

};

class UnscheduledProductResolver : public ProducedProductResolver {
Expand Down
12 changes: 2 additions & 10 deletions FWCore/Framework/src/Worker.cc
Expand Up @@ -308,20 +308,12 @@ namespace edm {
}
}

void Worker::skipOnPath(EventPrincipal const& iPrincipal) {
void Worker::skipOnPath() {
if( 0 == --numberOfPathsLeftToRun_) {
for(auto index : itemsShouldPutInEvent()) {
auto resolver = iPrincipal.getProductResolverByIndex(index);
resolver->putProduct(std::unique_ptr<WrapperBase>());
}
waitingTasks_.doneWaiting(cached_exception_);
}
}

void Worker::pathFinished(EventPrincipal const& iEvent) {
if(earlyDeleteHelper_) {
earlyDeleteHelper_->pathFinished(iEvent);
}
}
void Worker::postDoEvent(EventPrincipal const& iEvent) {
if(earlyDeleteHelper_) {
earlyDeleteHelper_->moduleRan(iEvent);
Expand Down
7 changes: 5 additions & 2 deletions FWCore/Framework/src/Worker.h
Expand Up @@ -95,7 +95,10 @@ namespace edm {
ParentContext const& parentContext,
typename T::Context const* context);

void skipOnPath(EventPrincipal const& );
void callWhenDoneAsync(WaitingTask* task) {
waitingTasks_.add(task);
}
void skipOnPath();
void beginJob() ;
void endJob();
void beginStream(StreamID id, StreamContext& streamContext);
Expand All @@ -115,7 +118,6 @@ namespace edm {
numberOfPathsLeftToRun_ = numberOfPathsOn_;
}

void pathFinished(EventPrincipal const&);
void postDoEvent(EventPrincipal const&);

ModuleDescription const& description() const {return *(moduleCallingContext_.moduleDescription());}
Expand Down Expand Up @@ -651,6 +653,7 @@ namespace edm {
iException.addContext(iost.str());
setException<T::isEvent_>(std::current_exception());
waitingTasks_.doneWaiting(cached_exception_);
return;
} else {
setPassed<T::isEvent_>();
}
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/src/WorkerInPath.h
Expand Up @@ -42,7 +42,7 @@ namespace edm {
bool checkResultsOfRunWorker(bool wasEvent);

void skipWorker(EventPrincipal const& iPrincipal) {
worker_->skipOnPath(iPrincipal);
worker_->skipOnPath();
}
void skipWorker(RunPrincipal const&) {}
void skipWorker(LuminosityBlockPrincipal const&) {}
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/src/WorkerManager.cc
Expand Up @@ -149,7 +149,7 @@ namespace edm {
WorkerManager::setupOnDemandSystem(EventPrincipal& ep, EventSetup const& es) {
unscheduled_.setEventSetup(es);
if(&ep != lastSetupEventPrincipal_) {
UnscheduledConfigurator config( unscheduled_.begin(), unscheduled_.end(), &(unscheduled_.auxiliary()));
UnscheduledConfigurator config( allWorkers_.begin(), allWorkers_.end(), &(unscheduled_.auxiliary()));
ep.setupUnscheduled(config);
lastSetupEventPrincipal_ = &ep;
}
Expand Down
16 changes: 16 additions & 0 deletions FWCore/Framework/test/test_dependentPathsAndExceptions_cfg.py
@@ -0,0 +1,16 @@
import FWCore.ParameterSet.Config as cms

process = cms.Process("Test")

process.source = cms.Source("EmptySource")

process.fail = cms.EDProducer("FailingProducer")

process.readFail = cms.EDProducer("AddIntsProducer", labels = cms.vstring("fail"))

process.a = cms.EDProducer("BusyWaitIntProducer", ivalue = cms.int32(5), iterations = cms.uint32(10000))

process.p2 = cms.Path(process.fail)
process.p1 = cms.Path(process.readFail+process.a)

process.add_(cms.Service("Tracer"))
4 changes: 4 additions & 0 deletions FWCore/Framework/test/test_earlyTerminationSignal.sh
Expand Up @@ -4,4 +4,8 @@

function die { echo $1: status $2 ; exit $2; }

echo "running cmsRun testEarlyTerminationSignal_cfg.py"
(cmsRun ${LOCAL_TEST_DIR}/testEarlyTerminationSignal_cfg.py 2>&1 | grep -q 'early termination of event: stream = 0 run = 1 lumi = 1 event = 10 : time = 50000001') || die "Early termination signal failed" $?

echo "runnig cmsRun test_dependentPathsAndExceptions_cfg.py"
(cmsRun ${LOCAL_TEST_DIR}/test_dependentPathsAndExceptions_cfg.py 2>&1 | grep -q "Intentional 'NotFound' exception for testing purposes") || die "dependent Paths and Exceptions failed" $?