Skip to content

Commit

Permalink
Implement lumi skipping for Online DQM Input Sources.
Browse files Browse the repository at this point in the history
In several cases some lumisections won't appear in our storage,
in such case, we should start processing the next available lumisection.

This commit implements a flag _nextLumiTimeoutMillis_.
If it is set, each missing lumi will start a timer.
After the timer is triggered, and if and only if the next lumi is
available, the source will switch to that lumi.

For example:

1. nextLumiTimeMillis is set to 45000
2. we see files ls0001 ls0002 ls0005
3. source processes ls 1 and 2
4. 45 seconds pass
5. source skips 3..4, and starts processing ls0005

This commit also changes how EoR files work: instead of relying on EoR
content, the source uses the highest *seen* lumi as the number of lumis.
This is compatible with our code in offline and online (where EoR are,
for now, empty).
  • Loading branch information
Dmitrijus Bugelskis committed Jul 14, 2014
1 parent d1c7f42 commit a14c0f8
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 89 deletions.
3 changes: 3 additions & 0 deletions DQMServices/Components/python/test/harv_filePB_cfg.py
Expand Up @@ -11,6 +11,9 @@

from DQMServices.StreamerIO.DQMProtobufReader_cff import DQMProtobufReader
process.source = DQMProtobufReader
process.source.runNumber = cms.untracked.uint32(1)
process.source.runInputDir = cms.untracked.string("./")


elements = c.createElements()

