Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor thread related improvements to the framework #22173

Merged
merged 4 commits into from Feb 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 12 additions & 20 deletions FWCore/Framework/src/EventProcessor.cc
Expand Up @@ -1142,13 +1142,13 @@ namespace edm {
if (iPtr) {
holder.doneWaiting(*iPtr);
} else {
//make the services available
ServiceRegistry::Operate operate(serviceToken_);

status->globalBeginDidSucceed();
EventSetup const& es = esp_->eventSetup();
if(looper_) {
try {
//make the services available
ServiceRegistry::Operate operate(serviceToken_);
looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
}catch(...) {
holder.doneWaiting(std::current_exception());
Expand Down Expand Up @@ -1234,23 +1234,15 @@ namespace edm {

unsigned int streamIndex = 0;
for(; streamIndex< preallocations_.numberOfStreams()-1; ++streamIndex) {
tbb::task::enqueue( *edm::make_waiting_task(tbb::task::allocate_root(),
[this,streamIndex,h = iHolder](std::exception_ptr const* iPtr) mutable
{
if(iPtr) {
h.doneWaiting(*iPtr);
} else {
handleNextEventForStreamAsync(std::move(h), streamIndex);
}
}) );

}
//need a temporary Task so that the temporary WaitingTaskHolder assigned to h will go out of scope
// before the call to spawn_and_wait_for_all
auto t = edm::make_waiting_task(tbb::task::allocate_root(),[this,streamIndex,h=iHolder](std::exception_ptr const*){
handleNextEventForStreamAsync(h,streamIndex);
});
tbb::task::spawn(*t);
tbb::task::enqueue( *edm::make_functor_task(tbb::task::allocate_root(),
[this,streamIndex,h = iHolder](){
handleNextEventForStreamAsync(std::move(h), streamIndex);
}) );

}
tbb::task::spawn( *edm::make_functor_task(tbb::task::allocate_root(),[this,streamIndex,h=std::move(iHolder)](){
handleNextEventForStreamAsync(std::move(h),streamIndex);
}) );
}

void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
Expand Down Expand Up @@ -1606,10 +1598,10 @@ namespace edm {
tbb::task::allocate_root(),
[this,pep,iHolder](std::exception_ptr const* iPtr) mutable
{
ServiceRegistry::Operate operate(serviceToken_);

//NOTE: If we have a looper we only have one Stream
if(looper_) {
ServiceRegistry::Operate operate(serviceToken_);
processEventWithLooper(*pep);
}

Expand Down
114 changes: 59 additions & 55 deletions FWCore/Framework/src/GlobalSchedule.h
Expand Up @@ -168,68 +168,72 @@ namespace edm {
EventSetup const& es,
ServiceToken const& token,
bool cleaningUpAfterException) {
//need the doneTask to own the memory
auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(ep, processContext_));

if(actReg_) {
//Services may depend upon each other
ServiceRegistry::Operate op(token);
T::preScheduleSignal(actReg_.get(), globalContext.get());
}

auto doneTask = make_waiting_task(tbb::task::allocate_root(),
[this,iHolder, cleaningUpAfterException, globalContext, token](std::exception_ptr const* iPtr) mutable
{
ServiceRegistry::Operate op(token);
std::exception_ptr excpt;
if(iPtr) {
excpt = *iPtr;
//add context information to the exception and print message
try {
convertException::wrap([&]() {
std::rethrow_exception(excpt);
});
} catch(cms::Exception& ex) {
//TODO: should add the transition type info
std::ostringstream ost;
if(ex.context().empty()) {
ost<<"Processing "<<T::transitionName()<<" ";
try {
//need the doneTask to own the memory
auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(ep, processContext_));

if(actReg_) {
//Services may depend upon each other
ServiceRegistry::Operate op(token);
T::preScheduleSignal(actReg_.get(), globalContext.get());
}

auto doneTask = make_waiting_task(tbb::task::allocate_root(),
[this,iHolder, cleaningUpAfterException, globalContext, token](std::exception_ptr const* iPtr) mutable
{
std::exception_ptr excpt;
if(iPtr) {
excpt = *iPtr;
//add context information to the exception and print message
try {
convertException::wrap([&]() {
std::rethrow_exception(excpt);
});
} catch(cms::Exception& ex) {
//TODO: should add the transition type info
std::ostringstream ost;
if(ex.context().empty()) {
ost<<"Processing "<<T::transitionName()<<" ";
}
ServiceRegistry::Operate op(token);
addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
excpt = std::current_exception();
}
if(actReg_) {
ServiceRegistry::Operate op(token);
actReg_->preGlobalEarlyTerminationSignal_(*globalContext,TerminationOrigin::ExceptionFromThisContext);
}
addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
excpt = std::current_exception();
}
if(actReg_) {
actReg_->preGlobalEarlyTerminationSignal_(*globalContext,TerminationOrigin::ExceptionFromThisContext);
}
}
if(actReg_) {
try {
T::postScheduleSignal(actReg_.get(), globalContext.get());
} catch(...) {
if(not excpt) {
excpt = std::current_exception();
try {
ServiceRegistry::Operate op(token);
T::postScheduleSignal(actReg_.get(), globalContext.get());
} catch(...) {
if(not excpt) {
excpt = std::current_exception();
}
}
}
}
iHolder.doneWaiting(excpt);

});
workerManagers_[ep.index()].resetAll();

ParentContext parentContext(globalContext.get());
//make sure the ProductResolvers know about their
// workers to allow proper data dependency handling
workerManagers_[ep.index()].setupOnDemandSystem(ep,es);

//make sure the task doesn't get run until all workers have beens started
WaitingTaskHolder holdForLoop(doneTask);
auto& aw = workerManagers_[ep.index()].allWorkers();
for(Worker* worker: boost::adaptors::reverse(aw) ) {
worker->doWorkAsync<T>(doneTask,ep,es,token, StreamID::invalidStreamID(),parentContext,globalContext.get());
iHolder.doneWaiting(excpt);

});
workerManagers_[ep.index()].resetAll();

ParentContext parentContext(globalContext.get());
//make sure the ProductResolvers know about their
// workers to allow proper data dependency handling
workerManagers_[ep.index()].setupOnDemandSystem(ep,es);

//make sure the task doesn't get run until all workers have beens started
WaitingTaskHolder holdForLoop(doneTask);
auto& aw = workerManagers_[ep.index()].allWorkers();
for(Worker* worker: boost::adaptors::reverse(aw) ) {
worker->doWorkAsync<T>(doneTask,ep,es,token, StreamID::invalidStreamID(),parentContext,globalContext.get());
}
} catch(...) {
iHolder.doneWaiting(std::current_exception());
}

}

}

#endif
158 changes: 81 additions & 77 deletions FWCore/Framework/src/StreamSchedule.cc
Expand Up @@ -540,88 +540,92 @@ namespace edm {
EventSetup const& es,
ServiceToken const& serviceToken,
std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
this->resetAll();
try {
this->resetAll();

using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;

Traits::setStreamContext(streamContext_, ep);
//a service may want to communicate with another service
ServiceRegistry::Operate guard(serviceToken);
Traits::preScheduleSignal(actReg_.get(), &streamContext_);

HLTPathStatus hltPathStatus(hlt::Pass, 0);
for (int empty_trig_path : empty_trig_paths_) {
results_->at(empty_trig_path) = hltPathStatus;
pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
std::exception_ptr iException = pathStatusInserterWorkers_[empty_trig_path]->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
ep, es, streamID_, ParentContext(&streamContext_), &streamContext_
);
if (iException) {
iTask.doneWaiting(iException);
return;
}
}
for (int empty_end_path : empty_end_paths_) {
std::exception_ptr iException = endPathStatusInserterWorkers_[empty_end_path]->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
ep, es, streamID_, ParentContext(&streamContext_), &streamContext_
);
if (iException) {
iTask.doneWaiting(iException);
return;
using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;

Traits::setStreamContext(streamContext_, ep);
//a service may want to communicate with another service
ServiceRegistry::Operate guard(serviceToken);
Traits::preScheduleSignal(actReg_.get(), &streamContext_);

HLTPathStatus hltPathStatus(hlt::Pass, 0);
for (int empty_trig_path : empty_trig_paths_) {
results_->at(empty_trig_path) = hltPathStatus;
pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
ep, es, streamID_, ParentContext(&streamContext_), &streamContext_
);
if (except) {
iTask.doneWaiting(except);
return;
}
}
}

// This call takes care of the unscheduled processing.
workerManager_.setupOnDemandSystem(ep,es);

++total_events_;
auto allPathsDone = make_waiting_task(tbb::task::allocate_root(),
[iTask,this,serviceToken](std::exception_ptr const* iPtr) mutable
{
ServiceRegistry::Operate operate(serviceToken);

std::exception_ptr ptr;
if(iPtr) {
ptr = *iPtr;
}
iTask.doneWaiting(finishProcessOneEvent(ptr));
});
//The holder guarantees that if the paths finish before the loop ends
// that we do not start too soon. It also guarantees that the task will
// run under that condition.
WaitingTaskHolder allPathsHolder(allPathsDone);

auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
[allPathsHolder,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
{
ServiceRegistry::Operate operate(serviceToken);

std::exception_ptr ptr;
if(iPtr) {
ptr = *iPtr;
}
finishedPaths(ptr, std::move(allPathsHolder), ep, es);
});

//The holder guarantees that if the paths finish before the loop ends
// that we do not start too soon. It also guarantees that the task will
// run under that condition.
WaitingTaskHolder taskHolder(pathsDone);
for (int empty_end_path : empty_end_paths_) {
std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
ep, es, streamID_, ParentContext(&streamContext_), &streamContext_
);
if (except) {
iTask.doneWaiting(except);
return;
}
}

// This call takes care of the unscheduled processing.
workerManager_.setupOnDemandSystem(ep,es);

++total_events_;
auto allPathsDone = make_waiting_task(tbb::task::allocate_root(),
[iTask,this,serviceToken](std::exception_ptr const* iPtr) mutable
{
ServiceRegistry::Operate operate(serviceToken);

std::exception_ptr ptr;
if(iPtr) {
ptr = *iPtr;
}
iTask.doneWaiting(finishProcessOneEvent(ptr));
});
//The holder guarantees that if the paths finish before the loop ends
// that we do not start too soon. It also guarantees that the task will
// run under that condition.
WaitingTaskHolder allPathsHolder(allPathsDone);

auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
[allPathsHolder,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
{
ServiceRegistry::Operate operate(serviceToken);

std::exception_ptr ptr;
if(iPtr) {
ptr = *iPtr;
}
finishedPaths(ptr, std::move(allPathsHolder), ep, es);
});

//The holder guarantees that if the paths finish before the loop ends
// that we do not start too soon. It also guarantees that the task will
// run under that condition.
WaitingTaskHolder taskHolder(pathsDone);

//start end paths first so on single threaded the paths will run first
for(auto it = end_paths_.rbegin(), itEnd = end_paths_.rend();
it != itEnd; ++it) {
it->processOneOccurrenceAsync(allPathsDone,ep, es, serviceToken, streamID_, &streamContext_);
}
//start end paths first so on single threaded the paths will run first
for(auto it = end_paths_.rbegin(), itEnd = end_paths_.rend();
it != itEnd; ++it) {
it->processOneOccurrenceAsync(allPathsDone,ep, es, serviceToken, streamID_, &streamContext_);
}

for(auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend();
it != itEnd; ++ it) {
it->processOneOccurrenceAsync(pathsDone,ep, es, serviceToken, streamID_, &streamContext_);
}
for(auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend();
it != itEnd; ++ it) {
it->processOneOccurrenceAsync(pathsDone,ep, es, serviceToken, streamID_, &streamContext_);
}

ParentContext parentContext(&streamContext_);
workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
allPathsDone, ep, es, serviceToken, streamID_, parentContext, &streamContext_);
ParentContext parentContext(&streamContext_);
workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
allPathsDone, ep, es, serviceToken, streamID_, parentContext, &streamContext_);
}catch (...) {
iTask.doneWaiting(std::current_exception());
}
}

void
Expand Down