Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support of condition updates per-lumisection #29363

Merged
merged 4 commits into from Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion CondCore/ESSources/interface/DataProxy.h
Expand Up @@ -105,7 +105,7 @@ namespace cond {
void loadTag(std::string const& tag, boost::posix_time::ptime const& snapshotTime);
void reload();

ValidityInterval setIntervalFor(Time_t target);
ValidityInterval setIntervalFor(Time_t target, Time_t defaultIovSize);
TimeType timeType() const { return m_iovProxy.tagInfo().timeType; }

private:
Expand Down
55 changes: 39 additions & 16 deletions CondCore/ESSources/plugins/CondDBESSource.cc
Expand Up @@ -165,6 +165,11 @@ CondDBESSource::CondDBESSource(const edm::ParameterSet& iConfig)
boost::posix_time::time_from_string(std::string(cond::time::MAX_TIMESTAMP));
if (itToGet->exists("snapshotTime"))
tagSnapshotTime = boost::posix_time::time_from_string(itToGet->getParameter<std::string>("snapshotTime"));
if (itToGet->exists("refreshTime")) {
cond::Time_t refreshTime = itToGet->getParameter<unsigned long long>("refreshTime");
m_refreshTimeForRecord.insert(std::make_pair(recordname, refreshTime));
}

std::string recordLabelKey = joinRecordAndLabel(recordname, labelname);
replacements.insert(
std::make_pair(recordLabelKey, cond::GTEntry_t(std::make_tuple(recordname, labelname, fqTag))));
Expand Down Expand Up @@ -341,24 +346,39 @@ void CondDBESSource::setIntervalFor(const EventSetupRecordKey& iKey,
m_stats.nLumi++;
}
//}

cond::Time_t lastTime = m_lastRun;
cond::Time_t defaultIovSize = cond::time::MAX_VAL;
cond::Time_t minDiffTime = 1;
bool refreshThisRecord = false;
if (m_policy != REFRESH_ALWAYS) {
auto iR = m_refreshTimeForRecord.find(recordname);
refreshThisRecord = (iR != m_refreshTimeForRecord.end());
if (refreshThisRecord) {
lastTime = cond::time::lumiTime(m_lastRun, m_lastLumi);
defaultIovSize = iR->second;
minDiffTime = defaultIovSize;
}
}
bool doRefresh = false;
if (m_policy == REFRESH_EACH_RUN || m_policy == RECONNECT_EACH_RUN) {
if (m_policy == REFRESH_EACH_RUN || m_policy == RECONNECT_EACH_RUN || refreshThisRecord) {
// find out the last run number for the proxy of the specified record
std::map<std::string, unsigned int>::iterator iRec = m_lastRecordRuns.find(recordname);
std::map<std::string, cond::Time_t>::iterator iRec = m_lastRecordRuns.find(recordname);
if (iRec != m_lastRecordRuns.end()) {
unsigned int lastRecordRun = iRec->second;
if (lastRecordRun != m_lastRun) {
cond::Time_t lastRecordRun = iRec->second;
cond::Time_t diffTime = lastTime - lastRecordRun;
if (lastRecordRun > lastTime)
diffTime = lastRecordRun - lastTime;
if (diffTime >= minDiffTime) {
// a refresh is required!
doRefresh = true;
iRec->second = m_lastRun;
iRec->second = lastTime;
edm::LogInfo("CondDBESSource") << "Preparing refresh for record \"" << recordname
<< "\" since there has been a transition from run " << lastRecordRun
<< " to run " << m_lastRun << "; from CondDBESSource::setIntervalFor";
<< "\" since there has been a transition from run/lumi " << lastRecordRun
<< " to run/lumi " << lastTime << "; from CondDBESSource::setIntervalFor";
}
} else {
doRefresh = true;
m_lastRecordRuns.insert(std::make_pair(recordname, m_lastRun));
m_lastRecordRuns.insert(std::make_pair(recordname, lastTime));
edm::LogInfo("CondDBESSource") << "Preparing refresh for record \"" << recordname << "\" for " << iTime.eventID()
<< ", timestamp: " << iTime.time().value()
<< "; from CondDBESSource::setIntervalFor";
Expand Down Expand Up @@ -414,21 +434,24 @@ void CondDBESSource::setIntervalFor(const EventSetupRecordKey& iKey,
}

// first reconnect if required
if (m_policy == RECONNECT_EACH_RUN) {
if (m_policy == RECONNECT_EACH_RUN || refreshThisRecord) {
edm::LogInfo("CondDBESSource")
<< "Checking if the session must be closed and re-opened for getting correct conditions"
<< "; from CondDBESSource::setIntervalFor";
std::stringstream transId;
//transId << "long" << m_lastRun;
transId << m_lastRun;
transId << lastTime;
std::string connStr = m_connectionString;
std::pair<std::string, std::string> tagParams = cond::persistency::parseTag(tcIter->second.tagName());
if (!tagParams.second.empty())
connStr = tagParams.second;
std::map<std::string, std::pair<cond::persistency::Session, std::string>>::iterator iSess =
m_sessionPool.find(connStr);
std::map<std::string, std::pair<cond::persistency::Session, std::string>>* sessionPool = &m_sessionPool;
if (refreshThisRecord) {
sessionPool = &m_sessionPoolForLumiConditions;
}
auto iSess = sessionPool->find(connStr);
bool reopen = false;
if (iSess != m_sessionPool.end()) {
if (iSess != sessionPool->end()) {
if (iSess->second.second != transId.str()) {
// the available session is open for a different run: reopen
reopen = true;
Expand All @@ -437,7 +460,7 @@ void CondDBESSource::setIntervalFor(const EventSetupRecordKey& iKey,
} else {
// no available session: probably first run analysed...
iSess =
m_sessionPool.insert(std::make_pair(connStr, std::make_pair(cond::persistency::Session(), transId.str())))
sessionPool->insert(std::make_pair(connStr, std::make_pair(cond::persistency::Session(), transId.str())))
.first;
reopen = true;
}
Expand Down Expand Up @@ -485,7 +508,7 @@ void CondDBESSource::setIntervalFor(const EventSetupRecordKey& iKey,
*/

//query the IOVSequence
cond::ValidityInterval validity = (*pmIter).second->setIntervalFor(abtime);
cond::ValidityInterval validity = (*pmIter).second->setIntervalFor(abtime, defaultIovSize);

edm::LogInfo("CondDBESSource") << "Validity coming from IOV sequence for record \"" << recordname
<< "\" and label \"" << pmIter->second->label() << "\": (" << validity.first << ", "
Expand Down
4 changes: 3 additions & 1 deletion CondCore/ESSources/plugins/CondDBESSource.h
Expand Up @@ -90,8 +90,10 @@ class CondDBESSource : public edm::eventsetup::DataProxyProvider, public edm::Ev
typedef std::map<std::string, cond::GTEntry_t> TagCollection;
// the collections of tag, record/label used in this ESSource
TagCollection m_tagCollection;
std::map<std::string, cond::Time_t> m_refreshTimeForRecord;
std::map<std::string, std::pair<cond::persistency::Session, std::string> > m_sessionPool;
std::map<std::string, unsigned int> m_lastRecordRuns;
std::map<std::string, std::pair<cond::persistency::Session, std::string> > m_sessionPoolForLumiConditions;
std::map<std::string, cond::Time_t> m_lastRecordRuns;

struct Stats {
int nData;
Expand Down
4 changes: 2 additions & 2 deletions CondCore/ESSources/src/ProxyFactory.cc
Expand Up @@ -48,11 +48,11 @@ void cond::DataProxyWrapperBase::reload() {
loadTag(tag);
}

cond::ValidityInterval cond::DataProxyWrapperBase::setIntervalFor(Time_t time) {
cond::ValidityInterval cond::DataProxyWrapperBase::setIntervalFor(Time_t time, Time_t defaultIovSize) {
if (!m_currentIov.isValidFor(time)) {
m_currentIov.clear();
m_session.transaction().start(true);
m_currentIov = m_iovProxy.getInterval(time);
m_currentIov = m_iovProxy.getInterval(time, defaultIovSize);
m_session.transaction().commit();
}
return cond::ValidityInterval(m_currentIov.since, m_currentIov.till);
Expand Down
3 changes: 3 additions & 0 deletions CondCore/ESSources/test/BuildFile.xml
Expand Up @@ -22,3 +22,6 @@
<flags TEST_RUNNER_ARGS=" /bin/bash CondCore/ESSources/test TestConcurrentIOVsCondCore.sh"/>
<use name="FWCore/Utilities"/>
</bin>
<library file="stubs/LumiTestReadAnalyzer.cc" name="LumiTestReadAnalyzer">
<flags EDM_PLUGIN="1"/>
</library>
@@ -0,0 +1,173 @@
from __future__ import print_function
import time

import FWCore.ParameterSet.Config as cms
import FWCore.ParameterSet.VarParsing as VarParsing
from Configuration.AlCa.autoCond import autoCond
import six

options = VarParsing.VarParsing()
options.register('processId',
'0',
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.string,
"Process Id")
options.register('connectionString',
#'sqlite_file:cms_conditions.db', #default value
#'frontier://FrontierProd/CMS_CONDITIONS', #default value
'frontier://FrontierPrep/CMS_CONDITIONS',
#'oracle://cms_orcon_prod/CMS_CONDITIONS',
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.string,
"CondDB Connection string")
options.register('tag',
'BeamSpot_test_updateByLumi_00',
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.string,
"tag for record BeamSpotObjectsRcd")
options.register('snapshotTime',
'', #default value
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.string,
"GlobalTag snapshot time")
options.register('refresh',
0, #default value
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.int,
"Refresh type: default no refresh")
options.register('runNumber',
115, #default value, int limit -3
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.int,
"Run number; default gives latest IOV")
options.register('eventsPerLumi',
2, #default value
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.int,
"number of events per lumi")
options.register('numberOfLumis',
20, #default value
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.int,
"number of lumisections per run")
options.register('numberOfRuns',
1, #default value
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.int,
"number of runs in the job")
options.register('messageLevel',
0, #default value
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.int,
"Message level; default to 0")
options.register('security',
'', #default value
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.string,
"FroNTier connection security: activate it with 'sig'")

options.parseArguments()

process = cms.Process("TEST")

process.MessageLogger = cms.Service("MessageLogger",
cout = cms.untracked.PSet(threshold = cms.untracked.string('INFO')),
destinations = cms.untracked.vstring('cout')
)
#process.MessageLogger = cms.Service( "MessageLogger",
# destinations = cms.untracked.vstring( 'detailedInfo' ),
# detailedInfo = cms.untracked.PSet( threshold = cms.untracked.string( 'INFO' ) ),
# )

CondDBParameters = cms.PSet( authenticationPath = cms.untracked.string( '/build/gg/' ),
authenticationSystem = cms.untracked.int32( 0 ),
messageLevel = cms.untracked.int32( options.messageLevel ),
security = cms.untracked.string( options.security ),
)

refreshAlways, refreshOpenIOVs, refreshEachRun, reconnectEachRun = False, False, False, False
if options.refresh == 0:
refreshAlways, refreshOpenIOVs, refreshEachRun, reconnectEachRun = False, False, False, False
elif options.refresh == 1:
refreshAlways = True
refreshOpenIOVs, refreshEachRun, reconnectEachRun = False, False, False
elif options.refresh == 2:
refreshAlways = False
refreshOpenIOVs = True
refreshEachRun, reconnectEachRun = False, False
elif options.refresh == 3:
refreshAlways, refreshOpenIOVs = False, False
refreshEachRun = True
reconnectEachRun = False
elif options.refresh == 4:
refreshAlways, refreshOpenIOVs, refreshEachRun = False, False, False
reconnectEachRun = True

process.GlobalTag = cms.ESSource( "PoolDBESSource",
DBParameters = CondDBParameters,
connect = cms.string( options.connectionString ),
snapshotTime = cms.string( options.snapshotTime ),
toGet = cms.VPSet(cms.PSet(
record = cms.string('BeamSpotObjectsRcd'),
tag = cms.string( options.tag ),
refreshTime = cms.uint64( 1 )
)),
RefreshAlways = cms.untracked.bool( refreshAlways ),
RefreshOpenIOVs = cms.untracked.bool( refreshOpenIOVs ),
RefreshEachRun = cms.untracked.bool( refreshEachRun ),
ReconnectEachRun = cms.untracked.bool( reconnectEachRun ),
DumpStat = cms.untracked.bool( True ),
)


#TODO: add VarParsing support for adding custom conditions
#process.GlobalTag.toGet.append( cms.PSet( record = cms.string( "BeamSpotObjectsRcd" ),
# tag = cms.string( "firstcollisions" ),
# connect = cms.string( "frontier://FrontierProd/CMS_CONDITIONS" ),
# snapshotTime = cms.string('2014-01-01 00:00:00.000'),
# )
# )

process.source = cms.Source( "FileBasedEmptySource",
interval = cms.uint32( 5 ),
maxEvents = cms.uint32( 2 ),
pathForLastLumiFile = cms.string('/build/gg/last_lumi.txt'),
firstLuminosityBlock = cms.untracked.uint32(1),
firstRun = cms.untracked.uint32( options.runNumber ),
firstTime = cms.untracked.uint64( ( long( time.time() ) - 24 * 3600 ) << 32 ), #24 hours ago in nanoseconds
numberEventsInRun = cms.untracked.uint32( 1000 ), # options.numberOfLumis lumi sections per run
numberEventsInLuminosityBlock = cms.untracked.uint32( options.eventsPerLumi )
)

#process.maxEvents = cms.untracked.PSet( input = cms.untracked.int32( options.eventsPerLumi * options.numberOfLumis * options.numberOfRuns ) ) #options.numberOfRuns runs per job

process.prod = cms.EDAnalyzer("LumiTestReadAnalyzer",
processId = cms.untracked.string( options.processId ),
pathForLastLumiFile = cms.untracked.string("/build/gg/last_lumi.txt"),
pathForAllLumiFile = cms.untracked.string("./all_time.txt" ),
pathForErrorFile = cms.untracked.string("./lumi_read_errors")
)

#process.get = cms.EDAnalyzer( "EventSetupRecordDataGetter",
# toGet = cms.VPSet(),
# verbose = cms.untracked.bool( True )
# )

#process.escontent = cms.EDAnalyzer( "PrintEventSetupContent",
# compact = cms.untracked.bool( True ),
# printProviders = cms.untracked.bool( True )
# )

#process.esretrieval = cms.EDAnalyzer( "PrintEventSetupDataRetrieval",
# printProviders = cms.untracked.bool( True )
# )

process.p = cms.Path( process.prod )
#process.esout = cms.EndPath( process.escontent + process.esretrieval )
#if process.schedule_() is not None:
# process.schedule_().append( process.esout )

for name, module in six.iteritems(process.es_sources_()):
print("ESModules> provider:%s '%s'" % ( name, module.type_() ))
for name, module in six.iteritems(process.es_producers_()):
print("ESModules> provider:%s '%s'" % ( name, module.type_() ))