Skip to content

Commit

Permalink
RepeatingCachedRootSource now supports Refs
Browse files Browse the repository at this point in the history
Added the necessary infrastructure to support Ref like classes
when doing a delayed read from RepeatingCachedRootSource.
Extended the unit test to read and check Refs.
  • Loading branch information
Dr15Jones committed Feb 2, 2022
1 parent 375694e commit b1a3423
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 10 deletions.
78 changes: 74 additions & 4 deletions IOPool/Input/src/RepeatingCachedRootSource.cc
Expand Up @@ -21,6 +21,7 @@
#include "DataFormats/Provenance/interface/ProcessConfiguration.h"
#include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
#include "DataFormats/Common/interface/WrapperBase.h"
#include "DataFormats/Common/interface/EDProductGetter.h"

#include "IOPool/Common/interface/RootServiceChecker.h"

Expand All @@ -44,7 +45,6 @@

namespace edm {
class RunHelperBase;
class RCDelayedReader;

class RepeatingCachedRootSource : public InputSource {
public:
Expand All @@ -56,6 +56,40 @@ namespace edm {
BranchID const& k,
EDProductGetter const* ep) const;

class RCProductGetter : public EDProductGetter {
public:
RCProductGetter(RCProductGetter const& iOther) : map_(iOther.map_), wrappers_(iOther.wrappers_) {}

RCProductGetter const& operator=(RCProductGetter const& iOther) {
map_ = iOther.map_;
wrappers_ = iOther.wrappers_;
return *this;
}

RCProductGetter(std::map<edm::ProductID, size_t> const* iMap,
std::vector<std::shared_ptr<edm::WrapperBase>> const* iWrappers)
: map_(iMap), wrappers_(iWrappers) {}

WrapperBase const* getIt(ProductID const&) const override;

std::optional<std::tuple<WrapperBase const*, unsigned int>> getThinnedProduct(ProductID const&,
unsigned int key) const override;

void getThinnedProducts(ProductID const& pid,
std::vector<WrapperBase const*>& foundContainers,
std::vector<unsigned int>& keys) const override;

OptionalThinnedKey getThinnedKeyFrom(ProductID const& parent,
unsigned int key,
ProductID const& thinned) const override;

private:
unsigned int transitionIndex_() const override;

std::map<edm::ProductID, size_t> const* map_;
std::vector<std::shared_ptr<edm::WrapperBase>> const* wrappers_;
};

class RCDelayedReader : public edm::DelayedReader {
public:
std::shared_ptr<edm::WrapperBase> getProduct_(edm::BranchID const& k, edm::EDProductGetter const* ep) final {
Expand Down Expand Up @@ -109,12 +143,14 @@ namespace edm {
std::unique_ptr<RootFile> rootFile_;
std::vector<ProcessHistoryID> orderedProcessHistoryIDs_;
std::vector<std::vector<std::shared_ptr<edm::WrapperBase>>> cachedWrappers_;
std::vector<RCProductGetter> getters_; //one per cached event
std::vector<EventAuxiliary> eventAuxs_;
EventSelectionIDVector selectionIDs_;
BranchListIndexes branchListIndexes_;
ProductProvenanceRetriever provRetriever_;
std::vector<RCDelayedReader> delayedReaders_; //one per stream
std::map<edm::BranchID, size_t> branchIDToWrapperIndex_;
std::map<edm::ProductID, size_t> productIDToWrapperIndex_;
std::vector<size_t> streamToCacheIndex_;
size_t nextEventIndex_ = 0;
ItemType presentState_ = IsFile;
Expand Down Expand Up @@ -144,6 +180,11 @@ RepeatingCachedRootSource::RepeatingCachedRootSource(ParameterSet const& pset, I
delayedReaders_(desc.allocations_->numberOfStreams()),
streamToCacheIndex_(desc.allocations_->numberOfStreams(), 0) {
{
getters_.reserve(cachedWrappers_.size());
for (auto& cw : cachedWrappers_) {
getters_.emplace_back(&productIDToWrapperIndex_, &cw);
}

int index = 0;
std::for_each(delayedReaders_.begin(), delayedReaders_.end(), [&index, this](auto& iR) {
iR.m_streamIndex = index++;
Expand Down Expand Up @@ -192,15 +233,15 @@ void RepeatingCachedRootSource::beginJob() {

//TODO: to make edm::Ref work we need to find a way to pass in a different EDProductGetter
EventPrincipal eventPrincipal(productRegistry(),
std::make_shared<BranchIDListHelper>(),
branchIDListHelper(),
std::make_shared<ThinnedAssociationsHelper>(),
processConfiguration,
nullptr);

{
RunNumber_t run = 0;
LuminosityBlockNumber_t lumi = 0;
auto itAux = eventAuxs_.begin();
auto itGetter = getters_.begin();
for (auto& cache : cachedWrappers_) {
rootFile_->nextEventEntry();
rootFile_->readCurrentEvent(eventPrincipal);
Expand All @@ -222,11 +263,16 @@ void RepeatingCachedRootSource::beginJob() {
branchListIndexes_ = eventPrincipal.branchListIndexes();
{
auto reader = eventPrincipal.reader();
auto& getter = *(itGetter++);
for (auto const& branchToIndex : branchIDToWrapperIndex_) {
cache[branchToIndex.second] = reader->getProduct(branchToIndex.first, &eventPrincipal);
cache[branchToIndex.second] = reader->getProduct(branchToIndex.first, &getter);
}
}
}
for (auto const& branchToIndex : branchIDToWrapperIndex_) {
auto pid = eventPrincipal.branchIDToProductID(branchToIndex.first);
productIDToWrapperIndex_[pid] = branchToIndex.second;
}
rootFile_->rewind();
}
}
Expand Down Expand Up @@ -369,6 +415,30 @@ void RepeatingCachedRootSource::readProcessBlock_(ProcessBlockPrincipal& process
rootFile_->readProcessBlock_(processBlockPrincipal);
}

WrapperBase const* RepeatingCachedRootSource::RCProductGetter::getIt(ProductID const& iPID) const {
auto itFound = map_->find(iPID);
if (itFound == map_->end()) {
return nullptr;
}
return (*wrappers_)[itFound->second].get();
}

std::optional<std::tuple<WrapperBase const*, unsigned int>>
RepeatingCachedRootSource::RCProductGetter::getThinnedProduct(ProductID const&, unsigned int key) const {
return {};
};

void RepeatingCachedRootSource::RCProductGetter::getThinnedProducts(ProductID const& pid,
std::vector<WrapperBase const*>& foundContainers,
std::vector<unsigned int>& keys) const {}

OptionalThinnedKey RepeatingCachedRootSource::RCProductGetter::getThinnedKeyFrom(ProductID const& parent,
unsigned int key,
ProductID const& thinned) const {
return {};
}
unsigned int RepeatingCachedRootSource::RCProductGetter::transitionIndex_() const { return 0; }

//
// const member functions
//
Expand Down
12 changes: 9 additions & 3 deletions IOPool/Input/test/PrePoolInputTest_cfg.py
Expand Up @@ -13,12 +13,15 @@
if ".py" in a:
foundpy = True

useOtherThing = False
if len(argv) > 6:
if argv[6] == "useOtherThing":
useOtherThing = True

process = cms.Process("TESTPROD")
process.load("FWCore.Framework.test.cmsExceptionsFatal_cff")

process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(int(argv[1]))
)
process.maxEvents.input = int(argv[1])

process.Thing = cms.EDProducer("ThingProducer")

Expand All @@ -34,6 +37,9 @@
)

process.p = cms.Path(process.Thing)
if useOtherThing:
process.OtherThing = cms.EDProducer("OtherThingProducer")
process.p = cms.Path(process.Thing + process.OtherThing)
process.ep = cms.EndPath(process.output)


2 changes: 1 addition & 1 deletion IOPool/Input/test/testRepeatingCachedRootSource.sh
Expand Up @@ -2,6 +2,6 @@
# Pass in name and status
function die { echo $1: status $2 ; exit $2; }

cmsRun -j PoolInputRepeatingSourceTest_jobreport.xml ${LOCALTOP}/src/IOPool/Input/test/PrePoolInputTest_cfg.py PoolInputRepeatingSource.root 11 561 7 6 3 || die 'Failure using PrePoolInputTest_cfg.py' $?
cmsRun -j PoolInputRepeatingSourceTest_jobreport.xml ${LOCALTOP}/src/IOPool/Input/test/PrePoolInputTest_cfg.py PoolInputRepeatingSource.root 11 561 7 6 3 useOtherThing || die 'Failure using PrePoolInputTest_cfg.py' $?

cmsRun ${LOCALTOP}/src/IOPool/Input/test/test_repeating_cfg.py || die 'Failed cmsRun test_repeating_cfg.py' $?
4 changes: 2 additions & 2 deletions IOPool/Input/test/test_repeating_cfg.py
Expand Up @@ -5,8 +5,8 @@

process.maxEvents.input = 10000

process.OtherThing = cms.EDProducer("OtherThingProducer")
process.checker = cms.EDAnalyzer("OtherThingAnalyzer")
#process.dump = cms.EDAnalyzer("EventContentAnalyzer")

process.p = cms.Path(process.OtherThing)
process.p = cms.Path(process.checker)
#process.o = cms.EndPath(process.dump)

0 comments on commit b1a3423

Please sign in to comment.