Skip to content

Commit

Permalink
Merge pull request #36864 from Dr15Jones/workingRefInRepeatingCachedR…
Browse files Browse the repository at this point in the history
…ootSource

RepeatingCachedRootSource now supports Refs
  • Loading branch information
cmsbuild committed Feb 3, 2022
2 parents c357bac + 3c09a67 commit 0da411f
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 11 deletions.
80 changes: 75 additions & 5 deletions IOPool/Input/src/RepeatingCachedRootSource.cc
Expand Up @@ -21,6 +21,7 @@
#include "DataFormats/Provenance/interface/ProcessConfiguration.h"
#include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
#include "DataFormats/Common/interface/WrapperBase.h"
#include "DataFormats/Common/interface/EDProductGetter.h"

#include "IOPool/Common/interface/RootServiceChecker.h"

Expand All @@ -44,7 +45,6 @@

namespace edm {
class RunHelperBase;
class RCDelayedReader;

class RepeatingCachedRootSource : public InputSource {
public:
Expand All @@ -56,6 +56,40 @@ namespace edm {
BranchID const& k,
EDProductGetter const* ep) const;

class RCProductGetter : public EDProductGetter {
public:
RCProductGetter(RCProductGetter const& iOther) : map_(iOther.map_), wrappers_(iOther.wrappers_) {}

RCProductGetter const& operator=(RCProductGetter const& iOther) {
map_ = iOther.map_;
wrappers_ = iOther.wrappers_;
return *this;
}

RCProductGetter(std::map<edm::ProductID, size_t> const* iMap,
std::vector<std::shared_ptr<edm::WrapperBase>> const* iWrappers)
: map_(iMap), wrappers_(iWrappers) {}

WrapperBase const* getIt(ProductID const&) const override;

std::optional<std::tuple<WrapperBase const*, unsigned int>> getThinnedProduct(ProductID const&,
unsigned int key) const override;

void getThinnedProducts(ProductID const& pid,
std::vector<WrapperBase const*>& foundContainers,
std::vector<unsigned int>& keys) const override;

OptionalThinnedKey getThinnedKeyFrom(ProductID const& parent,
unsigned int key,
ProductID const& thinned) const override;

private:
unsigned int transitionIndex_() const override;

std::map<edm::ProductID, size_t> const* map_;
std::vector<std::shared_ptr<edm::WrapperBase>> const* wrappers_;
};

class RCDelayedReader : public edm::DelayedReader {
public:
std::shared_ptr<edm::WrapperBase> getProduct_(edm::BranchID const& k, edm::EDProductGetter const* ep) final {
Expand Down Expand Up @@ -109,12 +143,14 @@ namespace edm {
std::unique_ptr<RootFile> rootFile_;
std::vector<ProcessHistoryID> orderedProcessHistoryIDs_;
std::vector<std::vector<std::shared_ptr<edm::WrapperBase>>> cachedWrappers_;
std::vector<RCProductGetter> getters_; //one per cached event
std::vector<EventAuxiliary> eventAuxs_;
EventSelectionIDVector selectionIDs_;
BranchListIndexes branchListIndexes_;
ProductProvenanceRetriever provRetriever_;
std::vector<RCDelayedReader> delayedReaders_; //one per stream
std::map<edm::BranchID, size_t> branchIDToWrapperIndex_;
std::map<edm::ProductID, size_t> productIDToWrapperIndex_;
std::vector<size_t> streamToCacheIndex_;
size_t nextEventIndex_ = 0;
ItemType presentState_ = IsFile;
Expand Down Expand Up @@ -144,6 +180,11 @@ RepeatingCachedRootSource::RepeatingCachedRootSource(ParameterSet const& pset, I
delayedReaders_(desc.allocations_->numberOfStreams()),
streamToCacheIndex_(desc.allocations_->numberOfStreams(), 0) {
{
getters_.reserve(cachedWrappers_.size());
for (auto& cw : cachedWrappers_) {
getters_.emplace_back(&productIDToWrapperIndex_, &cw);
}

int index = 0;
std::for_each(delayedReaders_.begin(), delayedReaders_.end(), [&index, this](auto& iR) {
iR.m_streamIndex = index++;
Expand Down Expand Up @@ -190,17 +231,17 @@ void RepeatingCachedRootSource::beginJob() {
processConfiguration.setParameterSetID(ParameterSet::emptyParameterSetID());
processConfiguration.setProcessConfigurationID();

//TODO: to make edm::Ref work we need to find a way to pass in a different EDProductGetter
//Thinned collection associations are not supported at this time
EventPrincipal eventPrincipal(productRegistry(),
std::make_shared<BranchIDListHelper>(),
branchIDListHelper(),
std::make_shared<ThinnedAssociationsHelper>(),
processConfiguration,
nullptr);

{
RunNumber_t run = 0;
LuminosityBlockNumber_t lumi = 0;
auto itAux = eventAuxs_.begin();
auto itGetter = getters_.begin();
for (auto& cache : cachedWrappers_) {
rootFile_->nextEventEntry();
rootFile_->readCurrentEvent(eventPrincipal);
Expand All @@ -222,11 +263,16 @@ void RepeatingCachedRootSource::beginJob() {
branchListIndexes_ = eventPrincipal.branchListIndexes();
{
auto reader = eventPrincipal.reader();
auto& getter = *(itGetter++);
for (auto const& branchToIndex : branchIDToWrapperIndex_) {
cache[branchToIndex.second] = reader->getProduct(branchToIndex.first, &eventPrincipal);
cache[branchToIndex.second] = reader->getProduct(branchToIndex.first, &getter);
}
}
}
for (auto const& branchToIndex : branchIDToWrapperIndex_) {
auto pid = eventPrincipal.branchIDToProductID(branchToIndex.first);
productIDToWrapperIndex_[pid] = branchToIndex.second;
}
rootFile_->rewind();
}
}
Expand Down Expand Up @@ -369,6 +415,30 @@ void RepeatingCachedRootSource::readProcessBlock_(ProcessBlockPrincipal& process
rootFile_->readProcessBlock_(processBlockPrincipal);
}

WrapperBase const* RepeatingCachedRootSource::RCProductGetter::getIt(ProductID const& iPID) const {
auto itFound = map_->find(iPID);
if (itFound == map_->end()) {
return nullptr;
}
return (*wrappers_)[itFound->second].get();
}

std::optional<std::tuple<WrapperBase const*, unsigned int>>
RepeatingCachedRootSource::RCProductGetter::getThinnedProduct(ProductID const&, unsigned int key) const {
return {};
};

void RepeatingCachedRootSource::RCProductGetter::getThinnedProducts(ProductID const& pid,
std::vector<WrapperBase const*>& foundContainers,
std::vector<unsigned int>& keys) const {}

OptionalThinnedKey RepeatingCachedRootSource::RCProductGetter::getThinnedKeyFrom(ProductID const& parent,
unsigned int key,
ProductID const& thinned) const {
return {};
}
unsigned int RepeatingCachedRootSource::RCProductGetter::transitionIndex_() const { return 0; }

//
// const member functions
//
Expand Down
12 changes: 9 additions & 3 deletions IOPool/Input/test/PrePoolInputTest_cfg.py
Expand Up @@ -13,12 +13,15 @@
if ".py" in a:
foundpy = True

useOtherThing = False
if len(argv) > 6:
if argv[6] == "useOtherThing":
useOtherThing = True

process = cms.Process("TESTPROD")
process.load("FWCore.Framework.test.cmsExceptionsFatal_cff")

process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(int(argv[1]))
)
process.maxEvents.input = int(argv[1])

process.Thing = cms.EDProducer("ThingProducer")

Expand All @@ -34,6 +37,9 @@
)

process.p = cms.Path(process.Thing)
if useOtherThing:
process.OtherThing = cms.EDProducer("OtherThingProducer")
process.p = cms.Path(process.Thing + process.OtherThing)
process.ep = cms.EndPath(process.output)


2 changes: 1 addition & 1 deletion IOPool/Input/test/testRepeatingCachedRootSource.sh
Expand Up @@ -2,6 +2,6 @@
# Pass in name and status
function die { echo $1: status $2 ; exit $2; }

cmsRun -j PoolInputRepeatingSourceTest_jobreport.xml ${LOCALTOP}/src/IOPool/Input/test/PrePoolInputTest_cfg.py PoolInputRepeatingSource.root 11 561 7 6 3 || die 'Failure using PrePoolInputTest_cfg.py' $?
cmsRun -j PoolInputRepeatingSourceTest_jobreport.xml ${LOCALTOP}/src/IOPool/Input/test/PrePoolInputTest_cfg.py PoolInputRepeatingSource.root 11 561 7 6 3 useOtherThing || die 'Failure using PrePoolInputTest_cfg.py' $?

cmsRun ${LOCALTOP}/src/IOPool/Input/test/test_repeating_cfg.py || die 'Failed cmsRun test_repeating_cfg.py' $?
4 changes: 2 additions & 2 deletions IOPool/Input/test/test_repeating_cfg.py
Expand Up @@ -5,8 +5,8 @@

process.maxEvents.input = 10000

process.OtherThing = cms.EDProducer("OtherThingProducer")
process.checker = cms.EDAnalyzer("OtherThingAnalyzer")
#process.dump = cms.EDAnalyzer("EventContentAnalyzer")

process.p = cms.Path(process.OtherThing)
process.p = cms.Path(process.checker)
#process.o = cms.EndPath(process.dump)

0 comments on commit 0da411f

Please sign in to comment.