Skip to content

Commit

Permalink
Merge pull request #3846 from wddgit/subProcessBugFix
Browse files Browse the repository at this point in the history
Core -- Fix bug affecting SubProcesses
  • Loading branch information
ktf committed May 16, 2014
2 parents c78cd1d + b276ac0 commit 802a6b2
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 63 deletions.
4 changes: 4 additions & 0 deletions DataFormats/Provenance/interface/BranchIDListHelper.h
Expand Up @@ -21,6 +21,9 @@ namespace edm {

//CMS-THREADING called when a new file is opened
bool updateFromInput(BranchIDLists const& bidlists);

void updateFromParent(BranchIDLists const& bidlists);

///Called by sources to convert their read indexes into the indexes used by the job
void fixBranchListIndexes(BranchListIndexes& indexes) const;

Expand All @@ -42,6 +45,7 @@ namespace edm {
BranchIDToIndexMap branchIDToIndexMap_;
std::vector<BranchListIndex> inputIndexToJobIndex_;
BranchListIndex producedBranchListIndex_;
BranchIDLists::size_type nAlreadyCopied_;
};
}

Expand Down
20 changes: 19 additions & 1 deletion DataFormats/Provenance/src/BranchIDListHelper.cc
Expand Up @@ -11,7 +11,8 @@ namespace edm {
branchIDLists_(),
branchIDToIndexMap_(),
inputIndexToJobIndex_(),
producedBranchListIndex_(std::numeric_limits<BranchListIndex>::max())
producedBranchListIndex_(std::numeric_limits<BranchListIndex>::max()),
nAlreadyCopied_(0)
{}

bool
Expand Down Expand Up @@ -42,6 +43,23 @@ namespace edm {
return unchanged;
}

void
BranchIDListHelper::updateFromParent(BranchIDLists const& bidlists) {

inputIndexToJobIndex_.resize(bidlists.size());
for(auto it = bidlists.begin() + nAlreadyCopied_, itEnd = bidlists.end(); it != itEnd; ++it) {
BranchListIndex oldBlix = it - bidlists.begin();
BranchListIndex blix = branchIDLists_.size();
branchIDLists_.push_back(*it);
for(BranchIDList::const_iterator i = it->begin(), iEnd = it->end(); i != iEnd; ++i) {
ProductIndex pix = i - it->begin();
branchIDToIndexMap_.insert(std::make_pair(BranchID(*i), std::make_pair(blix, pix)));
}
inputIndexToJobIndex_[oldBlix]=blix;
}
nAlreadyCopied_ = bidlists.size();
}

void
BranchIDListHelper::updateFromRegistry(ProductRegistry const& preg) {
BranchIDList bidlist;
Expand Down
12 changes: 3 additions & 9 deletions FWCore/Framework/interface/FileBlock.h
Expand Up @@ -9,7 +9,6 @@ FileBlock: Properties of an input file.

#include "DataFormats/Provenance/interface/FileFormatVersion.h"
#include "DataFormats/Provenance/interface/BranchChildren.h"
#include "DataFormats/Provenance/interface/BranchIDList.h"
#include "FWCore/Utilities/interface/BranchType.h"
class TTree;
#include "boost/shared_ptr.hpp"
Expand Down Expand Up @@ -66,8 +65,7 @@ namespace edm {
fileName_(),
branchListIndexesUnchanged_(false),
modifiedIDs_(false),
branchChildren_(new BranchChildren),
branchIDLists_(new BranchIDLists) {}
branchChildren_(new BranchChildren) {}

FileBlock(FileFormatVersion const& version,
TTree const* ev, TTree const* meta,
Expand All @@ -78,8 +76,7 @@ namespace edm {
std::string const& fileName,
bool branchListIndexesUnchanged,
bool modifiedIDs,
boost::shared_ptr<BranchChildren> branchChildren,
boost::shared_ptr<BranchIDLists const> branchIDLists) :
boost::shared_ptr<BranchChildren> branchChildren) :
fileFormatVersion_(version),
tree_(const_cast<TTree*>(ev)),
metaTree_(const_cast<TTree*>(meta)),
Expand All @@ -92,8 +89,7 @@ namespace edm {
fileName_(fileName),
branchListIndexesUnchanged_(branchListIndexesUnchanged),
modifiedIDs_(modifiedIDs),
branchChildren_(branchChildren),
branchIDLists_(branchIDLists) {}
branchChildren_(branchChildren) {}

~FileBlock() {}

Expand All @@ -115,7 +111,6 @@ namespace edm {
whyNotFastClonable_ |= why;
}
BranchChildren const& branchChildren() const { return *branchChildren_; }
BranchIDLists const& branchIDLists() const { return *branchIDLists_; }
void close () {runMetaTree_ = lumiMetaTree_ = metaTree_ = runTree_ = lumiTree_ = tree_ = 0;}

private:
Expand All @@ -133,7 +128,6 @@ namespace edm {
bool branchListIndexesUnchanged_;
bool modifiedIDs_;
boost::shared_ptr<BranchChildren> branchChildren_;
boost::shared_ptr<BranchIDLists const> branchIDLists_;
};
}
#endif
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/ScheduleItems.h
Expand Up @@ -27,7 +27,7 @@ namespace edm {
struct ScheduleItems {
ScheduleItems();

ScheduleItems(ProductRegistry const& preg, BranchIDListHelper const& branchIDListHelper, SubProcess const& om);
ScheduleItems(ProductRegistry const& preg, SubProcess const& om);

ScheduleItems(ScheduleItems const&) = delete; // Disallow copying and moving
ScheduleItems& operator=(ScheduleItems const&) = delete; // Disallow copying and moving
Expand Down
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/SubProcess.h
Expand Up @@ -110,6 +110,8 @@ namespace edm {
if(subProcess_.get()) subProcess_->openOutputFiles(fb);
}

void updateBranchIDListHelper(BranchIDLists const&);

// Call respondToOpenInputFile() on all Modules
void respondToOpenInputFile(FileBlock const& fb);

Expand Down
3 changes: 3 additions & 0 deletions FWCore/Framework/src/EventProcessor.cc
Expand Up @@ -1466,6 +1466,9 @@ namespace edm {
}

void EventProcessor::respondToOpenInputFile() {
if(hasSubProcess()) {
subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
}
if (fb_.get() != nullptr) {
schedule_->respondToOpenInputFile(*fb_);
if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
Expand Down
6 changes: 3 additions & 3 deletions FWCore/Framework/src/ScheduleItems.cc
Expand Up @@ -24,15 +24,15 @@ namespace edm {
ScheduleItems::ScheduleItems() :
actReg_(new ActivityRegistry),
preg_(new SignallingProductRegistry),
branchIDListHelper_(new BranchIDListHelper()),
branchIDListHelper_(new BranchIDListHelper),
act_table_(),
processConfiguration_() {
}

ScheduleItems::ScheduleItems(ProductRegistry const& preg, BranchIDListHelper const& branchIDListHelper, SubProcess const& om) :
ScheduleItems::ScheduleItems(ProductRegistry const& preg, SubProcess const& om) :
actReg_(new ActivityRegistry),
preg_(new SignallingProductRegistry(preg)),
branchIDListHelper_(new BranchIDListHelper(branchIDListHelper)),
branchIDListHelper_(new BranchIDListHelper),
act_table_(),
processConfiguration_() {

Expand Down
18 changes: 14 additions & 4 deletions FWCore/Framework/src/SubProcess.cc
Expand Up @@ -78,7 +78,6 @@ namespace edm {
"",
outputModulePathPositions,
parentProductRegistry->anyProductProduced());

selectProducts(*parentProductRegistry);

std::string const maxEvents("maxEvents");
Expand Down Expand Up @@ -106,7 +105,7 @@ namespace edm {

boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*processParameterSet_).release());

ScheduleItems items(*parentProductRegistry, *parentBranchIDListHelper, *this);
ScheduleItems items(*parentProductRegistry, *this);

ParameterSet const& optionsPset(processParameterSet_->getUntrackedParameterSet("options", ParameterSet()));
IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
Expand All @@ -131,6 +130,9 @@ namespace edm {
// intialize the event setup provider
esp_ = esController.makeProvider(*processParameterSet_);

branchIDListHelper_ = items.branchIDListHelper_;
updateBranchIDListHelper(parentBranchIDListHelper->branchIDLists());

// intialize the Schedule
schedule_ = items.initSchedule(*processParameterSet_,subProcessParameterSet.get(),preallocConfig,&processContext_);

Expand All @@ -141,7 +143,8 @@ namespace edm {
// the reducedProcessHistoryID from a full ProcessHistoryID that registry will not be in use by
// another thread. We really need to change how this is done in the PrincipalCache.
principalCache_.setProcessHistoryRegistry(processHistoryRegistries_[historyRunOffset_]);
branchIDListHelper_ = items.branchIDListHelper_;


processConfiguration_ = items.processConfiguration_;
processContext_.setProcessConfiguration(processConfiguration_.get());
processContext_.setParentProcessContext(parentProcessContext);
Expand Down Expand Up @@ -307,6 +310,7 @@ namespace edm {
auto & processHistoryRegistry = processHistoryRegistries_[principal.streamID().value()];
processHistoryRegistry.registerProcessHistory(principal.processHistory());
BranchListIndexes bli(principal.branchListIndexes());
branchIDListHelper_->fixBranchListIndexes(bli);
ep.fillEventPrincipal(aux,
processHistoryRegistry,
std::move(esids),
Expand Down Expand Up @@ -526,11 +530,17 @@ namespace edm {
}
}

void SubProcess::updateBranchIDListHelper(BranchIDLists const& branchIDLists) {
branchIDListHelper_->updateFromParent(branchIDLists);
if(subProcess_.get()) {
subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
}
}

// Call respondToOpenInputFile() on all Modules
void
SubProcess::respondToOpenInputFile(FileBlock const& fb) {
ServiceRegistry::Operate operate(serviceToken_);
branchIDListHelper_->updateFromInput(fb.branchIDLists());
schedule_->respondToOpenInputFile(fb);
if(subProcess_.get()) subProcess_->respondToOpenInputFile(fb);
}
Expand Down
22 changes: 22 additions & 0 deletions FWCore/Integration/test/ref_merge_subprocess_cfg.py
Expand Up @@ -9,11 +9,33 @@
testProcess = cms.Process("TEST")
process.subProcess = cms.SubProcess(testProcess)

testProcess.a = cms.EDProducer("IntProducer",
ivalue = cms.int32(1))

testProcess.tester = cms.EDAnalyzer("OtherThingAnalyzer",
other = cms.untracked.InputTag("d","testUserTag"))

testProcess.out = cms.OutputModule("PoolOutputModule",
fileName = cms.untracked.string('refInSubProcess.root')
)

testProcess.p = cms.Path(testProcess.a)

testProcess.e = cms.EndPath(testProcess.tester*testProcess.out)

testProcessA = cms.Process("TESTA")
testProcess.subProcess = cms.SubProcess(testProcessA)

testProcessA.a = cms.EDProducer("IntProducer",
ivalue = cms.int32(1))

testProcessA.tester = cms.EDAnalyzer("OtherThingAnalyzer",
other = cms.untracked.InputTag("d","testUserTag"))

testProcessA.out = cms.OutputModule("PoolOutputModule",
fileName = cms.untracked.string('refInSubProcessA.root')
)

testProcessA.p = cms.Path(testProcessA.a)

testProcessA.e = cms.EndPath(testProcessA.tester*testProcessA.out)
85 changes: 42 additions & 43 deletions FWCore/Integration/test/standalone_t.cppunit.cc
Expand Up @@ -22,8 +22,7 @@ if the MessageLogger is not runnning.
class testStandalone: public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(testStandalone);
CPPUNIT_TEST(writeFile);
CPPUNIT_TEST(readFile);
CPPUNIT_TEST(writeAndReadFile);
CPPUNIT_TEST_SUITE_END();


Expand All @@ -38,8 +37,7 @@ class testStandalone: public CppUnit::TestFixture
m_handler.reset();
}

void writeFile();
void readFile();
void writeAndReadFile();

private:

Expand All @@ -51,45 +49,46 @@ CPPUNIT_TEST_SUITE_REGISTRATION(testStandalone);



void testStandalone::writeFile()
void testStandalone::writeAndReadFile()
{
std::string configuration("import FWCore.ParameterSet.Config as cms\n"
"process = cms.Process('TEST')\n"
"process.maxEvents = cms.untracked.PSet(\n"
" input = cms.untracked.int32(5)\n"
")\n"
"process.source = cms.Source('EmptySource')\n"
"process.JobReportService = cms.Service('JobReportService')\n"
"process.InitRootHandlers = cms.Service('InitRootHandlers')\n"
"process.m1 = cms.EDProducer('IntProducer',\n"
" ivalue = cms.int32(11)\n"
")\n"
"process.out = cms.OutputModule('PoolOutputModule',\n"
" fileName = cms.untracked.string('testStandalone.root')\n"
")\n"
"process.p = cms.Path(process.m1)\n"
"process.e = cms.EndPath(process.out)\n");

edm::EventProcessor proc(configuration, true);
proc.beginJob();
proc.run();
proc.endJob();
}
{
std::string configuration("import FWCore.ParameterSet.Config as cms\n"
"process = cms.Process('TEST')\n"
"process.maxEvents = cms.untracked.PSet(\n"
" input = cms.untracked.int32(5)\n"
")\n"
"process.source = cms.Source('EmptySource')\n"
"process.JobReportService = cms.Service('JobReportService')\n"
"process.InitRootHandlers = cms.Service('InitRootHandlers')\n"
"process.m1 = cms.EDProducer('IntProducer',\n"
" ivalue = cms.int32(11)\n"
")\n"
"process.out = cms.OutputModule('PoolOutputModule',\n"
" fileName = cms.untracked.string('testStandalone.root')\n"
")\n"
"process.p = cms.Path(process.m1)\n"
"process.e = cms.EndPath(process.out)\n");

edm::EventProcessor proc(configuration, true);
proc.beginJob();
proc.run();
proc.endJob();
}

void testStandalone::readFile()
{
std::string configuration("import FWCore.ParameterSet.Config as cms\n"
"process = cms.Process('TEST1')\n"
"process.source = cms.Source('PoolSource',\n"
" fileNames = cms.untracked.vstring('file:testStandalone.root')\n"
")\n"
"process.InitRootHandlers = cms.Service('InitRootHandlers')\n"
"process.JobReportService = cms.Service('JobReportService')\n"
"process.add_(cms.Service('SiteLocalConfigService'))\n"
);

edm::EventProcessor proc(configuration, true);
proc.beginJob();
proc.run();
proc.endJob();
{
std::string configuration("import FWCore.ParameterSet.Config as cms\n"
"process = cms.Process('TEST1')\n"
"process.source = cms.Source('PoolSource',\n"
" fileNames = cms.untracked.vstring('file:testStandalone.root')\n"
")\n"
"process.InitRootHandlers = cms.Service('InitRootHandlers')\n"
"process.JobReportService = cms.Service('JobReportService')\n"
"process.add_(cms.Service('SiteLocalConfigService'))\n"
);

edm::EventProcessor proc(configuration, true);
proc.beginJob();
proc.run();
proc.endJob();
}
}
3 changes: 1 addition & 2 deletions IOPool/Input/src/RootFile.cc
Expand Up @@ -643,8 +643,7 @@ namespace edm {
file_,
branchListIndexesUnchanged(),
modifiedIDs(),
branchChildren_,
branchIDLists_));
branchChildren_));
}

std::string const&
Expand Down
4 changes: 4 additions & 0 deletions IOPool/Output/test/TestPoolOutput.sh
Expand Up @@ -2,6 +2,8 @@
# Pass in name and status
function die { echo $1: status $2 ; exit $2; }

pushd ${LOCAL_TMP_DIR}

cmsRun --parameter-set ${LOCAL_TEST_DIR}/PoolOutputTest_cfg.py || die 'Failure using PoolOutputTest_cfg.py' $?

cmsRun --parameter-set ${LOCAL_TEST_DIR}/PoolParallelOutputCopy_cfg.py || die 'Failure using PoolParallelOutputCopy_cfg.py' $?
Expand Down Expand Up @@ -36,3 +38,5 @@ cmsRun ${LOCAL_TEST_DIR}/TestProvC_cfg.py || die 'Failure using TestProvC_cfg.py

cmsRun ${LOCAL_TEST_DIR}/PoolOutputTestUnscheduled_cfg.py || die 'Failure using PoolOutputTestUnscheduled_cfg.py' $?
cmsRun ${LOCAL_TEST_DIR}/PoolOutputTestUnscheduledRead_cfg.py || die 'Failure using PoolOutputTestUnscheduledRead_cfg.py' $?

popd
4 changes: 4 additions & 0 deletions IOPool/SecondaryInput/test/TestSecondaryInput.sh
Expand Up @@ -2,6 +2,8 @@
# Pass in name and status
function die { echo $1: status $2 ; exit $2; }

pushd ${LOCAL_TMP_DIR}

cmsRun --parameter-set ${LOCAL_TEST_DIR}/PreSecondaryInputTest2_cfg.py || die 'Failure using PreSecondaryInputTest2_cfg.py' $?

cmsRun --parameter-set ${LOCAL_TEST_DIR}/PreSecondaryInputTest_cfg.py || die 'Failure using PreSecondaryInputTest_cfg.py' $?
Expand All @@ -15,3 +17,5 @@ cmsRun --parameter-set ${LOCAL_TEST_DIR}/SecondaryInLumiInputTest_cfg.py || die
cmsRun --parameter-set ${LOCAL_TEST_DIR}/SecondarySeqInLumiInputTest_cfg.py || die 'Failure using SecondarySeqInLumiInputTest_cfg.py' $?

cmsRun --parameter-set ${LOCAL_TEST_DIR}/SecondarySpecInputTest_cfg.py || die 'Failure using SecondarySpecInputTest_cfg.py' $?

popd

0 comments on commit 802a6b2

Please sign in to comment.