Skip to content

Commit

Permalink
Merge pull request #18451 from Dr15Jones/unscheduledBeginTransitionHa…
Browse files Browse the repository at this point in the history
…ndling

Run modules concurrently during global begin transitions
  • Loading branch information
davidlange6 committed Apr 26, 2017
2 parents 35ee5e1 + d677356 commit 588aa71
Show file tree
Hide file tree
Showing 25 changed files with 793 additions and 247 deletions.
4 changes: 2 additions & 2 deletions FWCore/Framework/interface/EDConsumerBase.h
Expand Up @@ -67,7 +67,7 @@ namespace edm {
void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const;
void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const;

///\return the cached list of Event items held in itemsToGetFromEvent_
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFromEvent() const { return itemsToGetFromEvent_; }
///\return the cached list of items for the requested branch type.
/// iType is used directly as an index into itemsToGetFromBranch_ with no
/// bounds check, so it must be a valid BranchType (less than edm::NumBranchTypes).
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType iType) const { return itemsToGetFromBranch_[iType]; }

///\return true if the product corresponding to the index was registered via consumes or mayConsume call
bool registeredToConsume(ProductResolverIndex, bool, BranchType) const;
Expand Down Expand Up @@ -184,7 +184,7 @@ namespace edm {
// for each of the 3 labels needed to id the data
std::vector<char> m_tokenLabels;

std::vector<ProductResolverIndexAndSkipBit> itemsToGetFromEvent_;
std::array<std::vector<ProductResolverIndexAndSkipBit>, edm::NumBranchTypes> itemsToGetFromBranch_;

bool frozen_;
};
Expand Down
4 changes: 4 additions & 0 deletions FWCore/Framework/interface/Principal.h
Expand Up @@ -83,6 +83,9 @@ namespace edm {

void clearPrincipal();

/// Records whether this Principal is currently being used in an end transition.
void setAtEndTransition(bool iAtEnd);
///\return the current value of the at-end-transition flag
bool atEndTransition() const {return atEndTransition_;}

void deleteProduct(BranchID const& id) const;

EDProductGetter const* prodGetter() const {return this;}
Expand Down Expand Up @@ -286,6 +289,7 @@ namespace edm {

CacheIdentifier_t cacheIdentifier_;

bool atEndTransition_;
};

template <typename PROD>
Expand Down
16 changes: 16 additions & 0 deletions FWCore/Framework/interface/Schedule.h
Expand Up @@ -143,6 +143,12 @@ namespace edm {
EventSetup const& eventSetup,
bool cleaningUpAfterException = false);

/// Asynchronous counterpart of processOneGlobal: the work is handed to the
/// global schedule together with iTask instead of being run to completion
/// before returning.
/// NOTE(review): iTask is presumably signaled (with any exception) once all
/// global modules have finished -- confirm against GlobalSchedule.
template <typename T>
void processOneGlobalAsync(WaitingTaskHolder iTask,
typename T::MyPrincipal& principal,
EventSetup const& eventSetup,
bool cleaningUpAfterException = false);

template <typename T>
void processOneStream(unsigned int iStreamID,
typename T::MyPrincipal& principal,
Expand Down Expand Up @@ -325,5 +331,15 @@ namespace edm {
bool cleaningUpAfterException) {
globalSchedule_->processOneGlobal<T>(ep,es,cleaningUpAfterException);
}

/// Forwards an asynchronous global transition request to globalSchedule_.
/// All arguments are passed through unchanged; iTaskHolder is copied into the
/// callee, which becomes responsible for signaling it.
template <typename T>
void
Schedule::processOneGlobalAsync(WaitingTaskHolder iTaskHolder,
typename T::MyPrincipal& ep,
EventSetup const& es,
bool cleaningUpAfterException) {
globalSchedule_->processOneGlobalAsync<T>(iTaskHolder,ep,es,cleaningUpAfterException);
}

}
#endif
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/SubProcess.h
Expand Up @@ -76,10 +76,12 @@ namespace edm {
EventPrincipal const& principal);

void doBeginRun(RunPrincipal const& principal, IOVSyncValue const& ts);
void doBeginRunAsync(WaitingTaskHolder iHolder, RunPrincipal const& principal, IOVSyncValue const& ts);

void doEndRun(RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException);

void doBeginLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts);
void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts);

void doEndLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException);

Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/stream/EDAnalyzerAdaptorBase.h
Expand Up @@ -86,7 +86,7 @@ namespace edm {
//Same interface as EDConsumerBase
void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const;
void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const;
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFromEvent() const;
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const;

void updateLookup(BranchType iBranchType,
ProductResolverIndexHelper const&,
Expand Down
Expand Up @@ -78,7 +78,7 @@ namespace edm {

void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const;
void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const;
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFromEvent() const;
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const;

void updateLookup(BranchType iBranchType,
ProductResolverIndexHelper const&,
Expand Down
8 changes: 3 additions & 5 deletions FWCore/Framework/src/EDConsumerBase.cc
Expand Up @@ -187,11 +187,9 @@ EDConsumerBase::updateLookup(BranchType iBranchType,
}
m_tokenInfo.shrink_to_fit();

if(iBranchType == InEvent) {
itemsToGet(iBranchType, itemsToGetFromEvent_);
if(iPrefetchMayGet) {
itemsMayGet(iBranchType, itemsToGetFromEvent_);
}
itemsToGet(iBranchType, itemsToGetFromBranch_[iBranchType]);
if(iPrefetchMayGet) {
itemsMayGet(iBranchType, itemsToGetFromBranch_[iBranchType]);
}
}

Expand Down
31 changes: 27 additions & 4 deletions FWCore/Framework/src/EventProcessor.cc
Expand Up @@ -34,6 +34,7 @@
#include "FWCore/Framework/src/InputSourceFactory.h"
#include "FWCore/Framework/src/SharedResourcesRegistry.h"
#include "FWCore/Framework/src/streamTransitionAsync.h"
#include "FWCore/Framework/src/globalTransitionAsync.h"

#include "FWCore/MessageLogger/interface/MessageLogger.h"

Expand Down Expand Up @@ -1635,8 +1636,18 @@ namespace edm {
}
{
typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
schedule_->processOneGlobal<Traits>(runPrincipal, es);
for_all(subProcesses_, [&runPrincipal, &ts](auto& subProcess){ subProcess.doBeginRun(runPrincipal, ts); });
auto globalWaitTask = make_empty_waiting_task();
globalWaitTask->increment_ref_count();
beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
*schedule_,
runPrincipal,
ts,
es,
subProcesses_);
globalWaitTask->wait_for_all();
if(globalWaitTask->exceptionPtr() != nullptr) {
std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
}
}
FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
if(looper_) {
Expand Down Expand Up @@ -1713,6 +1724,7 @@ namespace edm {
//looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
}
{
runPrincipal.setAtEndTransition(true);
typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
for_all(subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
Expand Down Expand Up @@ -1749,8 +1761,18 @@ namespace edm {
EventSetup const& es = esp_->eventSetup();
{
typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
for_all(subProcesses_, [&lumiPrincipal, &ts](auto& subProcess){ subProcess.doBeginLuminosityBlock(lumiPrincipal, ts); });
auto globalWaitTask = make_empty_waiting_task();
globalWaitTask->increment_ref_count();
beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
*schedule_,
lumiPrincipal,
ts,
es,
subProcesses_);
globalWaitTask->wait_for_all();
if(globalWaitTask->exceptionPtr() != nullptr) {
std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
}
}
FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
if(looper_) {
Expand Down Expand Up @@ -1827,6 +1849,7 @@ namespace edm {
//looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
}
{
lumiPrincipal.setAtEndTransition(true);
typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
for_all(subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
Expand Down
81 changes: 81 additions & 0 deletions FWCore/Framework/src/GlobalSchedule.h
Expand Up @@ -20,13 +20,16 @@
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/StreamID.h"
#include "FWCore/Utilities/interface/propagate_const.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"

#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include <sstream>
#include "boost/range/adaptor/reversed.hpp"


namespace edm {

Expand Down Expand Up @@ -91,6 +94,12 @@ namespace edm {
EventSetup const& eventSetup,
bool cleaningUpAfterException = false);

/// Starts all workers for the global transition described by T and returns
/// without waiting for them. When the workers are done, holder is notified
/// via doneWaiting() with the resulting exception_ptr (empty on success).
template <typename T>
void processOneGlobalAsync(WaitingTaskHolder holder,
typename T::MyPrincipal& principal,
EventSetup const& eventSetup,
bool cleaningUpAfterException = false);

void beginJob(ProductRegistry const&);
void endJob(ExceptionCollector & collector);

Expand Down Expand Up @@ -198,6 +207,78 @@ namespace edm {
//If we got here no other exception has happened so we can propagate any Service related exceptions
sentry.allowThrow();
}

/// Launch every worker of this schedule for the global transition T without
/// blocking the caller.
///
/// \param iHolder task to notify (via doneWaiting) once the transition has
///        finished; it receives the exception, if any, that was raised.
/// \param ep principal being processed.
/// \param es EventSetup handed to each worker.
/// \param cleaningUpAfterException forwarded to addContextAndPrintException
///        when an exception from the workers is reported.
///
/// Sequence: emit the pre-schedule signal, create a completion task
/// (doneTask) that does the exception bookkeeping and post-schedule signal,
/// reset the workers, then call doWorkAsync on each worker with doneTask as
/// the task to wait on.
template <typename T>
void
GlobalSchedule::processOneGlobalAsync(WaitingTaskHolder iHolder,
typename T::MyPrincipal& ep,
EventSetup const& es,
bool cleaningUpAfterException) {
// Capture the services active on the calling thread so the completion task
// can re-install them (see ServiceRegistry::Operate below).
ServiceToken token = ServiceRegistry::instance().presentToken();

//need the doneTask to own the memory
// (the shared_ptr is copied into the lambda below, so the context outlives
//  this function call)
auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(ep, processContext_));

if(actReg_) {
T::preScheduleSignal(actReg_.get(), globalContext.get());
}


//If we are in an end transition, we need to reset failed items since they might
// be set this time around
if( not T::begin_) {
ep.resetFailedFromThisProcess();
}

// Completion task: invoked with a pointer to the exception raised while
// running the workers, or a null pointer if there was none.
auto doneTask = make_waiting_task(tbb::task::allocate_root(),
[this,iHolder, cleaningUpAfterException, globalContext, token](std::exception_ptr const* iPtr) mutable
{
ServiceRegistry::Operate op(token);
std::exception_ptr excpt;
if(iPtr) {
excpt = *iPtr;
//add context information to the exception and print message
try {
// NOTE(review): convertException::wrap presumably rethrows any exception
// type as a cms::Exception so the handler below sees it -- confirm.
convertException::wrap([&]() {
std::rethrow_exception(excpt);
});
} catch(cms::Exception& ex) {
//TODO: should add the transition type info
std::ostringstream ost;
// Only label the exception with the transition name when it carries no
// context yet.
if(ex.context().empty()) {
ost<<"Processing "<<T::transitionName()<<" ";
}
addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
// Keep the exception as it is now being handled (with the added context)
// as the one to report.
excpt = std::current_exception();
}
if(actReg_) {
actReg_->preGlobalEarlyTerminationSignal_(*globalContext,TerminationOrigin::ExceptionFromThisContext);
}
}
if(actReg_) {
try {
T::postScheduleSignal(actReg_.get(), globalContext.get());
} catch(...) {
// An exception from the post-schedule signal is reported only if no
// earlier exception is already pending.
if(not excpt) {
excpt = std::current_exception();
}
}
}
// Notify the caller's task, passing along the exception (empty on success).
iHolder.doneWaiting(excpt);

});
workerManager_.resetAll();

ParentContext parentContext(globalContext.get());

//make sure the task doesn't get run until all workers have been started
WaitingTaskHolder holdForLoop(doneTask);
// Workers are started in the reverse of allWorkers() order; global
// transitions have no stream, hence the invalid StreamID.
for(auto& worker: boost::adaptors::reverse((allWorkers()))) {
worker->doWorkAsync<T>(doneTask,ep,es,StreamID::invalidStreamID(),parentContext,globalContext.get());
}

}

template <typename T>
void
GlobalSchedule::runNow(typename T::MyPrincipal const& p, EventSetup const& es,
Expand Down
9 changes: 8 additions & 1 deletion FWCore/Framework/src/Principal.cc
Expand Up @@ -114,7 +114,8 @@ namespace edm {
reader_(),
branchType_(bt),
historyAppender_(historyAppender),
cacheIdentifier_(nextIdentifier())
cacheIdentifier_(nextIdentifier()),
atEndTransition_(false)
{
productResolvers_.resize(reg->getNextIndexValue(bt));
//Now that these have been set, we can create the list of Branches we need.
Expand Down Expand Up @@ -305,6 +306,11 @@ namespace edm {
}
}

/// Set the flag reported by atEndTransition().
/// \param iAtEnd true when the principal is being used in an end transition.
void
Principal::setAtEndTransition(bool iAtEnd) {
atEndTransition_ = iAtEnd;
}

void
Principal::deleteProduct(BranchID const& id) const {
auto phb = getExistingProduct(id);
Expand All @@ -319,6 +325,7 @@ namespace edm {
DelayedReader* reader) {
//increment identifier here since clearPrincipal isn't called for Run/Lumi
cacheIdentifier_=nextIdentifier();
atEndTransition_=false;
if(reader) {
reader_ = reader;
}
Expand Down

0 comments on commit 588aa71

Please sign in to comment.