Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lumi skipping for Online DQM Input Sources. #4634

Merged
merged 1 commit into from Jul 15, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions DQMServices/Components/python/test/harv_filePB_cfg.py
Expand Up @@ -11,6 +11,9 @@

from DQMServices.StreamerIO.DQMProtobufReader_cff import DQMProtobufReader
process.source = DQMProtobufReader
process.source.runNumber = cms.untracked.uint32(1)
process.source.runInputDir = cms.untracked.string("./")


elements = c.createElements()

Expand Down
202 changes: 148 additions & 54 deletions DQMServices/StreamerIO/plugins/DQMFileIterator.cc
Expand Up @@ -17,6 +17,7 @@ DQMFileIterator::LumiEntry DQMFileIterator::LumiEntry::load_json(
read_json(filename, pt);

LumiEntry lumi;
lumi.filename = filename;

// We rely on n_events to be the first item on the array...
lumi.n_events = std::next(pt.get_child("data").begin(), 1)
Expand All @@ -26,23 +27,25 @@ DQMFileIterator::LumiEntry DQMFileIterator::LumiEntry::load_json(

if (type == JS_PROTOBUF) {
lumi.datafilename = std::next(pt.get_child("data").begin(), 4)
->second.get_value<std::string>();
->second.get_value<std::string>();
} else {
lumi.datafilename = std::next(pt.get_child("data").begin(), 3)
->second.get_value<std::string>();
->second.get_value<std::string>();
}

lumi.loaded = true;
return lumi;
}

// Content of Eor json file is ignored for the moment since
// the content is not stable
// Contents of Eor json file are ignored for the moment.
// This function will not be called.
DQMFileIterator::EorEntry DQMFileIterator::EorEntry::load_json(
const std::string& filename) {
boost::property_tree::ptree pt;
read_json(filename, pt);

EorEntry eor;
eor.filename = filename;

// We rely on n_events to be the first item on the array...
eor.n_events = std::next(pt.get_child("data").begin(), 1)
Expand All @@ -51,8 +54,8 @@ DQMFileIterator::EorEntry DQMFileIterator::EorEntry::load_json(
->second.get_value<std::size_t>();
eor.datafilename = std::next(pt.get_child("data").begin(), 2)
->second.get_value<std::string>();
eor.loaded = true;

eor.loaded = true;
return eor;
}

Expand All @@ -62,7 +65,9 @@ DQMFileIterator::DQMFileIterator(ParameterSet const& pset, JsonType t)
runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
delayMillis_ = pset.getUntrackedParameter<unsigned int>("delayMillis");
delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
nextLumiTimeoutMillis_ =
pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");

reset();
}
Expand All @@ -74,35 +79,48 @@ void DQMFileIterator::reset() {

eor_.loaded = false;
state_ = State::OPEN;
lastLumiSeen_ = 0;
currentLumi_ = 1;
lumiSeen_.clear();

while (!queue_.empty()) {
queue_.pop();
}
lastLumiLoad_ = std::chrono::high_resolution_clock::now();

collect(true);
update_state();
}

DQMFileIterator::State DQMFileIterator::state() { return state_; }

const DQMFileIterator::LumiEntry& DQMFileIterator::front() {
return queue_.front();
return lumiSeen_[currentLumi_];
}

void DQMFileIterator::pop() { return queue_.pop(); }
void DQMFileIterator::pop() {
lastLumiLoad_ = std::chrono::high_resolution_clock::now();

bool DQMFileIterator::hasNext() {
update_state();
return !queue_.empty();
currentLumi_ += 1;
}

std::string DQMFileIterator::make_path_jsn(int lumi) {
return str(boost::format("%s/run%06d_ls%04d%s.jsn") % runPath_ % runNumber_ %
lumi % streamLabel_);
bool DQMFileIterator::lumiReady() {
if (lumiSeen_.find(currentLumi_) != lumiSeen_.end()) {
return true;
}

return false;
}

unsigned int DQMFileIterator::runNumber() { return runNumber_; }

unsigned int DQMFileIterator::lastLumiFound() {
if (!lumiSeen_.empty()) {
return lumiSeen_.rbegin()->first;
}

return 1;
}

std::string DQMFileIterator::make_path_eor() {
return str(boost::format("%s/run%06d_ls0000_EoR.jsn") % runPath_ % runNumber_);
void DQMFileIterator::advanceToLumi(unsigned int lumi) {
currentLumi_ = lumi;
lastLumiLoad_ = std::chrono::high_resolution_clock::now();
}

std::string DQMFileIterator::make_path_data(const LumiEntry& lumi) {
Expand All @@ -113,55 +131,101 @@ std::string DQMFileIterator::make_path_data(const LumiEntry& lumi) {
return p.string();
}

void DQMFileIterator::collect() {
void DQMFileIterator::collect(bool ignoreTimers) {
// search filesystem to find available lumi section files
// or the end of run file
// or the end of run files

auto now = std::chrono::high_resolution_clock::now();
auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_collect_).count();

if (last_ms < 100) return;
now - runPathLastCollect_).count();

last_collect_ = now;
// don't refresh if it's too soon
if ((!ignoreTimers) && (last_ms < 100)) {
return;
} else {
runPathLastCollect_ = now;
}

if (!eor_.loaded) {
// end of run is not yet read
std::string fn_eor = make_path_eor();
logFileAction("Checking eor file: ", fn_eor);
// check if directory changed
std::time_t t = boost::filesystem::last_write_time(runPath_);

if (boost::filesystem::exists(fn_eor)) {
eor_.loaded = true;
logFileAction("eor file exist ", fn_eor);
// eor_ = EorEntry::load_json(fn_eor);
// logFileAction("Loaded eor file: ", fn_eor);
}
if ((!ignoreTimers) && (t <= runPathMTime_)) {
logFileAction("Directory hasn't changed.");
return;
} else {
logFileAction("Directory changed, updating.");
runPathMTime_ = t;
}

int nextLumi = lastLumiSeen_; // initiate lumi
for (;;) {
nextLumi += 1;
using boost::filesystem::directory_iterator;
using boost::filesystem::directory_entry;

std::string fn_eor;

directory_iterator dend;
for (directory_iterator di(runPath_); di != dend; ++di) {
const boost::regex fn_re("run(\\d+)_ls(\\d+)(_.*).jsn");

std::string fn = make_path_jsn(nextLumi);
logFileAction("Checking json file: ", fn);
const std::string filename = di->path().filename().string();
const std::string fn = di->path().string();

if (!boost::filesystem::exists(fn)) {
// file not yet available
break;
boost::smatch result;
if (boost::regex_match(filename, result, fn_re)) {
unsigned int run = std::stoi(result[1]);
unsigned int lumi = std::stoi(result[2]);
std::string label = result[3];

if (run != runNumber_) continue;

// check if this is EoR
// for various reasons we have to load it after all other files
if ((lumi == 0) && (label == "_EoR") && (!eor_.loaded)) {
fn_eor = fn;
continue;
}

// check if lumi is loaded
if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
continue; // already loaded
}

// check if this belongs to us
if (label != streamLabel_) {
logFileAction("Found and skipped json file (stream label mismatch): ",
fn);
continue;
}

LumiEntry lumi_jsn = LumiEntry::load_json(fn, lumi, type_);
lumiSeen_.emplace(lumi, lumi_jsn);
logFileAction("Found and loaded json file: ", fn);
}
}

if (!fn_eor.empty()) {
logFileAction("EoR file found: ", fn_eor);

LumiEntry lumi;
lumi = LumiEntry::load_json(fn, nextLumi, type_);
// @TODO load EoR files correctly
// eor_ = EorEntry::load_json(fn_eor);
// logFileAction("Loaded eor file: ", fn_eor);

lastLumiSeen_ = nextLumi;
queue_.push(lumi);
// for now , set n_lumi to the highest _found_ lumi
eor_.loaded = true;

logFileAction("Loaded json file: ", fn);
if (lumiSeen_.empty()) {
eor_.n_lumi = 0;
} else {
eor_.n_lumi = lumiSeen_.rbegin()->first;
}
}
}

void DQMFileIterator::update_state() {
collect();
using std::chrono::high_resolution_clock;
using std::chrono::duration_cast;
using std::chrono::milliseconds;

collect(false);

// now update the state
State old_state = state_;
Expand All @@ -170,10 +234,36 @@ void DQMFileIterator::update_state() {
state_ = State::EOR_CLOSING;
}

// special case for missing lumi files
// skip to the next available, but after the timeout
if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
auto iter = lumiSeen_.lower_bound(currentLumi_);
if ((iter != lumiSeen_.end()) && iter->first != currentLumi_) {

auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();

if (elapsed_ms >= nextLumiTimeoutMillis_) {
std::string msg("Timeout reached, skipping lumisection(s) ");
msg += std::to_string(currentLumi_) + " .. " +
std::to_string(iter->first - 1);
msg += ", currentLumi_ is now " + std::to_string(iter->first);

logFileAction(msg);

currentLumi_ = iter->first;
}
}
}

if (state_ == State::EOR_CLOSING) {
if (int(eor_.n_lumi) <= lastLumiSeen_) {
// last lumi number is also the number of lumis
// ie lumi start from 1
// check if we parsed all lumis
// n_lumi is both last lumi and the number of lumi
// since lumis are indexed from 1

// after all lumi have been pop()'ed
// current lumi will become larger than the last lumi
if (currentLumi_ > eor_.n_lumi) {
state_ = State::EOR;
}
}
Expand Down Expand Up @@ -215,9 +305,13 @@ void DQMFileIterator::fillDescription(ParameterSetDescription& desc) {
desc.addUntracked<std::string>("streamLabel")
->setComment("Stream label used in json discovery.");

desc.addUntracked<unsigned int>("delayMillis")
desc.addUntracked<uint32_t>("delayMillis")
->setComment("Number of milliseconds to wait between file checks.");

desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)->setComment(
"Number of milliseconds to wait before switching to the next lumi "
"section if the current is missing, -1 to disable.");

desc.addUntracked<std::string>("runInputDir")
->setComment("Directory where the DQM files will appear.");
}
Expand Down