Expand Down
202 changes: 148 additions & 54 deletions DQMServices/StreamerIO/plugins/DQMFileIterator.cc
Expand Up @@ -17,6 +17,7 @@ DQMFileIterator::LumiEntry DQMFileIterator::LumiEntry::load_json(
read_json(filename, pt);

LumiEntry lumi;
lumi.filename = filename;

// We rely on n_events to be the first item on the array...
lumi.n_events = std::next(pt.get_child("data").begin(), 1)
Expand All @@ -26,23 +27,25 @@ DQMFileIterator::LumiEntry DQMFileIterator::LumiEntry::load_json(

if (type == JS_PROTOBUF) {
lumi.datafilename = std::next(pt.get_child("data").begin(), 4)
->second.get_value<std::string>();
->second.get_value<std::string>();
} else {
lumi.datafilename = std::next(pt.get_child("data").begin(), 3)
->second.get_value<std::string>();
->second.get_value<std::string>();
}

lumi.loaded = true;
return lumi;
}

// Content of Eor json file is ignored for the moment since
// the content is not stable
// Contents of Eor json file are ignored for the moment.
// This function will not be called.
DQMFileIterator::EorEntry DQMFileIterator::EorEntry::load_json(
const std::string& filename) {
boost::property_tree::ptree pt;
read_json(filename, pt);

EorEntry eor;
eor.filename = filename;

// We rely on n_events to be the first item on the array...
eor.n_events = std::next(pt.get_child("data").begin(), 1)
Expand All @@ -51,8 +54,8 @@ DQMFileIterator::EorEntry DQMFileIterator::EorEntry::load_json(
->second.get_value<std::size_t>();
eor.datafilename = std::next(pt.get_child("data").begin(), 2)
->second.get_value<std::string>();
eor.loaded = true;

eor.loaded = true;
return eor;
}

Expand All @@ -62,7 +65,9 @@ DQMFileIterator::DQMFileIterator(ParameterSet const& pset, JsonType t)
runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
delayMillis_ = pset.getUntrackedParameter<unsigned int>("delayMillis");
delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
nextLumiTimeoutMillis_ =
pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");

reset();
}
Expand All @@ -74,35 +79,48 @@ void DQMFileIterator::reset() {

eor_.loaded = false;
state_ = State::OPEN;
lastLumiSeen_ = 0;
currentLumi_ = 1;
lumiSeen_.clear();

while (!queue_.empty()) {
queue_.pop();
}
lastLumiLoad_ = std::chrono::high_resolution_clock::now();

collect(true);
update_state();
}

DQMFileIterator::State DQMFileIterator::state() { return state_; }

const DQMFileIterator::LumiEntry& DQMFileIterator::front() {
return queue_.front();
return lumiSeen_[currentLumi_];
}

void DQMFileIterator::pop() { return queue_.pop(); }
void DQMFileIterator::pop() {
lastLumiLoad_ = std::chrono::high_resolution_clock::now();

bool DQMFileIterator::hasNext() {
update_state();
return !queue_.empty();
currentLumi_ += 1;
}

std::string DQMFileIterator::make_path_jsn(int lumi) {
return str(boost::format("%s/run%06d_ls%04d%s.jsn") % runPath_ % runNumber_ %
lumi % streamLabel_);
bool DQMFileIterator::lumiReady() {
if (lumiSeen_.find(currentLumi_) != lumiSeen_.end()) {
return true;
}

return false;
}

unsigned int DQMFileIterator::runNumber() { return runNumber_; }

unsigned int DQMFileIterator::lastLumiFound() {
if (!lumiSeen_.empty()) {
return lumiSeen_.rbegin()->first;
}

return 1;
}

std::string DQMFileIterator::make_path_eor() {
return str(boost::format("%s/run%06d_ls0000_EoR.jsn") % runPath_ % runNumber_);
void DQMFileIterator::advanceToLumi(unsigned int lumi) {
currentLumi_ = lumi;
lastLumiLoad_ = std::chrono::high_resolution_clock::now();
}

std::string DQMFileIterator::make_path_data(const LumiEntry& lumi) {
Expand All @@ -113,55 +131,101 @@ std::string DQMFileIterator::make_path_data(const LumiEntry& lumi) {
return p.string();
}

void DQMFileIterator::collect() {
void DQMFileIterator::collect(bool ignoreTimers) {
// search filesystem to find available lumi section files
// or the end of run file
// or the end of run files

auto now = std::chrono::high_resolution_clock::now();
auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_collect_).count();

if (last_ms < 100) return;
now - runPathLastCollect_).count();

last_collect_ = now;
// don't refresh if it's too soon
if ((!ignoreTimers) && (last_ms < 100)) {
return;
} else {
runPathLastCollect_ = now;
}

if (!eor_.loaded) {
// end of run is not yet read
std::string fn_eor = make_path_eor();
logFileAction("Checking eor file: ", fn_eor);
// check if directory changed
std::time_t t = boost::filesystem::last_write_time(runPath_);

if (boost::filesystem::exists(fn_eor)) {
eor_.loaded = true;
logFileAction("eor file exist ", fn_eor);
// eor_ = EorEntry::load_json(fn_eor);
// logFileAction("Loaded eor file: ", fn_eor);
}
if ((!ignoreTimers) && (t <= runPathMTime_)) {
logFileAction("Directory hasn't changed.");
return;
} else {
logFileAction("Directory changed, updating.");
runPathMTime_ = t;
}

int nextLumi = lastLumiSeen_; // initiate lumi
for (;;) {
nextLumi += 1;
using boost::filesystem::directory_iterator;
using boost::filesystem::directory_entry;

std::string fn_eor;

directory_iterator dend;
for (directory_iterator di(runPath_); di != dend; ++di) {
const boost::regex fn_re("run(\\d+)_ls(\\d+)(_.*).jsn");

std::string fn = make_path_jsn(nextLumi);
logFileAction("Checking json file: ", fn);
const std::string filename = di->path().filename().string();
const std::string fn = di->path().string();

if (!boost::filesystem::exists(fn)) {
// file not yet available
break;
boost::smatch result;
if (boost::regex_match(filename, result, fn_re)) {
unsigned int run = std::stoi(result[1]);
unsigned int lumi = std::stoi(result[2]);
std::string label = result[3];

if (run != runNumber_) continue;

// check if this is EoR
// for various reasons we have to load it after all other files
if ((lumi == 0) && (label == "_EoR") && (!eor_.loaded)) {
fn_eor = fn;
continue;
}

// check if lumi is loaded
if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
continue; // already loaded
}

// check if this belongs to us
if (label != streamLabel_) {
logFileAction("Found and skipped json file (stream label mismatch): ",
fn);
continue;
}

LumiEntry lumi_jsn = LumiEntry::load_json(fn, lumi, type_);
lumiSeen_.emplace(lumi, lumi_jsn);
logFileAction("Found and loaded json file: ", fn);
}
}

if (!fn_eor.empty()) {
logFileAction("EoR file found: ", fn_eor);

LumiEntry lumi;
lumi = LumiEntry::load_json(fn, nextLumi, type_);
// @TODO load EoR files correctly
// eor_ = EorEntry::load_json(fn_eor);
// logFileAction("Loaded eor file: ", fn_eor);

lastLumiSeen_ = nextLumi;
queue_.push(lumi);
// for now , set n_lumi to the highest _found_ lumi
eor_.loaded = true;

logFileAction("Loaded json file: ", fn);
if (lumiSeen_.empty()) {
eor_.n_lumi = 0;
} else {
eor_.n_lumi = lumiSeen_.rbegin()->first;
}
}
}

void DQMFileIterator::update_state() {
collect();
using std::chrono::high_resolution_clock;
using std::chrono::duration_cast;
using std::chrono::milliseconds;

collect(false);

// now update the state
State old_state = state_;
Expand All @@ -170,10 +234,36 @@ void DQMFileIterator::update_state() {
state_ = State::EOR_CLOSING;
}

// special case for missing lumi files
// skip to the next available, but after the timeout
if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
auto iter = lumiSeen_.lower_bound(currentLumi_);
if ((iter != lumiSeen_.end()) && iter->first != currentLumi_) {

auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();

if (elapsed_ms >= nextLumiTimeoutMillis_) {
std::string msg("Timeout reached, skipping lumisection(s) ");
msg += std::to_string(currentLumi_) + " .. " +
std::to_string(iter->first - 1);
msg += ", currentLumi_ is now " + std::to_string(iter->first);

logFileAction(msg);

currentLumi_ = iter->first;
}
}
}

if (state_ == State::EOR_CLOSING) {
if (int(eor_.n_lumi) <= lastLumiSeen_) {
// last lumi number is also the number of lumis
// ie lumi start from 1
// check if we parsed all lumis
// n_lumi is both last lumi and the number of lumi
// since lumis are indexed from 1

// after all lumi have been pop()'ed
// current lumi will become larger than the last lumi
if (currentLumi_ > eor_.n_lumi) {
state_ = State::EOR;
}
}
Expand Down Expand Up @@ -215,9 +305,13 @@ void DQMFileIterator::fillDescription(ParameterSetDescription& desc) {
desc.addUntracked<std::string>("streamLabel")
->setComment("Stream label used in json discovery.");

desc.addUntracked<unsigned int>("delayMillis")
desc.addUntracked<uint32_t>("delayMillis")
->setComment("Number of milliseconds to wait between file checks.");

desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)->setComment(
"Number of milliseconds to wait before switching to the next lumi "
"section if the current is missing, -1 to disable.");

desc.addUntracked<std::string>("runInputDir")
->setComment("Directory where the DQM files will appear.");
}
Expand Down

0 comments on commit a14c0f8

Please sign in to comment.