Skip to content

Commit

Permalink
Run endStream concurrently as part of endJob
Browse files Browse the repository at this point in the history
  • Loading branch information
Dr15Jones committed Aug 31, 2021
1 parent bfe1358 commit 2601610
Showing 1 changed file with 39 additions and 5 deletions.
44 changes: 39 additions & 5 deletions FWCore/Framework/src/EventProcessor.cc
Expand Up @@ -698,13 +698,47 @@ namespace edm {
//make the services available
ServiceRegistry::Operate operate(serviceToken_);

//NOTE: this really should go elsewhere in the future
for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
c.call([this, i]() { this->schedule_->endStream(i); });
for (auto& subProcess : subProcesses_) {
c.call([&subProcess, i]() { subProcess.doEndStream(i); });
using namespace edm::waiting_task::chain;

edm::FinalWaitingTask waitTask;
tbb::task_group group;

{
//handle endStream transitions
edm::WaitingTaskHolder taskHolder(group, &waitTask);
std::mutex collectorMutex;
for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
first([this, i, &c, &collectorMutex](auto nextTask) {
std::exception_ptr ep;
try {
this->schedule_->endStream(i);
} catch (...) {
ep = std::current_exception();
}
if (ep) {
std::lock_guard<std::mutex> l(collectorMutex);
c.call([&ep]() { std::rethrow_exception(ep); });
}
}) | then([this, i, &c, &collectorMutex](auto nextTask) {
for (auto& subProcess : subProcesses_) {
first([i, &c, &collectorMutex, &subProcess](auto nextTask) {
std::exception_ptr ep;
try {
subProcess.doEndStream(i);
} catch (...) {
ep = std::current_exception();
}
if (ep) {
std::lock_guard<std::mutex> l(collectorMutex);
c.call([&ep]() { std::rethrow_exception(ep); });
}
}) | lastTask(nextTask);
}
}) | lastTask(taskHolder);
}
}
group.wait();

auto actReg = actReg_.get();
c.call([actReg]() { actReg->preEndJobSignal_(); });
schedule_->endJob(c);
Expand Down

0 comments on commit 2601610

Please sign in to comment.