Skip to content

Commit

Permalink
Modules in SubProcesses are run concurrently on global begin transitions
Browse files Browse the repository at this point in the history
Extended the concurrent running of modules on global begin transitions to SubProcesses.
Child SubProcesses are also run concurrently.
  • Loading branch information
Dr15Jones committed Apr 24, 2017
1 parent fe9e05e commit d677356
Show file tree
Hide file tree
Showing 5 changed files with 384 additions and 209 deletions.
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/SubProcess.h
Expand Up @@ -76,10 +76,12 @@ namespace edm {
EventPrincipal const& principal);

void doBeginRun(RunPrincipal const& principal, IOVSyncValue const& ts);
void doBeginRunAsync(WaitingTaskHolder iHolder, RunPrincipal const& principal, IOVSyncValue const& ts);

void doEndRun(RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException);

void doBeginLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts);
void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts);

void doEndLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException);

Expand Down
18 changes: 13 additions & 5 deletions FWCore/Framework/src/EventProcessor.cc
Expand Up @@ -34,6 +34,7 @@
#include "FWCore/Framework/src/InputSourceFactory.h"
#include "FWCore/Framework/src/SharedResourcesRegistry.h"
#include "FWCore/Framework/src/streamTransitionAsync.h"
#include "FWCore/Framework/src/globalTransitionAsync.h"

#include "FWCore/MessageLogger/interface/MessageLogger.h"

Expand Down Expand Up @@ -1637,13 +1638,16 @@ namespace edm {
typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
auto globalWaitTask = make_empty_waiting_task();
globalWaitTask->increment_ref_count();
schedule_->processOneGlobalAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),runPrincipal, es);
beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
*schedule_,
runPrincipal,
ts,
es,
subProcesses_);
globalWaitTask->wait_for_all();
if(globalWaitTask->exceptionPtr() != nullptr) {
std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
}

