Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug affecting SubProcesses #3846

Merged
merged 3 commits into from May 16, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions DataFormats/Provenance/interface/BranchIDListHelper.h
Expand Up @@ -21,6 +21,9 @@ namespace edm {

//CMS-THREADING called when a new file is opened
bool updateFromInput(BranchIDLists const& bidlists);

void updateFromParent(BranchIDLists const& bidlists);

///Called by sources to convert their read indexes into the indexes used by the job
void fixBranchListIndexes(BranchListIndexes& indexes) const;

Expand All @@ -42,6 +45,7 @@ namespace edm {
BranchIDToIndexMap branchIDToIndexMap_;
std::vector<BranchListIndex> inputIndexToJobIndex_;
BranchListIndex producedBranchListIndex_;
BranchIDLists::size_type nAlreadyCopied_;
};
}

Expand Down
20 changes: 19 additions & 1 deletion DataFormats/Provenance/src/BranchIDListHelper.cc
Expand Up @@ -11,7 +11,8 @@ namespace edm {
branchIDLists_(),
branchIDToIndexMap_(),
inputIndexToJobIndex_(),
producedBranchListIndex_(std::numeric_limits<BranchListIndex>::max())
producedBranchListIndex_(std::numeric_limits<BranchListIndex>::max()),
nAlreadyCopied_(0)
{}

bool
Expand Down Expand Up @@ -42,6 +43,23 @@ namespace edm {
return unchanged;
}

void
BranchIDListHelper::updateFromParent(BranchIDLists const& bidlists) {

inputIndexToJobIndex_.resize(bidlists.size());
for(auto it = bidlists.begin() + nAlreadyCopied_, itEnd = bidlists.end(); it != itEnd; ++it) {
BranchListIndex oldBlix = it - bidlists.begin();
BranchListIndex blix = branchIDLists_.size();
branchIDLists_.push_back(*it);
for(BranchIDList::const_iterator i = it->begin(), iEnd = it->end(); i != iEnd; ++i) {
ProductIndex pix = i - it->begin();
branchIDToIndexMap_.insert(std::make_pair(BranchID(*i), std::make_pair(blix, pix)));
}
inputIndexToJobIndex_[oldBlix]=blix;
}
nAlreadyCopied_ = bidlists.size();
}

void
BranchIDListHelper::updateFromRegistry(ProductRegistry const& preg) {
BranchIDList bidlist;
Expand Down
12 changes: 3 additions & 9 deletions FWCore/Framework/interface/FileBlock.h
Expand Up @@ -9,7 +9,6 @@ FileBlock: Properties of an input file.

#include "DataFormats/Provenance/interface/FileFormatVersion.h"
#include "DataFormats/Provenance/interface/BranchChildren.h"
#include "DataFormats/Provenance/interface/BranchIDList.h"
#include "FWCore/Utilities/interface/BranchType.h"
class TTree;
#include "boost/shared_ptr.hpp"
Expand Down Expand Up @@ -66,8 +65,7 @@ namespace edm {
fileName_(),
branchListIndexesUnchanged_(false),
modifiedIDs_(false),
branchChildren_(new BranchChildren),
branchIDLists_(new BranchIDLists) {}
branchChildren_(new BranchChildren) {}

FileBlock(FileFormatVersion const& version,
TTree const* ev, TTree const* meta,
Expand All @@ -78,8 +76,7 @@ namespace edm {
std::string const& fileName,
bool branchListIndexesUnchanged,
bool modifiedIDs,
boost::shared_ptr<BranchChildren> branchChildren,
boost::shared_ptr<BranchIDLists const> branchIDLists) :
boost::shared_ptr<BranchChildren> branchChildren) :
fileFormatVersion_(version),
tree_(const_cast<TTree*>(ev)),
metaTree_(const_cast<TTree*>(meta)),
Expand All @@ -92,8 +89,7 @@ namespace edm {
fileName_(fileName),
branchListIndexesUnchanged_(branchListIndexesUnchanged),
modifiedIDs_(modifiedIDs),
branchChildren_(branchChildren),
branchIDLists_(branchIDLists) {}
branchChildren_(branchChildren) {}

~FileBlock() {}

Expand All @@ -115,7 +111,6 @@ namespace edm {
whyNotFastClonable_ |= why;
}
BranchChildren const& branchChildren() const { return *branchChildren_; }
BranchIDLists const& branchIDLists() const { return *branchIDLists_; }
void close () {runMetaTree_ = lumiMetaTree_ = metaTree_ = runTree_ = lumiTree_ = tree_ = 0;}

private:
Expand All @@ -133,7 +128,6 @@ namespace edm {
bool branchListIndexesUnchanged_;
bool modifiedIDs_;
boost::shared_ptr<BranchChildren> branchChildren_;
boost::shared_ptr<BranchIDLists const> branchIDLists_;
};
}
#endif
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/ScheduleItems.h
Expand Up @@ -27,7 +27,7 @@ namespace edm {
struct ScheduleItems {
ScheduleItems();

ScheduleItems(ProductRegistry const& preg, BranchIDListHelper const& branchIDListHelper, SubProcess const& om);
ScheduleItems(ProductRegistry const& preg, SubProcess const& om);

ScheduleItems(ScheduleItems const&) = delete; // Disallow copying and moving
ScheduleItems& operator=(ScheduleItems const&) = delete; // Disallow copying and moving
Expand Down
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/SubProcess.h
Expand Up @@ -110,6 +110,8 @@ namespace edm {
if(subProcess_.get()) subProcess_->openOutputFiles(fb);
}

void updateBranchIDListHelper(BranchIDLists const&);

// Call respondToOpenInputFile() on all Modules
void respondToOpenInputFile(FileBlock const& fb);

Expand Down
3 changes: 3 additions & 0 deletions FWCore/Framework/src/EventProcessor.cc
Expand Up @@ -1466,6 +1466,9 @@ namespace edm {
}

void EventProcessor::respondToOpenInputFile() {
if(hasSubProcess()) {
subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
}
if (fb_.get() != nullptr) {
schedule_->respondToOpenInputFile(*fb_);
if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
Expand Down
6 changes: 3 additions & 3 deletions FWCore/Framework/src/ScheduleItems.cc
Expand Up @@ -24,15 +24,15 @@ namespace edm {
ScheduleItems::ScheduleItems() :
actReg_(new ActivityRegistry),
preg_(new SignallingProductRegistry),
branchIDListHelper_(new BranchIDListHelper()),
branchIDListHelper_(new BranchIDListHelper),
act_table_(),
processConfiguration_() {
}

ScheduleItems::ScheduleItems(ProductRegistry const& preg, BranchIDListHelper const& branchIDListHelper, SubProcess const& om) :
ScheduleItems::ScheduleItems(ProductRegistry const& preg, SubProcess const& om) :
actReg_(new ActivityRegistry),
preg_(new SignallingProductRegistry(preg)),
branchIDListHelper_(new BranchIDListHelper(branchIDListHelper)),
branchIDListHelper_(new BranchIDListHelper),
act_table_(),
processConfiguration_() {

Expand Down
18 changes: 14 additions & 4 deletions FWCore/Framework/src/SubProcess.cc
Expand Up @@ -78,7 +78,6 @@ namespace edm {
"",
outputModulePathPositions,
parentProductRegistry->anyProductProduced());

selectProducts(*parentProductRegistry);

std::string const maxEvents("maxEvents");
Expand Down Expand Up @@ -106,7 +105,7 @@ namespace edm {

boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*processParameterSet_).release());

ScheduleItems items(*parentProductRegistry, *parentBranchIDListHelper, *this);
ScheduleItems items(*parentProductRegistry, *this);

ParameterSet const& optionsPset(processParameterSet_->getUntrackedParameterSet("options", ParameterSet()));
IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
Expand All @@ -131,6 +130,9 @@ namespace edm {
// intialize the event setup provider
esp_ = esController.makeProvider(*processParameterSet_);

branchIDListHelper_ = items.branchIDListHelper_;
updateBranchIDListHelper(parentBranchIDListHelper->branchIDLists());

// intialize the Schedule
schedule_ = items.initSchedule(*processParameterSet_,subProcessParameterSet.get(),preallocConfig,&processContext_);

Expand All @@ -141,7 +143,8 @@ namespace edm {
// the reducedProcessHistoryID from a full ProcessHistoryID that registry will not be in use by
// another thread. We really need to change how this is done in the PrincipalCache.
principalCache_.setProcessHistoryRegistry(processHistoryRegistries_[historyRunOffset_]);
branchIDListHelper_ = items.branchIDListHelper_;


processConfiguration_ = items.processConfiguration_;
processContext_.setProcessConfiguration(processConfiguration_.get());
processContext_.setParentProcessContext(parentProcessContext);
Expand Down Expand Up @@ -307,6 +310,7 @@ namespace edm {
auto & processHistoryRegistry = processHistoryRegistries_[principal.streamID().value()];
processHistoryRegistry.registerProcessHistory(principal.processHistory());
BranchListIndexes bli(principal.branchListIndexes());
branchIDListHelper_->fixBranchListIndexes(bli);
ep.fillEventPrincipal(aux,
processHistoryRegistry,
std::move(esids),
Expand Down Expand Up @@ -526,11 +530,17 @@ namespace edm {
}
}

void SubProcess::updateBranchIDListHelper(BranchIDLists const& branchIDLists) {
branchIDListHelper_->updateFromParent(branchIDLists);
if(subProcess_.get()) {
subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
}
}

// Call respondToOpenInputFile() on all Modules
void
SubProcess::respondToOpenInputFile(FileBlock const& fb) {
ServiceRegistry::Operate operate(serviceToken_);
branchIDListHelper_->updateFromInput(fb.branchIDLists());
schedule_->respondToOpenInputFile(fb);
if(subProcess_.get()) subProcess_->respondToOpenInputFile(fb);
}
Expand Down
22 changes: 22 additions & 0 deletions FWCore/Integration/test/ref_merge_subprocess_cfg.py
Expand Up @@ -9,11 +9,33 @@
testProcess = cms.Process("TEST")
process.subProcess = cms.SubProcess(testProcess)

testProcess.a = cms.EDProducer("IntProducer",
ivalue = cms.int32(1))

testProcess.tester = cms.EDAnalyzer("OtherThingAnalyzer",
other = cms.untracked.InputTag("d","testUserTag"))

testProcess.out = cms.OutputModule("PoolOutputModule",
fileName = cms.untracked.string('refInSubProcess.root')
)

testProcess.p = cms.Path(testProcess.a)

testProcess.e = cms.EndPath(testProcess.tester*testProcess.out)

testProcessA = cms.Process("TESTA")
testProcess.subProcess = cms.SubProcess(testProcessA)

testProcessA.a = cms.EDProducer("IntProducer",
ivalue = cms.int32(1))

testProcessA.tester = cms.EDAnalyzer("OtherThingAnalyzer",
other = cms.untracked.InputTag("d","testUserTag"))

testProcessA.out = cms.OutputModule("PoolOutputModule",
fileName = cms.untracked.string('refInSubProcessA.root')
)

testProcessA.p = cms.Path(testProcessA.a)

testProcessA.e = cms.EndPath(testProcessA.tester*testProcessA.out)
85 changes: 42 additions & 43 deletions FWCore/Integration/test/standalone_t.cppunit.cc
Expand Up @@ -22,8 +22,7 @@ if the MessageLogger is not runnning.
class testStandalone: public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(testStandalone);
CPPUNIT_TEST(writeFile);
CPPUNIT_TEST(readFile);
CPPUNIT_TEST(writeAndReadFile);
CPPUNIT_TEST_SUITE_END();


Expand All @@ -38,8 +37,7 @@ class testStandalone: public CppUnit::TestFixture
m_handler.reset();
}

void writeFile();
void readFile();
void writeAndReadFile();

private:

Expand All @@ -51,45 +49,46 @@ CPPUNIT_TEST_SUITE_REGISTRATION(testStandalone);



void testStandalone::writeFile()
void testStandalone::writeAndReadFile()
{
std::string configuration("import FWCore.ParameterSet.Config as cms\n"
"process = cms.Process('TEST')\n"
"process.maxEvents = cms.untracked.PSet(\n"
" input = cms.untracked.int32(5)\n"
")\n"
"process.source = cms.Source('EmptySource')\n"
"process.JobReportService = cms.Service('JobReportService')\n"
"process.InitRootHandlers = cms.Service('InitRootHandlers')\n"
"process.m1 = cms.EDProducer('IntProducer',\n"
" ivalue = cms.int32(11)\n"
")\n"
"process.out = cms.OutputModule('PoolOutputModule',\n"
" fileName = cms.untracked.string('testStandalone.root')\n"
")\n"
"process.p = cms.Path(process.m1)\n"
"process.e = cms.EndPath(process.out)\n");

edm::EventProcessor proc(configuration, true);
proc.beginJob();
proc.run();
proc.endJob();
}
{
std::string configuration("import FWCore.ParameterSet.Config as cms\n"
"process = cms.Process('TEST')\n"
"process.maxEvents = cms.untracked.PSet(\n"
" input = cms.untracked.int32(5)\n"
")\n"
"process.source = cms.Source('EmptySource')\n"
"process.JobReportService = cms.Service('JobReportService')\n"
"process.InitRootHandlers = cms.Service('InitRootHandlers')\n"
"process.m1 = cms.EDProducer('IntProducer',\n"
" ivalue = cms.int32(11)\n"
")\n"
"process.out = cms.OutputModule('PoolOutputModule',\n"
" fileName = cms.untracked.string('testStandalone.root')\n"
")\n"
"process.p = cms.Path(process.m1)\n"
"process.e = cms.EndPath(process.out)\n");

edm::EventProcessor proc(configuration, true);
proc.beginJob();
proc.run();
proc.endJob();
}

void testStandalone::readFile()
{
std::string configuration("import FWCore.ParameterSet.Config as cms\n"
"process = cms.Process('TEST1')\n"
"process.source = cms.Source('PoolSource',\n"
" fileNames = cms.untracked.vstring('file:testStandalone.root')\n"
")\n"
"process.InitRootHandlers = cms.Service('InitRootHandlers')\n"
"process.JobReportService = cms.Service('JobReportService')\n"
"process.add_(cms.Service('SiteLocalConfigService'))\n"
);

edm::EventProcessor proc(configuration, true);
proc.beginJob();
proc.run();
proc.endJob();
{
std::string configuration("import FWCore.ParameterSet.Config as cms\n"
"process = cms.Process('TEST1')\n"
"process.source = cms.Source('PoolSource',\n"
" fileNames = cms.untracked.vstring('file:testStandalone.root')\n"
")\n"
"process.InitRootHandlers = cms.Service('InitRootHandlers')\n"
"process.JobReportService = cms.Service('JobReportService')\n"
"process.add_(cms.Service('SiteLocalConfigService'))\n"
);

edm::EventProcessor proc(configuration, true);
proc.beginJob();
proc.run();
proc.endJob();
}
}
3 changes: 1 addition & 2 deletions IOPool/Input/src/RootFile.cc
Expand Up @@ -643,8 +643,7 @@ namespace edm {
file_,
branchListIndexesUnchanged(),
modifiedIDs(),
branchChildren_,
branchIDLists_));
branchChildren_));
}

std::string const&
Expand Down
4 changes: 4 additions & 0 deletions IOPool/Output/test/TestPoolOutput.sh
Expand Up @@ -2,6 +2,8 @@
# Pass in name and status
function die { echo $1: status $2 ; exit $2; }

pushd ${LOCAL_TMP_DIR}

cmsRun --parameter-set ${LOCAL_TEST_DIR}/PoolOutputTest_cfg.py || die 'Failure using PoolOutputTest_cfg.py' $?

cmsRun --parameter-set ${LOCAL_TEST_DIR}/PoolParallelOutputCopy_cfg.py || die 'Failure using PoolParallelOutputCopy_cfg.py' $?
Expand Down Expand Up @@ -36,3 +38,5 @@ cmsRun ${LOCAL_TEST_DIR}/TestProvC_cfg.py || die 'Failure using TestProvC_cfg.py

cmsRun ${LOCAL_TEST_DIR}/PoolOutputTestUnscheduled_cfg.py || die 'Failure using PoolOutputTestUnscheduled_cfg.py' $?
cmsRun ${LOCAL_TEST_DIR}/PoolOutputTestUnscheduledRead_cfg.py || die 'Failure using PoolOutputTestUnscheduledRead_cfg.py' $?

popd
4 changes: 4 additions & 0 deletions IOPool/SecondaryInput/test/TestSecondaryInput.sh
Expand Up @@ -2,6 +2,8 @@
# Pass in name and status
function die { echo $1: status $2 ; exit $2; }

pushd ${LOCAL_TMP_DIR}

cmsRun --parameter-set ${LOCAL_TEST_DIR}/PreSecondaryInputTest2_cfg.py || die 'Failure using PreSecondaryInputTest2_cfg.py' $?

cmsRun --parameter-set ${LOCAL_TEST_DIR}/PreSecondaryInputTest_cfg.py || die 'Failure using PreSecondaryInputTest_cfg.py' $?
Expand All @@ -15,3 +17,5 @@ cmsRun --parameter-set ${LOCAL_TEST_DIR}/SecondaryInLumiInputTest_cfg.py || die
cmsRun --parameter-set ${LOCAL_TEST_DIR}/SecondarySeqInLumiInputTest_cfg.py || die 'Failure using SecondarySeqInLumiInputTest_cfg.py' $?

cmsRun --parameter-set ${LOCAL_TEST_DIR}/SecondarySpecInputTest_cfg.py || die 'Failure using SecondarySpecInputTest_cfg.py' $?

popd