Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent Path Processing #15882

Merged
merged 14 commits into from Sep 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions DataFormats/Provenance/interface/ProductResolverIndexHelper.h
Expand Up @@ -61,6 +61,7 @@ ProductRegistry is frozen.
#include <set>
#include <string>
#include <vector>
#include <unordered_map>

namespace edm {

Expand Down Expand Up @@ -118,6 +119,9 @@ namespace edm {
char const* moduleLabel,
char const* instance,
char const* process = 0) const;

using ModulesToIndiciesMap =std::unordered_multimap<std::string,ProductResolverIndex>;
ModulesToIndiciesMap indiciesForModulesInProcess( const std::string& iProcessName ) const;

class Matches {
public:
Expand Down
18 changes: 18 additions & 0 deletions DataFormats/Provenance/src/ProductResolverIndexHelper.cc
Expand Up @@ -581,6 +581,24 @@ namespace edm {
return 0;
}

ProductResolverIndexHelper::ModulesToIndiciesMap
ProductResolverIndexHelper::indiciesForModulesInProcess(const std::string& iProcessName) const {

ModulesToIndiciesMap result;
for(unsigned int i=0; i<beginElements_; ++i) {
auto const& range= ranges_[i];
for(unsigned int j=range.begin(); j<range.end();++j) {
auto const& indexAndNames = indexAndNames_[j];
if(0 == strcmp(&processNames_[indexAndNames.startInProcessNames()], iProcessName.c_str())) {
//The first null terminated string is the module label
result.emplace(&bigNamesContainer_[indexAndNames.startInBigNamesContainer()],indexAndNames.index());
}
}
}
return result;
}


void ProductResolverIndexHelper::sanityCheck() const {
bool sanityChecksPass = true;
if (sortedTypeIDs_.size() != ranges_.size()) sanityChecksPass = false;
Expand Down
Expand Up @@ -125,6 +125,20 @@ void TestProductResolverIndexHelper::testOneEntry() {

matches = helper.relatedIndexes(ELEMENT_TYPE, typeID_ProductID, "labelA", "instanceA");
CPPUNIT_ASSERT(matches.numberOfMatches() == 0);

{
auto indexToModules = helper.indiciesForModulesInProcess("processA");
CPPUNIT_ASSERT(indexToModules.size() == 1);
CPPUNIT_ASSERT(indexToModules.count("labelA") == 1);
auto const& range = indexToModules.equal_range("labelA");
CPPUNIT_ASSERT( range.first->second == indexWithProcess);
}

{
auto indexToModules = helper.indiciesForModulesInProcess("processNotHere");
CPPUNIT_ASSERT(indexToModules.size() == 0);
}

}

void TestProductResolverIndexHelper::testManyEntries() {
Expand Down Expand Up @@ -214,4 +228,31 @@ void TestProductResolverIndexHelper::testManyEntries() {
ProductResolverIndex indexC = matches.index(1);
CPPUNIT_ASSERT_THROW(matches.index(2), cms::Exception);
CPPUNIT_ASSERT(indexC == 27);

{
auto indexToModules = helper.indiciesForModulesInProcess("processA");
CPPUNIT_ASSERT(indexToModules.size() == 1);
}
{
auto indexToModules = helper.indiciesForModulesInProcess("processB");
CPPUNIT_ASSERT(indexToModules.size() == 5);

}
{
auto indexToModules = helper.indiciesForModulesInProcess("processB1");
CPPUNIT_ASSERT(indexToModules.size() == 1);
}
{
auto indexToModules = helper.indiciesForModulesInProcess("processB2");
CPPUNIT_ASSERT(indexToModules.size() == 1);
}
{
auto indexToModules = helper.indiciesForModulesInProcess("processB3");
CPPUNIT_ASSERT(indexToModules.size() == 1);
}
{
auto indexToModules = helper.indiciesForModulesInProcess("processC");
CPPUNIT_ASSERT(indexToModules.size() == 3);
}

}
19 changes: 19 additions & 0 deletions FWCore/Concurrency/interface/WaitingTask.h
Expand Up @@ -69,6 +69,25 @@ namespace edm {

std::atomic<std::exception_ptr*> m_ptr;
};

template<typename F>
class FunctorWaitingTask : public WaitingTask {
public:
explicit FunctorWaitingTask( F f): func_(f) {}

task* execute() override {
func_(exceptionPtr());
return nullptr;
};

private:
F func_;
};

template< typename ALLOC, typename F>
FunctorWaitingTask<F>* make_waiting_task( ALLOC&& iAlloc, F f) {
return new (iAlloc) FunctorWaitingTask<F>(f);
}

}

Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/Event.h
Expand Up @@ -256,7 +256,7 @@ namespace edm {
friend class ProducerBase;
template<typename T> friend class stream::ProducingModuleAdaptorBase;

void commit_(std::vector<BranchID>* previousParentage= 0, ParentageID* previousParentageId = 0);
void commit_(std::vector<edm::ProductResolverIndex> const& iShouldPut, std::vector<BranchID>* previousParentage= 0, ParentageID* previousParentageId = 0);
void commit_aux(ProductPtrVec& products, bool record_parents, std::vector<BranchID>* previousParentage = 0, ParentageID* previousParentageId = 0);

BasicHandle
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/LuminosityBlock.h
Expand Up @@ -149,7 +149,7 @@ namespace edm {
template<typename T> friend class stream::ProducingModuleAdaptorBase;


void commit_();
void commit_(std::vector<edm::ProductResolverIndex> const& iShouldPut);

PrincipalGetAdapter provRecorder_;
ProductPtrVec putProducts_;
Expand Down
4 changes: 4 additions & 0 deletions FWCore/Framework/interface/Principal.h
Expand Up @@ -182,6 +182,10 @@ namespace edm {
ProductData const* findProductByTag(TypeID const& typeID, InputTag const& tag, ModuleCallingContext const* mcc) const;

void readAllFromSourceAndMergeImmediately();
//For end Run/Lumi we need to reset products failed in the begin
// transition since they may be put into the Principal at the
// end transition
void resetFailedFromThisProcess();

std::vector<unsigned int> const& lookupProcessOrder() const { return lookupProcessOrder_; }

Expand Down
34 changes: 31 additions & 3 deletions FWCore/Framework/interface/ProducerBase.h
Expand Up @@ -9,13 +9,20 @@ EDProducts into an Event.
----------------------------------------------------------------------*/

#include "FWCore/Framework/interface/ProductRegistryHelper.h"
#include "FWCore/Utilities/interface/ProductResolverIndex.h"

#include <functional>
#include <unordered_map>
#include <string>
#include<vector>

namespace edm {
class BranchDescription;
class ModuleDescription;
class ProductRegistry;
class Event;
class LuminosityBlock;
class Run;

class EDProducer;
class EDFilter;
Expand All @@ -31,6 +38,19 @@ namespace edm {
template<typename T> class ProducingModuleAdaptorBase;
}

namespace producerbasehelper{
template<typename P> struct PrincipalTraits;
template<> struct PrincipalTraits<Run> {
static constexpr int kBranchType = InRun;
};
template<> struct PrincipalTraits<LuminosityBlock> {
static constexpr int kBranchType = InLumi;
};
template<> struct PrincipalTraits<Event> {
static constexpr int kBranchType = InEvent;
};
}

class ProducerBase : private ProductRegistryHelper {
public:
typedef ProductRegistryHelper::TypeLabelList TypeLabelList;
Expand All @@ -50,7 +70,14 @@ namespace edm {
void callWhenNewProductsRegistered(std::function<void(BranchDescription const&)> const& func) {
callWhenNewProductsRegistered_ = func;
}


void resolvePutIndicies(BranchType iBranchType,
std::unordered_multimap<std::string, edm::ProductResolverIndex> const& iIndicies,
std::string const& moduleLabel);

std::vector<edm::ProductResolverIndex> const& indiciesForPutProducts(BranchType iBranchType) const {
return putIndicies_[iBranchType];
}
private:
friend class EDProducer;
friend class EDFilter;
Expand All @@ -62,15 +89,16 @@ namespace edm {

template< typename P>
void commit_(P& iPrincipal) {
iPrincipal.commit_();
iPrincipal.commit_(putIndicies_[producerbasehelper::PrincipalTraits<P>::kBranchType]);
}

template< typename P, typename L, typename I>
void commit_(P& iPrincipal, L* iList, I* iID) {
iPrincipal.commit_(iList,iID);
iPrincipal.commit_(putIndicies_[producerbasehelper::PrincipalTraits<P>::kBranchType], iList,iID);
}

std::function<void(BranchDescription const&)> callWhenNewProductsRegistered_;
std::array<std::vector<edm::ProductResolverIndex>, edm::NumBranchTypes> putIndicies_;
};
}
#endif
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/ProductResolverBase.h
Expand Up @@ -76,6 +76,8 @@ namespace edm {
}
void resetProductData() { resetProductData_(false); }

virtual void resetFailedFromThisProcess();

void unsafe_deleteProduct() const {
const_cast<ProductResolverBase*>(this)->resetProductData_(true);
}
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/Run.h
Expand Up @@ -160,7 +160,7 @@ namespace edm {
friend class ProducerBase;
template<typename T> friend class stream::ProducingModuleAdaptorBase;

void commit_();
void commit_(std::vector<edm::ProductResolverIndex> const& iShouldPut);

PrincipalGetAdapter provRecorder_;
ProductPtrVec putProducts_;
Expand Down
12 changes: 1 addition & 11 deletions FWCore/Framework/interface/Schedule.h
Expand Up @@ -129,9 +129,8 @@ namespace edm {
PreallocationConfiguration const& config,
ProcessContext const* processContext);

template <typename T>
void processOneEvent(unsigned int iStreamID,
typename T::MyPrincipal& principal,
EventPrincipal& principal,
EventSetup const& eventSetup,
bool cleaningUpAfterException = false);

Expand Down Expand Up @@ -287,15 +286,6 @@ namespace edm {
};


template <typename T>
void Schedule::processOneEvent(unsigned int iStreamID,
typename T::MyPrincipal& ep,
EventSetup const& es,
bool cleaningUpAfterException) {
assert(iStreamID<streamSchedules_.size());
streamSchedules_[iStreamID]->processOneEvent<T>(ep,es,cleaningUpAfterException);
}

template <typename T>
void Schedule::processOneStream(unsigned int iStreamID,
typename T::MyPrincipal& ep,
Expand Down
13 changes: 10 additions & 3 deletions FWCore/Framework/interface/stream/ProducingModuleAdaptorBase.h
Expand Up @@ -22,6 +22,7 @@
#include <map>
#include <string>
#include <vector>
#include <unordered_map>

// user include files
#include "DataFormats/Provenance/interface/BranchType.h"
Expand Down Expand Up @@ -90,6 +91,12 @@ namespace edm {

std::vector<ConsumesInfo> consumesInfo() const;

void resolvePutIndicies(BranchType iBranchType,
std::unordered_multimap<std::string, edm::ProductResolverIndex> const& iIndicies,
std::string const& moduleLabel);

std::vector<edm::ProductResolverIndex> const& indiciesForPutProducts(BranchType iBranchType) const;

protected:
template<typename F> void createStreamModules(F iFunc) {
for(auto& m: m_streamModules) {
Expand All @@ -99,14 +106,14 @@ namespace edm {
}

void commit(Run& iRun) {
iRun.commit_();
iRun.commit_(m_streamModules[0]->indiciesForPutProducts(InRun));
}
void commit(LuminosityBlock& iLumi) {
iLumi.commit_();
iLumi.commit_(m_streamModules[0]->indiciesForPutProducts(InLumi));
}
template<typename L, typename I>
void commit(Event& iEvent, L* iList, I* iID) {
iEvent.commit_(iList,iID);
iEvent.commit_(m_streamModules[0]->indiciesForPutProducts(InEvent), iList,iID);
}

const EDConsumerBase* consumer() {
Expand Down
15 changes: 14 additions & 1 deletion FWCore/Framework/src/Event.cc
Expand Up @@ -115,9 +115,22 @@ namespace edm {
}

void
Event::commit_(std::vector<BranchID>* previousParentage, ParentageID* previousParentageId) {
Event::commit_(std::vector<edm::ProductResolverIndex> const& iShouldPut,
std::vector<BranchID>* previousParentage, ParentageID* previousParentageId) {
auto nPut = putProducts().size()+putProductsWithoutParents().size();
commit_aux(putProducts(), true, previousParentage, previousParentageId);
commit_aux(putProductsWithoutParents(), false);
auto sz = iShouldPut.size();
if(sz !=0 and sz != nPut) {
//some were missed
auto& p = provRecorder_.principal();
for(auto index: iShouldPut){
auto resolver = p.getProductResolverByIndex(index);
if(not resolver->productResolved()) {
resolver->putProduct(std::unique_ptr<WrapperBase>());
}
}
}
}

void
Expand Down
3 changes: 1 addition & 2 deletions FWCore/Framework/src/EventProcessor.cc
Expand Up @@ -2104,8 +2104,7 @@ namespace edm {
//espController_->eventSetupForInstance(ts);
EventSetup const& es = esp_->eventSetup();
{
typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Traits;
schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
schedule_->processOneEvent(iStreamIndex,*pep, es);
if(hasSubProcesses()) {
for(auto& subProcess : *subProcesses_) {
subProcess.doEvent(*pep);
Expand Down
5 changes: 5 additions & 0 deletions FWCore/Framework/src/GlobalSchedule.h
Expand Up @@ -172,6 +172,11 @@ namespace edm {

SendTerminationSignalIfException terminationSentry(actReg_.get(), &globalContext);

//If we are in an end transition, we need to reset failed items since they might
// be set this time around
if( not T::begin_) {
ep.resetFailedFromThisProcess();
}
// This call takes care of the unscheduled processing.
workerManager_.processOneOccurrence<T>(ep, es, StreamID::invalidStreamID(), &globalContext, &globalContext, cleaningUpAfterException);

Expand Down
8 changes: 4 additions & 4 deletions FWCore/Framework/src/InputSource.cc
Expand Up @@ -479,28 +479,28 @@ namespace edm {
InputSource::doBeginRun(RunPrincipal& rp, ProcessContext const* ) {
Run run(rp, moduleDescription(), nullptr);
callWithTryCatchAndPrint<void>( [this,&run](){ beginRun(run); }, "Calling InputSource::beginRun" );
run.commit_();
run.commit_(std::vector<edm::ProductResolverIndex>());
}

void
InputSource::doEndRun(RunPrincipal& rp, bool cleaningUpAfterException, ProcessContext const* ) {
Run run(rp, moduleDescription(), nullptr);
callWithTryCatchAndPrint<void>( [this,&run](){ endRun(run); }, "Calling InputSource::endRun", cleaningUpAfterException );
run.commit_();
run.commit_(std::vector<edm::ProductResolverIndex>());
}

void
InputSource::doBeginLumi(LuminosityBlockPrincipal& lbp, ProcessContext const* ) {
LuminosityBlock lb(lbp, moduleDescription(), nullptr);
callWithTryCatchAndPrint<void>( [this,&lb](){ beginLuminosityBlock(lb); }, "Calling InputSource::beginLuminosityBlock" );
lb.commit_();
lb.commit_(std::vector<edm::ProductResolverIndex>());
}

void
InputSource::doEndLumi(LuminosityBlockPrincipal& lbp, bool cleaningUpAfterException, ProcessContext const* ) {
LuminosityBlock lb(lbp, moduleDescription(), nullptr);
callWithTryCatchAndPrint<void>( [this,&lb](){ endLuminosityBlock(lb); }, "Calling InputSource::endLuminosityBlock", cleaningUpAfterException );
lb.commit_();
lb.commit_(std::vector<edm::ProductResolverIndex>());
}

void
Expand Down