Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Run modules concurrently during global begin transitions" #18504

Merged
merged 1 commit into from Apr 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions FWCore/Framework/interface/EDConsumerBase.h
Expand Up @@ -67,7 +67,7 @@ namespace edm {
void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const;
void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const;

std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType iType) const { return itemsToGetFromBranch_[iType]; }
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFromEvent() const { return itemsToGetFromEvent_; }

///\return true if the product corresponding to the index was registered via consumes or mayConsume call
bool registeredToConsume(ProductResolverIndex, bool, BranchType) const;
Expand Down Expand Up @@ -184,7 +184,7 @@ namespace edm {
// for each of the 3 labels needed to id the data
std::vector<char> m_tokenLabels;

std::array<std::vector<ProductResolverIndexAndSkipBit>, edm::NumBranchTypes> itemsToGetFromBranch_;
std::vector<ProductResolverIndexAndSkipBit> itemsToGetFromEvent_;

bool frozen_;
};
Expand Down
4 changes: 0 additions & 4 deletions FWCore/Framework/interface/Principal.h
Expand Up @@ -83,9 +83,6 @@ namespace edm {

void clearPrincipal();

void setAtEndTransition(bool iAtEnd);
bool atEndTransition() const {return atEndTransition_;}

void deleteProduct(BranchID const& id) const;

EDProductGetter const* prodGetter() const {return this;}
Expand Down Expand Up @@ -289,7 +286,6 @@ namespace edm {

CacheIdentifier_t cacheIdentifier_;

bool atEndTransition_;
};

template <typename PROD>
Expand Down
16 changes: 0 additions & 16 deletions FWCore/Framework/interface/Schedule.h
Expand Up @@ -143,12 +143,6 @@ namespace edm {
EventSetup const& eventSetup,
bool cleaningUpAfterException = false);

template <typename T>
void processOneGlobalAsync(WaitingTaskHolder iTask,
typename T::MyPrincipal& principal,
EventSetup const& eventSetup,
bool cleaningUpAfterException = false);

template <typename T>
void processOneStream(unsigned int iStreamID,
typename T::MyPrincipal& principal,
Expand Down Expand Up @@ -331,15 +325,5 @@ namespace edm {
bool cleaningUpAfterException) {
globalSchedule_->processOneGlobal<T>(ep,es,cleaningUpAfterException);
}

template <typename T>
void
Schedule::processOneGlobalAsync(WaitingTaskHolder iTaskHolder,
typename T::MyPrincipal& ep,
EventSetup const& es,
bool cleaningUpAfterException) {
globalSchedule_->processOneGlobalAsync<T>(iTaskHolder,ep,es,cleaningUpAfterException);
}

}
#endif
2 changes: 0 additions & 2 deletions FWCore/Framework/interface/SubProcess.h
Expand Up @@ -76,12 +76,10 @@ namespace edm {
EventPrincipal const& principal);

void doBeginRun(RunPrincipal const& principal, IOVSyncValue const& ts);
void doBeginRunAsync(WaitingTaskHolder iHolder, RunPrincipal const& principal, IOVSyncValue const& ts);

void doEndRun(RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException);

void doBeginLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts);
void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts);

void doEndLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException);

Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/stream/EDAnalyzerAdaptorBase.h
Expand Up @@ -86,7 +86,7 @@ namespace edm {
//Same interface as EDConsumerBase
void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const;
void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const;
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const;
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFromEvent() const;

void updateLookup(BranchType iBranchType,
ProductResolverIndexHelper const&,
Expand Down
Expand Up @@ -78,7 +78,7 @@ namespace edm {

void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const;
void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const;
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const;
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFromEvent() const;

void updateLookup(BranchType iBranchType,
ProductResolverIndexHelper const&,
Expand Down
8 changes: 5 additions & 3 deletions FWCore/Framework/src/EDConsumerBase.cc
Expand Up @@ -187,9 +187,11 @@ EDConsumerBase::updateLookup(BranchType iBranchType,
}
m_tokenInfo.shrink_to_fit();

itemsToGet(iBranchType, itemsToGetFromBranch_[iBranchType]);
if(iPrefetchMayGet) {
itemsMayGet(iBranchType, itemsToGetFromBranch_[iBranchType]);
if(iBranchType == InEvent) {
itemsToGet(iBranchType, itemsToGetFromEvent_);
if(iPrefetchMayGet) {
itemsMayGet(iBranchType, itemsToGetFromEvent_);
}
}
}

Expand Down
31 changes: 4 additions & 27 deletions FWCore/Framework/src/EventProcessor.cc
Expand Up @@ -34,7 +34,6 @@
#include "FWCore/Framework/src/InputSourceFactory.h"
#include "FWCore/Framework/src/SharedResourcesRegistry.h"
#include "FWCore/Framework/src/streamTransitionAsync.h"
#include "FWCore/Framework/src/globalTransitionAsync.h"

#include "FWCore/MessageLogger/interface/MessageLogger.h"

Expand Down Expand Up @@ -1636,18 +1635,8 @@ namespace edm {
}
{
typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
auto globalWaitTask = make_empty_waiting_task();
globalWaitTask->increment_ref_count();
beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
*schedule_,
runPrincipal,
ts,
es,
subProcesses_);
globalWaitTask->wait_for_all();
if(globalWaitTask->exceptionPtr() != nullptr) {
std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
}
schedule_->processOneGlobal<Traits>(runPrincipal, es);
for_all(subProcesses_, [&runPrincipal, &ts](auto& subProcess){ subProcess.doBeginRun(runPrincipal, ts); });
}
FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
if(looper_) {
Expand Down Expand Up @@ -1724,7 +1713,6 @@ namespace edm {
//looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
}
{
runPrincipal.setAtEndTransition(true);
typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
for_all(subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
Expand Down Expand Up @@ -1761,18 +1749,8 @@ namespace edm {
EventSetup const& es = esp_->eventSetup();
{
typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
auto globalWaitTask = make_empty_waiting_task();
globalWaitTask->increment_ref_count();
beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
*schedule_,
lumiPrincipal,
ts,
es,
subProcesses_);
globalWaitTask->wait_for_all();
if(globalWaitTask->exceptionPtr() != nullptr) {
std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
}
schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
for_all(subProcesses_, [&lumiPrincipal, &ts](auto& subProcess){ subProcess.doBeginLuminosityBlock(lumiPrincipal, ts); });
}
FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
if(looper_) {
Expand Down Expand Up @@ -1849,7 +1827,6 @@ namespace edm {
//looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
}
{
lumiPrincipal.setAtEndTransition(true);
typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
for_all(subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
Expand Down
81 changes: 0 additions & 81 deletions FWCore/Framework/src/GlobalSchedule.h
Expand Up @@ -20,16 +20,13 @@
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/StreamID.h"
#include "FWCore/Utilities/interface/propagate_const.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"

#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include <sstream>
#include "boost/range/adaptor/reversed.hpp"


namespace edm {

Expand Down Expand Up @@ -94,12 +91,6 @@ namespace edm {
EventSetup const& eventSetup,
bool cleaningUpAfterException = false);

template <typename T>
void processOneGlobalAsync(WaitingTaskHolder holder,
typename T::MyPrincipal& principal,
EventSetup const& eventSetup,
bool cleaningUpAfterException = false);

void beginJob(ProductRegistry const&);
void endJob(ExceptionCollector & collector);

Expand Down Expand Up @@ -207,78 +198,6 @@ namespace edm {
//If we got here no other exception has happened so we can propogate any Service related exceptions
sentry.allowThrow();
}

template <typename T>
void
GlobalSchedule::processOneGlobalAsync(WaitingTaskHolder iHolder,
typename T::MyPrincipal& ep,
EventSetup const& es,
bool cleaningUpAfterException) {
ServiceToken token = ServiceRegistry::instance().presentToken();

//need the doneTask to own the memory
auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(ep, processContext_));

if(actReg_) {
T::preScheduleSignal(actReg_.get(), globalContext.get());
}


//If we are in an end transition, we need to reset failed items since they might
// be set this time around
if( not T::begin_) {
ep.resetFailedFromThisProcess();
}

auto doneTask = make_waiting_task(tbb::task::allocate_root(),
[this,iHolder, cleaningUpAfterException, globalContext, token](std::exception_ptr const* iPtr) mutable
{
ServiceRegistry::Operate op(token);
std::exception_ptr excpt;
if(iPtr) {
excpt = *iPtr;
//add context information to the exception and print message
try {
convertException::wrap([&]() {
std::rethrow_exception(excpt);
});
} catch(cms::Exception& ex) {
//TODO: should add the transition type info
std::ostringstream ost;
if(ex.context().empty()) {
ost<<"Processing "<<T::transitionName()<<" ";
}
addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
excpt = std::current_exception();
}
if(actReg_) {
actReg_->preGlobalEarlyTerminationSignal_(*globalContext,TerminationOrigin::ExceptionFromThisContext);
}
}
if(actReg_) {
try {
T::postScheduleSignal(actReg_.get(), globalContext.get());
} catch(...) {
if(not excpt) {
excpt = std::current_exception();
}
}
}
iHolder.doneWaiting(excpt);

});
workerManager_.resetAll();

ParentContext parentContext(globalContext.get());

//make sure the task doesn't get run until all workers have beens started
WaitingTaskHolder holdForLoop(doneTask);
for(auto& worker: boost::adaptors::reverse((allWorkers()))) {
worker->doWorkAsync<T>(doneTask,ep,es,StreamID::invalidStreamID(),parentContext,globalContext.get());
}

}

template <typename T>
void
GlobalSchedule::runNow(typename T::MyPrincipal const& p, EventSetup const& es,
Expand Down
9 changes: 1 addition & 8 deletions FWCore/Framework/src/Principal.cc
Expand Up @@ -114,8 +114,7 @@ namespace edm {
reader_(),
branchType_(bt),
historyAppender_(historyAppender),
cacheIdentifier_(nextIdentifier()),
atEndTransition_(false)
cacheIdentifier_(nextIdentifier())
{
productResolvers_.resize(reg->getNextIndexValue(bt));
//Now that these have been set, we can create the list of Branches we need.
Expand Down Expand Up @@ -306,11 +305,6 @@ namespace edm {
}
}

void
Principal::setAtEndTransition(bool iAtEnd) {
atEndTransition_ = iAtEnd;
}

void
Principal::deleteProduct(BranchID const& id) const {
auto phb = getExistingProduct(id);
Expand All @@ -325,7 +319,6 @@ namespace edm {
DelayedReader* reader) {
//increment identifier here since clearPrincipal isn't called for Run/Lumi
cacheIdentifier_=nextIdentifier();
atEndTransition_=false;
if(reader) {
reader_ = reader;
}
Expand Down