for_all(subProcesses_, [&runPrincipal, &ts](auto& subProcess){ subProcess.doBeginRun(runPrincipal, ts); });
}
FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
if(looper_) {
Expand Down Expand Up @@ -1759,12 +1763,16 @@ namespace edm {
typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
auto globalWaitTask = make_empty_waiting_task();
globalWaitTask->increment_ref_count();
schedule_->processOneGlobalAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),lumiPrincipal, es);
beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
*schedule_,
lumiPrincipal,
ts,
es,
subProcesses_);
globalWaitTask->wait_for_all();
if(globalWaitTask->exceptionPtr() != nullptr) {
std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
}
for_all(subProcesses_, [&lumiPrincipal, &ts](auto& subProcess){ subProcess.doBeginLuminosityBlock(lumiPrincipal, ts); });
}
FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
if(looper_) {
Expand Down
53 changes: 53 additions & 0 deletions FWCore/Framework/src/SubProcess.cc
Expand Up @@ -25,6 +25,7 @@
#include "FWCore/Framework/src/SignallingProductRegistry.h"
#include "FWCore/Framework/src/PreallocationConfiguration.h"
#include "FWCore/Framework/src/streamTransitionAsync.h"
#include "FWCore/Framework/src/globalTransitionAsync.h"
#include "FWCore/ParameterSet/interface/IllegalParameters.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
Expand Down Expand Up @@ -433,6 +434,35 @@ namespace edm {
for_all(subProcesses_, [&rp, &ts](auto& subProcess){ subProcess.doBeginRun(rp, ts); });
}

void
SubProcess::doBeginRunAsync(WaitingTaskHolder iHolder, RunPrincipal const& principal, IOVSyncValue const& ts) {
ServiceRegistry::Operate operate(serviceToken_);

auto aux = std::make_shared<RunAuxiliary>(principal.aux());
aux->setProcessHistoryID(principal.processHistoryID());
auto rpp = std::make_shared<RunPrincipal>(aux, preg_, *processConfiguration_, &(historyAppenders_[historyRunOffset_+principal.index()]),principal.index(),false);
auto & processHistoryRegistry = processHistoryRegistries_[historyRunOffset_+principal.index()];
processHistoryRegistry.registerProcessHistory(principal.processHistory());
rpp->fillRunPrincipal(processHistoryRegistry, principal.reader());
principalCache_.insert(rpp);

ProcessHistoryID const& parentInputReducedPHID = principal.reducedProcessHistoryID();
ProcessHistoryID const& inputReducedPHID = rpp->reducedProcessHistoryID();

parentToChildPhID_.insert(std::make_pair(parentInputReducedPHID,inputReducedPHID));

RunPrincipal& rp = *principalCache_.runPrincipalPtr();
propagateProducts(InRun, principal, rp);
typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
beginGlobalTransitionAsync<Traits>(std::move(iHolder),
*schedule_,
rp,
ts,
esp_->eventSetupForInstance(ts),
subProcesses_);
}


void
SubProcess::doEndRun(RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {
ServiceRegistry::Operate operate(serviceToken_);
Expand Down Expand Up @@ -492,6 +522,29 @@ namespace edm {
for_all(subProcesses_, [&lbp, &ts](auto& subProcess){ subProcess.doBeginLuminosityBlock(lbp, ts); });
}

void
SubProcess::doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts) {
ServiceRegistry::Operate operate(serviceToken_);

auto aux = std::make_shared<LuminosityBlockAuxiliary>(principal.aux());
aux->setProcessHistoryID(principal.processHistoryID());
auto lbpp = std::make_shared<LuminosityBlockPrincipal>(aux, preg_, *processConfiguration_, &(historyAppenders_[historyLumiOffset_+principal.index()]),principal.index(),false);
auto & processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_+principal.index()];
processHistoryRegistry.registerProcessHistory(principal.processHistory());
lbpp->fillLuminosityBlockPrincipal(processHistoryRegistry, principal.reader());
lbpp->setRunPrincipal(principalCache_.runPrincipalPtr());
principalCache_.insert(lbpp);
LuminosityBlockPrincipal& lbp = *principalCache_.lumiPrincipalPtr();
propagateProducts(InLumi, principal, lbp);
typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
beginGlobalTransitionAsync<Traits>(std::move(iHolder),
*schedule_,
lbp,
ts,
esp_->eventSetupForInstance(ts),
subProcesses_);
}

void
SubProcess::doEndLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {
ServiceRegistry::Operate operate(serviceToken_);
Expand Down
112 changes: 112 additions & 0 deletions FWCore/Framework/src/globalTransitionAsync.h
@@ -0,0 +1,112 @@
#ifndef FWCore_Framework_globalTransitionAsync_h
#define FWCore_Framework_globalTransitionAsync_h
// -*- C++ -*-
//
// Package: FWCore/Framework
// Function: globalTransitionAsync
//
/**\function globalTransitionAsync globalTransitionAsync.h "globalTransitionAsync.h"
Description: Helper functions for handling asynchronous global transitions
Usage:
<usage>
*/
//
// Original Author: Chris Jones
// Created: Tue, 06 Sep 2016 16:04:26 GMT
//

// system include files
#include "FWCore/Framework/interface/Schedule.h"
#include "FWCore/Framework/interface/SubProcess.h"
#include "FWCore/Concurrency/interface/WaitingTask.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"

// user include files

// forward declarations

namespace edm {
class IOVSyncValue;
class EventSetup;
class LuminosityBlockPrincipal;
class RunPrincipal;

//This is code in common between beginStreamRun and beginGlobalLuminosityBlock
inline void subProcessDoGlobalBeginTransitionAsync(WaitingTaskHolder iHolder,SubProcess& iSubProcess, LuminosityBlockPrincipal& iPrincipal, IOVSyncValue const& iTS) {
iSubProcess.doBeginLuminosityBlockAsync(std::move(iHolder),iPrincipal, iTS);
}

inline void subProcessDoGlobalBeginTransitionAsync(WaitingTaskHolder iHolder, SubProcess& iSubProcess, RunPrincipal& iPrincipal, IOVSyncValue const& iTS) {
iSubProcess.doBeginRunAsync(std::move(iHolder),iPrincipal, iTS);
}

/*Not implemented yet
inline void subProcessDoGlobalEndTransitionAsync(WaitingTaskHolder iHolder, SubProcess& iSubProcess, LuminosityBlockPrincipal& iPrincipal, IOVSyncValue const& iTS, bool cleaningUpAfterException) {
iSubProcess.doEndLuminosityBlockAsync(std::move(iHolder),iPrincipal, iTS,cleaningUpAfterException);
}
inline void subProcessDoGlobalEndTransitionAsync(WaitingTaskHolder iHolder, SubProcess& iSubProcess, RunPrincipal& iPrincipal, IOVSyncValue const& iTS, bool cleaningUpAfterException) {
iSubProcess.doEndRunAsync(std::move(iHolder), iPrincipal, iTS, cleaningUpAfterException);
}
*/

template<typename Traits, typename P, typename SC >
void beginGlobalTransitionAsync(WaitingTaskHolder iWait,
Schedule& iSchedule,
P& iPrincipal,
IOVSyncValue const & iTS,
EventSetup const& iES,
SC& iSubProcesses) {
ServiceToken token = ServiceRegistry::instance().presentToken();

//When we are done processing the global for this process,
// we need to run the global for all SubProcesses
auto subs = make_waiting_task(tbb::task::allocate_root(), [&iSubProcesses, iWait,&iPrincipal,iTS,token](std::exception_ptr const* iPtr) mutable {
if(iPtr) {
iWait.doneWaiting(*iPtr);
return;
}
ServiceRegistry::Operate op(token);
for_all(iSubProcesses, [&iWait, &iPrincipal, iTS](auto& subProcess){ subProcessDoGlobalBeginTransitionAsync(iWait,subProcess,iPrincipal, iTS); });
});

WaitingTaskHolder h(subs);
iSchedule.processOneGlobalAsync<Traits>(std::move(h),iPrincipal, iES);
}


template<typename Traits, typename P, typename SC >
void endGlobalTransitionAsync(WaitingTaskHolder iWait,
Schedule& iSchedule,
P& iPrincipal,
IOVSyncValue const & iTS,
EventSetup const& iES,
SC& iSubProcesses,
bool cleaningUpAfterException)
{
ServiceToken token = ServiceRegistry::instance().presentToken();

//When we are done processing the global for this process,
// we need to run the global for all SubProcesses
auto subs = make_waiting_task(tbb::task::allocate_root(), [&iSubProcesses, iWait,&iPrincipal,iTS,token,cleaningUpAfterException](std::exception_ptr const* iPtr) mutable {
if(iPtr) {
iWait.doneWaiting(*iPtr);
return;
}
ServiceRegistry::Operate op(token);
for_all(iSubProcesses, [&iWait, &iPrincipal, iTS,cleaningUpAfterException](auto& subProcess){
subProcessDoGlobalEndTransitionAsync(iWait,subProcess,iPrincipal, iTS,cleaningUpAfterException); });
});

WaitingTaskHolder h(subs);
iSchedule.processOneGlobalAsync<Traits>(std::move(h),iPrincipal, iES,cleaningUpAfterException);


}

};

#endif

0 comments on commit d677356

Please sign in to comment.