Skip to content

Commit

Permalink
Merge pull request #25439 from makortel/switchProducer
Browse files Browse the repository at this point in the history
Add SwitchProducer mechanism to allow runtime decision which algorithm implementation to run
  • Loading branch information
cmsbuild committed Jan 10, 2019
2 parents 9194783 + 3cfce3d commit 308403e
Show file tree
Hide file tree
Showing 25 changed files with 1,547 additions and 15 deletions.
6 changes: 3 additions & 3 deletions DataFormats/Common/interface/ProductData.h
Expand Up @@ -32,8 +32,7 @@ namespace edm {
Provenance const& provenance() const { return prov_;}

WrapperBase const* wrapper() const { return wrapper_.get();}
WrapperBase* wrapper() { return wrapper_.get(); }
WrapperBase* unsafe_wrapper() const { return wrapper_.get(); }
WrapperBase* unsafe_wrapper() const { return const_cast<WrapperBase*>(wrapper_.get()); }
std::shared_ptr<WrapperBase const> sharedConstWrapper() const {
return wrapper_;
}
Expand All @@ -47,6 +46,7 @@ namespace edm {

//Not const thread-safe update
void unsafe_setWrapper(std::unique_ptr<WrapperBase> iValue) const;
void unsafe_setWrapper(std::shared_ptr<WrapperBase const> iValue) const; // for SwitchProducer

void resetBranchDescription(std::shared_ptr<BranchDescription const> bd);

Expand Down Expand Up @@ -78,7 +78,7 @@ namespace edm {

private:
// "non-const data" (updated every event)
mutable std::shared_ptr<WrapperBase> wrapper_;
mutable std::shared_ptr<WrapperBase const> wrapper_;
Provenance prov_;
};

Expand Down
4 changes: 4 additions & 0 deletions DataFormats/Common/src/ProductData.cc
Expand Up @@ -38,4 +38,8 @@ namespace edm {
void ProductData::unsafe_setWrapper(std::unique_ptr<WrapperBase> iValue) const {
wrapper_ = std::move(iValue);
}

void ProductData::unsafe_setWrapper(std::shared_ptr<WrapperBase const> iValue) const {
wrapper_ = std::move(iValue);
}
}
16 changes: 16 additions & 0 deletions DataFormats/Provenance/interface/BranchDescription.h
Expand Up @@ -101,6 +101,14 @@ namespace edm {
int basketSize() const {return transient_.basketSize_;}
void setBasketSize(int size) {transient_.basketSize_ = size;}

bool isSwitchAlias() const {return not transient_.switchAliasModuleLabel_.empty();}
std::string const& switchAliasModuleLabel() const { return transient_.switchAliasModuleLabel_; }
void setSwitchAliasModuleLabel(std::string label) {transient_.switchAliasModuleLabel_ = std::move(label);}
BranchID const& switchAliasForBranchID() const {return transient_.switchAliasForBranchID_;}
void setSwitchAliasForBranch(BranchDescription const& aliasForBranch);

bool isAnyAlias() const {return isAlias() or isSwitchAlias();}

ParameterSetID const& parameterSetID() const {return transient_.parameterSetID_;}
std::string const& moduleName() const {return transient_.moduleName_;}

Expand Down Expand Up @@ -140,6 +148,14 @@ namespace edm {
// The wrapped class name, which is currently derivable fron the other attributes.
std::string wrappedName_;

// For SwitchProducer alias, the label of the aliased-for label; otherwise empty
std::string switchAliasModuleLabel_;

// Need a separate (transient) BranchID for switch, because
// otherwise originalBranchID() gives wrong answer when reading
// from a file (leading to wrong ProductProvenance to be retrieved)
BranchID switchAliasForBranchID_;

// A TypeWithDict object for the wrapped object
TypeWithDict wrappedType_;

Expand Down
21 changes: 21 additions & 0 deletions DataFormats/Provenance/src/BranchDescription.cc
Expand Up @@ -237,6 +237,27 @@ namespace edm {
if(basketSize() == invalidBasketSize) setBasketSize(other.basketSize());
}

void
BranchDescription::setSwitchAliasForBranch(BranchDescription const& aliasForBranch) {
if(branchType_ != aliasForBranch.branchType()) {
throw Exception(errors::LogicError) << "BranchDescription::setSwitchAliasForBranch: branchType ("
<< branchType_ << ") differs from aliasForBranch ("
<< aliasForBranch.branchType() << ").\nPlease report this error to the FWCore developers";
}
if(produced() != aliasForBranch.produced()) {
throw Exception(errors::LogicError) << "BranchDescription::setSwitchAliasForBranch: produced differs from aliasForBranch.\nPlease report this error to the FWCore developers";
}
if(unwrappedTypeID().typeInfo() != aliasForBranch.unwrappedType().typeInfo()) {
throw Exception(errors::LogicError) << "BranchDescription::setSwitchAliasForBranch: unwrapped type info ("
<< unwrappedTypeID().name() << ") differs from aliasForBranch ("
<< aliasForBranch.unwrappedType().typeInfo().name() << ").\nPlease report this error to the FWCore developers";
}

branchAliases_ = aliasForBranch.branchAliases();
transient_.switchAliasForBranchID_ = aliasForBranch.branchID();
transient_.availableOnlyAtEndTransition_ = aliasForBranch.availableOnlyAtEndTransition();
}

void
BranchDescription::write(std::ostream& os) const {
os << "Branch Type = " << branchType() << std::endl;
Expand Down
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/Principal.h
Expand Up @@ -220,6 +220,8 @@ namespace edm {
void addInputProduct(std::shared_ptr<BranchDescription const> bd);
void addUnscheduledProduct(std::shared_ptr<BranchDescription const> bd);
void addAliasedProduct(std::shared_ptr<BranchDescription const> bd);
void addSwitchProducerProduct(std::shared_ptr<BranchDescription const> bd);
void addSwitchAliasProduct(std::shared_ptr<BranchDescription const> bd);
void addParentProcessProduct(std::shared_ptr<BranchDescription const> bd);


Expand Down
11 changes: 10 additions & 1 deletion FWCore/Framework/interface/ProductRegistryHelper.h
Expand Up @@ -34,15 +34,19 @@ namespace edm {
};

struct TypeLabelItem {
enum class AliasType { kBranchAlias, kSwitchAlias };

TypeLabelItem (Transition const& transition, TypeID const& tid, std::string pin) :
transition_(transition),
typeID_(tid),
productInstanceName_(std::move(pin)),
branchAlias_() {}
branchAlias_(),
aliasType_(AliasType::kBranchAlias) {}
Transition transition_;
TypeID typeID_;
std::string productInstanceName_;
std::string branchAlias_;
AliasType aliasType_;
};

struct BranchAliasSetter {
Expand All @@ -53,6 +57,11 @@ namespace edm {
value_.branchAlias_ = std::move(alias);
return *this;
}
BranchAliasSetter& setSwitchAlias(std::string moduleLabel) {
value_.branchAlias_ = std::move(moduleLabel);
value_.aliasType_ = TypeLabelItem::AliasType::kSwitchAlias;
return *this;
}
TypeLabelItem& value_;
EDPutToken token_;

Expand Down
44 changes: 38 additions & 6 deletions FWCore/Framework/src/Principal.cc
Expand Up @@ -143,7 +143,7 @@ namespace edm {
BranchDescription const& bd = prod.second;
if(bd.branchType() == branchType_) {
if(isForPrimaryProcess or bd.processName() == pc.processName()) {
if(bd.isAlias()) {
if(bd.isAnyAlias()) {
hasAliases = true;
} else {
auto cbd = std::make_shared<BranchDescription const>(bd);
Expand Down Expand Up @@ -171,9 +171,25 @@ namespace edm {
if(hasAliases) {
for(auto const& prod : prodsList) {
BranchDescription const& bd = prod.second;
if(bd.isAlias() && bd.branchType() == branchType_) {
if(bd.isAnyAlias() && bd.branchType() == branchType_) {
auto cbd = std::make_shared<BranchDescription const>(bd);
addAliasedProduct(cbd);
if(bd.isSwitchAlias()) {
assert(branchType_ == InEvent);
// Need different implementation for SwitchProducers not
// in any Path (onDemand) and for those in a Path in order
// to prevent the switch-aliased-for EDProducers from
// being run when the SwitchProducer is in a Path after a
// failing EDFilter.
if(bd.onDemand()) {
addSwitchAliasProduct(cbd);
}
else {
addSwitchProducerProduct(cbd);
}
}
else {
addAliasedProduct(cbd);
}
}
}
}
Expand Down Expand Up @@ -330,6 +346,22 @@ namespace edm {
addProductOrThrow(std::make_unique<AliasProductResolver>(std::move(bd), dynamic_cast<ProducedProductResolver&>(*productResolvers_[index])));
}

void
Principal::addSwitchProducerProduct(std::shared_ptr<BranchDescription const> bd) {
ProductResolverIndex index = preg_->indexFrom(bd->switchAliasForBranchID());
assert(index != ProductResolverIndexInvalid);

addProductOrThrow(std::make_unique<SwitchProducerProductResolver>(std::move(bd), dynamic_cast<ProducedProductResolver&>(*productResolvers_[index])));
}

void
Principal::addSwitchAliasProduct(std::shared_ptr<BranchDescription const> bd) {
ProductResolverIndex index = preg_->indexFrom(bd->switchAliasForBranchID());
assert(index != ProductResolverIndexInvalid);

addProductOrThrow(std::make_unique<SwitchAliasProductResolver>(std::move(bd), dynamic_cast<ProducedProductResolver&>(*productResolvers_[index])));
}

void
Principal::addParentProcessProduct(std::shared_ptr<BranchDescription const> bd) {
addProductOrThrow(std::make_unique<ParentProcessProductResolver>(std::move(bd)));
Expand Down Expand Up @@ -681,7 +713,7 @@ namespace edm {
if (process == bd.processName()) {

// Ignore aliases to avoid matching the same product multiple times.
if(bd.isAlias()) {
if(bd.isAnyAlias()) {
continue;
}

Expand Down Expand Up @@ -816,7 +848,7 @@ namespace edm {
Principal::getAllProvenance(std::vector<Provenance const*>& provenances) const {
provenances.clear();
for(auto const& productResolver : *this) {
if(productResolver->singleProduct() && productResolver->provenanceAvailable() && !productResolver->branchDescription().isAlias()) {
if(productResolver->singleProduct() && productResolver->provenanceAvailable() && !productResolver->branchDescription().isAnyAlias()) {
// We do not attempt to get the event/lumi/run status from the provenance,
// because the per event provenance may have been dropped.
if(productResolver->provenance()->branchDescription().present()) {
Expand All @@ -833,7 +865,7 @@ namespace edm {
Principal::getAllStableProvenance(std::vector<StableProvenance const*>& provenances) const {
provenances.clear();
for(auto const& productResolver : *this) {
if(productResolver->singleProduct() && !productResolver->branchDescription().isAlias()) {
if(productResolver->singleProduct() && !productResolver->branchDescription().isAnyAlias()) {
if(productResolver->stableProvenance()->branchDescription().present()) {
provenances.push_back(productResolver->stableProvenance());
}
Expand Down
7 changes: 7 additions & 0 deletions FWCore/Framework/src/ProductRegistryHelper.cc
Expand Up @@ -89,6 +89,13 @@ namespace edm {
type,
true,
isEndTransition(p->transition_));
if(p->aliasType_ == TypeLabelItem::AliasType::kSwitchAlias) {
if(p->branchAlias_.empty()) {
throw edm::Exception(edm::errors::LogicError) << "Branch alias type has been set to SwitchAlias, but the alias content is empty.\n"
<< "Please report this error to the FWCore developers";
}
pdesc.setSwitchAliasModuleLabel(p->branchAlias_);
}
setIsMergeable(pdesc);

if (pdesc.transient()) {
Expand Down
142 changes: 142 additions & 0 deletions FWCore/Framework/src/ProductResolvers.cc
Expand Up @@ -11,6 +11,7 @@
#include "FWCore/Framework/interface/DelayedReader.h"
#include "DataFormats/Provenance/interface/ProductProvenanceRetriever.h"
#include "DataFormats/Provenance/interface/BranchKey.h"
#include "DataFormats/Provenance/interface/ParentageRegistry.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/Concurrency/interface/SerialTaskQueue.h"
#include "FWCore/Concurrency/interface/FunctorTask.h"
Expand Down Expand Up @@ -655,6 +656,147 @@ namespace edm {
}


SwitchBaseProductResolver::SwitchBaseProductResolver(std::shared_ptr<BranchDescription const> bd, ProducedProductResolver& realProduct):
realProduct_(realProduct),
productData_(std::move(bd)),
prefetchRequested_(false),
status_(defaultStatus_)
{
// Parentage of this branch is always the same by construction, so we can compute the ID just "once" here.
Parentage p;
p.setParents(std::vector<BranchID>{realProduct.branchDescription().branchID()});
parentageID_ = p.id();
ParentageRegistry::instance()->insertMapped(p);
}

void SwitchBaseProductResolver::connectTo(ProductResolverBase const& iOther, Principal const *iParentPrincipal) {
throw Exception(errors::LogicError)
<< "SwitchBaseProductResolver::connectTo() not implemented and should never be called.\n"
<< "Contact a Framework developer\n";
}

void SwitchBaseProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
}

ProductResolverBase::Resolution
SwitchBaseProductResolver::resolveProductImpl(Resolution res) const {
if(res.data() == nullptr)
return res;
// Use the Wrapper of the pointed-to resolver, but the provenance of this resolver
productData_.unsafe_setWrapper(res.data()->sharedConstWrapper());
return Resolution(&productData_);
}

bool SwitchBaseProductResolver::productResolved_() const {
// SwitchProducer will never put anything in the event, and
// "false" will make Event::commit_() to call putProduct() with
// null unique_ptr<WrapperBase> to signal that the produce() was
// run.
return false;
}

void SwitchBaseProductResolver::putProduct_(std::unique_ptr<WrapperBase> edp) const {
if(status_ != defaultStatus_) {
throw Exception(errors::InsertFailure) << "Attempt to insert more than one product for a branch " << branchDescription().branchName() << "This makes no sense for SwitchBaseProductResolver.\nContact a Framework developer";
}
// Let's use ResolveFailed to signal that produce() was called, as
// there is no real product in this resolver
status_ = ProductStatus::ResolveFailed;
bool expected = false;
if(prefetchRequested_.compare_exchange_strong(expected, true)) {
waitingTasks_.doneWaiting(std::exception_ptr());
}
}

void SwitchBaseProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, MergeableRunProductMetadata const*) const {
throw Exception(errors::LogicError)
<< "SwitchBaseProductResolver::putOrMergeProduct_(std::unique_ptr<WrapperBase> edp, MergeableRunProductMetadata const*) not implemented and should never be called.\n"
<< "Contact a Framework developer\n";
}

void SwitchBaseProductResolver::setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) {
// insertIntoSet is const, so let's exploit that to fake the getting of the "input" product
provRetriever->insertIntoSet(ProductProvenance(branchDescription().branchID(), parentageID_));
productData_.setProvenance(provRetriever,ph,pid);
}

void SwitchBaseProductResolver::resetProductData_(bool deleteEarly) {
productData_.resetProductData();
realProduct_.resetProductData_(deleteEarly);
if(not deleteEarly) {
status_ = defaultStatus_;
}
}

ProductResolverBase::Resolution
SwitchProducerProductResolver::resolveProduct_(Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const {
if(status() == ProductStatus::ResolveFailed) {
return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
}
return Resolution(nullptr);
}

void SwitchProducerProductResolver::prefetchAsync_(WaitingTask* waitTask,
Principal const& principal,
bool skipCurrentProcess,
ServiceToken const& token,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const {
if(skipCurrentProcess) { return; }
if(branchDescription().availableOnlyAtEndTransition() and mcc and not mcc->parent().isAtEndTransition()) {
return;
}
waitingTasks().add(waitTask);

bool expected = false;
if(prefetchRequested().compare_exchange_strong(expected, true)) {
//using a waiting task to do a callback guarantees that
// the waitingTasks() list will be released from waiting even
// if the module does not put this data product or the
// module has an exception while running
auto waiting = make_waiting_task(tbb::task::allocate_root(),
[this](std::exception_ptr const *iException) {
if(nullptr != iException) {
waitingTasks().doneWaiting(*iException);
}
else {
waitingTasks().doneWaiting(std::exception_ptr());
}
});
worker()->callWhenDoneAsync(waiting);
}
}

bool SwitchProducerProductResolver::productUnavailable_() const {
// if produce() was run (ResolveFailed), ask from the real resolver
if(status() == ProductStatus::ResolveFailed) {
return realProduct().productUnavailable();
}
return true;
}

ProductResolverBase::Resolution
SwitchAliasProductResolver::resolveProduct_(Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const {
return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
}

void SwitchAliasProductResolver::prefetchAsync_(WaitingTask* waitTask,
Principal const& principal,
bool skipCurrentProcess,
ServiceToken const& token,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const {
if(skipCurrentProcess) { return; }
realProduct().prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
}


void ParentProcessProductResolver::setProvenance_(ProductProvenanceRetriever const* provRetriever, ProcessHistory const& ph, ProductID const& pid) {
provRetriever_ = provRetriever;
Expand Down

0 comments on commit 308403e

Please sign in to comment.