Skip to content

Commit

Permalink
Merge pull request #12643 from wmtan/RemoveDataDuplication
Browse files Browse the repository at this point in the history
Remove duplicate data from PoolSource
  • Loading branch information
cmsbuild committed Dec 3, 2015
2 parents 7a4f6ec + e9b89a8 commit 3c78246
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 192 deletions.
37 changes: 35 additions & 2 deletions IOPool/Input/src/EmbeddedRootSource.cc
Expand Up @@ -15,9 +15,24 @@ namespace edm {
EmbeddedRootSource::EmbeddedRootSource(ParameterSet const& pset, VectorInputSourceDescription const& desc) :
VectorInputSource(pset, desc),
rootServiceChecker_(),
nStreams_(desc.allocations_->numberOfStreams()),
// The default value provided as the second argument to the getUntrackedParameter function call
// is not used when the ParameterSet has been validated and the parameters are not optional
// in the description. This is currently true when PoolSource is the primary input source.
// The modules that use PoolSource as a SecSource have not defined their fillDescriptions function
// yet, so the ParameterSet does not get validated yet. As soon as all the modules with a SecSource
// have defined descriptions, the defaults in the getUntrackedParameterSet function calls can
// and should be deleted from the code.
//
skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles", false)),
bypassVersionCheck_(pset.getUntrackedParameter<bool>("bypassVersionCheck", false)),
treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize", -1)),
productSelectorRules_(pset, "inputCommands", "InputSource"),
catalog_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
fileSequence_(new RootEmbeddedFileSequence(pset, *this, catalog_, desc.allocations_->numberOfStreams())) {
// Note: fileSequence_ needs to be initialized last, because it uses data members
// initialized previously in its own initialization.
fileSequence_(new RootEmbeddedFileSequence(pset, *this, catalog_)) {
}

EmbeddedRootSource::~EmbeddedRootSource() {}
Expand Down Expand Up @@ -48,7 +63,15 @@ namespace edm {

void
EmbeddedRootSource::dropUnwantedBranches_(std::vector<std::string> const& wantedBranches) {
fileSequence_->dropUnwantedBranches_(wantedBranches);
std::vector<std::string> rules;
rules.reserve(wantedBranches.size() + 1);
rules.emplace_back("drop *");
for(std::string const& branch : wantedBranches) {
rules.push_back("keep " + branch + "_*");
}
ParameterSet pset;
pset.addUntrackedParameter("inputCommands", rules);
productSelectorRules_ = ProductSelectorRules(pset, "inputCommands", "InputSource");
}

void
Expand All @@ -61,6 +84,16 @@ namespace edm {
desc.addUntracked<std::vector<std::string> >("fileNames")
->setComment("Names of files to be processed.");
desc.addUntracked<std::string>("overrideCatalog", std::string());
desc.addUntracked<bool>("skipBadFiles", false)
->setComment("True: Ignore any missing or unopenable input file.\n"
"False: Throw exception if missing or unopenable input file.");
desc.addUntracked<bool>("bypassVersionCheck", false)
->setComment("True: Bypass release version check.\n"
"False: Throw exception if reading file in a release prior to the release in which the file was written.");
desc.addUntracked<int>("treeMaxVirtualSize", -1)
->setComment("Size of ROOT TTree TBasket cache. Affects performance.");

ProductSelectorRules::fillDescription(desc, "inputCommands");
RootEmbeddedFileSequence::fillDescription(desc);

descriptions.add("source", desc);
Expand Down
19 changes: 17 additions & 2 deletions IOPool/Input/src/EmbeddedRootSource.h
Expand Up @@ -9,6 +9,7 @@ EmbeddedRootSource: This is an InputSource

#include "FWCore/Catalog/interface/InputFileCatalog.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/ProductSelectorRules.h"
#include "FWCore/Sources/interface/VectorInputSource.h"
#include "IOPool/Common/interface/RootServiceChecker.h"

Expand All @@ -35,17 +36,31 @@ namespace edm {
using VectorInputSource::processHistoryRegistryForUpdate;
using VectorInputSource::productRegistryUpdate;

// const accessors
bool skipBadFiles() const {return skipBadFiles_;}
bool bypassVersionCheck() const {return bypassVersionCheck_;}
unsigned int nStreams() const {return nStreams_;}
int treeMaxVirtualSize() const {return treeMaxVirtualSize_;}
ProductSelectorRules const& productSelectorRules() const {return productSelectorRules_;}

static void fillDescriptions(ConfigurationDescriptions & descriptions);

private:
virtual void closeFile_();
virtual void beginJob();
virtual void endJob();
virtual void beginJob() override;
virtual void endJob() override;
virtual bool readOneEvent(EventPrincipal& cache, size_t& fileNameHash, CLHEP::HepRandomEngine*, EventID const* id) override;
virtual void readOneSpecified(EventPrincipal& cache, size_t& fileNameHash, SecondaryEventIDAndFileInfo const& id);
virtual void dropUnwantedBranches_(std::vector<std::string> const& wantedBranches);

RootServiceChecker rootServiceChecker_;

unsigned int nStreams_;
bool skipBadFiles_;
bool bypassVersionCheck_;
int const treeMaxVirtualSize_;
ProductSelectorRules productSelectorRules_;

InputFileCatalog catalog_;
std::unique_ptr<RootEmbeddedFileSequence> fileSequence_;

Expand Down
67 changes: 44 additions & 23 deletions IOPool/Input/src/PoolSource.cc
Expand Up @@ -68,22 +68,31 @@ namespace edm {
pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
secondaryCatalog_(pset.getUntrackedParameter<std::vector<std::string> >("secondaryFileNames", std::vector<std::string>()),
pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
primaryFileSequence_(new RootPrimaryFileSequence(pset, *this, catalog_, desc.allocations_->numberOfStreams())),
secondaryFileSequence_(secondaryCatalog_.empty() ? nullptr :
new RootSecondaryFileSequence(pset, *this, secondaryCatalog_, desc.allocations_->numberOfStreams())),
secondaryRunPrincipal_(),
secondaryLumiPrincipal_(),
secondaryEventPrincipals_(),
branchIDsToReplace_(),
resourceSharedWithDelayedReaderPtr_(new SharedResourcesAcquirer{SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader()})
nStreams_(desc.allocations_->numberOfStreams()),
skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles")),
bypassVersionCheck_(pset.getUntrackedParameter<bool>("bypassVersionCheck")),
treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
setRun_(pset.getUntrackedParameter<unsigned int>("setRunNumber")),
productSelectorRules_(pset, "inputCommands", "InputSource"),
dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches")),
labelRawDataLikeMC_(pset.getUntrackedParameter<bool>("labelRawDataLikeMC")),
resourceSharedWithDelayedReaderPtr_(new SharedResourcesAcquirer{SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader()}),
// Note: primaryFileSequence_ and secondaryFileSequence_ need to be initialized last, because they use data members
// initialized previously in their own initialization.
primaryFileSequence_(new RootPrimaryFileSequence(pset, *this, catalog_)),
secondaryFileSequence_(secondaryCatalog_.empty() ? nullptr :
new RootSecondaryFileSequence(pset, *this, secondaryCatalog_))
{
if (secondaryCatalog_.empty() && pset.getUntrackedParameter<bool>("needSecondaryFileNames", false)) {
throw Exception(errors::Configuration, "PoolSource") << "'secondaryFileNames' must be specified\n";
}
if(secondaryFileSequence_) {
unsigned int nStreams = desc.allocations_->numberOfStreams();
secondaryEventPrincipals_.reserve(nStreams);
for(unsigned int index = 0; index < nStreams; ++index) {
secondaryEventPrincipals_.reserve(nStreams_);
for(unsigned int index = 0; index < nStreams_; ++index) {
secondaryEventPrincipals_.emplace_back(new EventPrincipal(secondaryFileSequence_->fileProductRegistry(),
secondaryFileSequence_->fileBranchIDListHelper(),
std::make_shared<ThinnedAssociationsHelper const>(),
Expand All @@ -95,38 +104,35 @@ namespace edm {
ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
std::set<BranchID> associationsFromSecondary;
typedef ProductRegistry::ProductList::const_iterator const_iterator;
typedef ProductRegistry::ProductList::iterator iterator;
//this is the registry used by the 'outside' world and only has the primary file information in it at present
ProductRegistry::ProductList& fullList = productRegistryUpdate().productListUpdator();
for(const_iterator it = secondary.begin(), itEnd = secondary.end(); it != itEnd; ++it) {
if(it->second.present()) {
idsToReplace[it->second.branchType()].insert(it->second.branchID());
if(it->second.branchType() == InEvent &&
it->second.unwrappedType() == typeid(ThinnedAssociation)) {
associationsFromSecondary.insert(it->second.branchID());
for(auto const& item : secondary) {
if(item.second.present()) {
idsToReplace[item.second.branchType()].insert(item.second.branchID());
if(item.second.branchType() == InEvent &&
item.second.unwrappedType() == typeid(ThinnedAssociation)) {
associationsFromSecondary.insert(item.second.branchID());
}
//now make sure this is marked as not dropped else the product will not be 'get'table from the Event
iterator itFound = fullList.find(it->first);
auto itFound = fullList.find(item.first);
if(itFound != fullList.end()) {
itFound->second.setDropped(false);
}
}
}
for(const_iterator it = primary.begin(), itEnd = primary.end(); it != itEnd; ++it) {
if(it->second.present()) {
idsToReplace[it->second.branchType()].erase(it->second.branchID());
associationsFromSecondary.erase(it->second.branchID());
for(auto const& item : primary) {
if(item.second.present()) {
idsToReplace[item.second.branchType()].erase(item.second.branchID());
associationsFromSecondary.erase(item.second.branchID());
}
}
if(idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
secondaryFileSequence_.reset();
} else {
for(int i = InEvent; i < NumBranchTypes; ++i) {
branchIDsToReplace_[i].reserve(idsToReplace[i].size());
for(std::set<BranchID>::const_iterator it = idsToReplace[i].begin(), itEnd = idsToReplace[i].end();
it != itEnd; ++it) {
branchIDsToReplace_[i].push_back(*it);
for(auto const& id : idsToReplace[i]) {
branchIDsToReplace_[i].push_back(id);
}
}
secondaryFileSequence_->initAssociationsFromSecondary(associationsFromSecondary);
Expand Down Expand Up @@ -303,6 +309,21 @@ namespace edm {
desc.addUntracked<bool>("needSecondaryFileNames", false)
->setComment("If True, 'secondaryFileNames' must be specified and be non-empty.");
desc.addUntracked<std::string>("overrideCatalog", std::string());
desc.addUntracked<bool>("skipBadFiles", false)
->setComment("True: Ignore any missing or unopenable input file.\n"
"False: Throw exception if missing or unopenable input file.");
desc.addUntracked<bool>("bypassVersionCheck", false)
->setComment("True: Bypass release version check.\n"
"False: Throw exception if reading file in a release prior to the release in which the file was written.");
desc.addUntracked<int>("treeMaxVirtualSize", -1)
->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
desc.addUntracked<unsigned int>("setRunNumber", 0U)
->setComment("If non-zero, change number of first run to this number. Apply same offset to all runs. Allowed only for simulation.");
desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
->setComment("If True, also drop on input any descendent of any branch dropped on input.");
desc.addUntracked<bool>("labelRawDataLikeMC", true)
->setComment("If True: replace module label for raw data to match MC. Also use 'LHC' as process.");
ProductSelectorRules::fillDescription(desc, "inputCommands");
InputSource::fillDescription(desc);
RootPrimaryFileSequence::fillDescription(desc);

Expand Down
56 changes: 38 additions & 18 deletions IOPool/Input/src/PoolSource.h
Expand Up @@ -11,6 +11,7 @@ PoolSource: This is an InputSource
#include "FWCore/Catalog/interface/InputFileCatalog.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/ProcessingController.h"
#include "FWCore/Framework/interface/ProductSelectorRules.h"
#include "FWCore/Framework/interface/InputSource.h"
#include "IOPool/Common/interface/RootServiceChecker.h"

Expand All @@ -33,40 +34,59 @@ namespace edm {
using InputSource::processHistoryRegistryForUpdate;
using InputSource::productRegistryUpdate;

// const accessors
bool skipBadFiles() const {return skipBadFiles_;}
bool dropDescendants() const {return dropDescendants_;}
bool bypassVersionCheck() const {return bypassVersionCheck_;}
bool labelRawDataLikeMC() const {return labelRawDataLikeMC_;}
unsigned int nStreams() const {return nStreams_;}
int treeMaxVirtualSize() const {return treeMaxVirtualSize_;}
RunNumber_t setRun() const {return setRun_;}
ProductSelectorRules const& productSelectorRules() const {return productSelectorRules_;}

static void fillDescriptions(ConfigurationDescriptions& descriptions);

private:
virtual void readEvent_(EventPrincipal& eventPrincipal);
virtual std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_();
virtual void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal);
virtual std::shared_ptr<RunAuxiliary> readRunAuxiliary_();
virtual void readRun_(RunPrincipal& runPrincipal);
virtual std::unique_ptr<FileBlock> readFile_();
virtual void closeFile_();
virtual void endJob();
virtual ItemType getNextItemType();
virtual void readEvent_(EventPrincipal& eventPrincipal) override;
virtual std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
virtual void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) override;
virtual std::shared_ptr<RunAuxiliary> readRunAuxiliary_() override;
virtual void readRun_(RunPrincipal& runPrincipal) override;
virtual std::unique_ptr<FileBlock> readFile_() override;
virtual void closeFile_() override;
virtual void endJob() override;
virtual ItemType getNextItemType() override;
virtual bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) override;
virtual void skip(int offset);
virtual bool goToEvent_(EventID const& eventID);
virtual void rewind_();
virtual void preForkReleaseResources();
virtual bool randomAccess_() const;
virtual ProcessingController::ForwardState forwardState_() const;
virtual ProcessingController::ReverseState reverseState_() const;
virtual void skip(int offset) override;
virtual bool goToEvent_(EventID const& eventID) override;
virtual void rewind_() override;
virtual void preForkReleaseResources() override;
virtual bool randomAccess_() const override;
virtual ProcessingController::ForwardState forwardState_() const override;
virtual ProcessingController::ReverseState reverseState_() const override;

SharedResourcesAcquirer* resourceSharedWithDelayedReader_() const override;

RootServiceChecker rootServiceChecker_;
InputFileCatalog catalog_;
InputFileCatalog secondaryCatalog_;
std::unique_ptr<RootPrimaryFileSequence> primaryFileSequence_;
std::unique_ptr<RootSecondaryFileSequence> secondaryFileSequence_;
std::shared_ptr<RunPrincipal> secondaryRunPrincipal_;
std::shared_ptr<LuminosityBlockPrincipal> secondaryLumiPrincipal_;
std::vector<std::unique_ptr<EventPrincipal>> secondaryEventPrincipals_;
std::array<std::vector<BranchID>, NumBranchTypes> branchIDsToReplace_;

unsigned int nStreams_;
bool skipBadFiles_;
bool bypassVersionCheck_;
int const treeMaxVirtualSize_;
RunNumber_t setRun_;
ProductSelectorRules productSelectorRules_;
bool dropDescendants_;
bool labelRawDataLikeMC_;

std::unique_ptr<SharedResourcesAcquirer> resourceSharedWithDelayedReaderPtr_;
std::unique_ptr<RootPrimaryFileSequence> primaryFileSequence_;
std::unique_ptr<RootSecondaryFileSequence> secondaryFileSequence_;
}; // class PoolSource
}
#endif

0 comments on commit 3c78246

Please sign in to comment.