Skip to content

Commit

Permalink
Merge pull request #38030 from wddgit/modsToRunToCompletion
Browse files Browse the repository at this point in the history
Prepare runToCompletion() for concurrent runs
  • Loading branch information
cmsbuild committed May 21, 2022
2 parents 7b8c075 + 86a9954 commit 74c3c51
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 68 deletions.
6 changes: 2 additions & 4 deletions FWCore/Framework/interface/EventProcessor.h
Expand Up @@ -253,7 +253,7 @@ namespace edm {
bool shouldWeStop() const;

void setExceptionMessageFiles(std::string& message);
void setExceptionMessageRuns(std::string& message);
void setExceptionMessageRuns();
void setExceptionMessageLumis();

bool setDeferredException(std::exception_ptr);
Expand Down Expand Up @@ -350,16 +350,14 @@ namespace edm {
bool shouldWeStop_;
bool fileModeNoMerge_;
std::string exceptionMessageFiles_;
std::string exceptionMessageRuns_;
std::atomic<bool> exceptionMessageRuns_;
std::atomic<bool> exceptionMessageLumis_;
bool forceLooperToEnd_;
bool looperBeginJobRun_;
bool forceESCacheClearOnNewRun_;

PreallocationConfiguration preallocations_;

bool asyncStopRequestedWhileProcessingEvents_;
StatusCode asyncStopStatusCodeFromProcessingEvents_;
bool firstEventInBlock_ = true;

typedef std::set<std::pair<std::string, std::string>> ExcludedData;
Expand Down
112 changes: 53 additions & 59 deletions FWCore/Framework/src/EventProcessor.cc
Expand Up @@ -257,7 +257,7 @@ namespace edm {
shouldWeStop_(false),
fileModeNoMerge_(false),
exceptionMessageFiles_(),
exceptionMessageRuns_(),
exceptionMessageRuns_(false),
exceptionMessageLumis_(false),
forceLooperToEnd_(false),
looperBeginJobRun_(false),
Expand Down Expand Up @@ -293,12 +293,11 @@ namespace edm {
shouldWeStop_(false),
fileModeNoMerge_(false),
exceptionMessageFiles_(),
exceptionMessageRuns_(),
exceptionMessageRuns_(false),
exceptionMessageLumis_(false),
forceLooperToEnd_(false),
looperBeginJobRun_(false),
forceESCacheClearOnNewRun_(false),
asyncStopRequestedWhileProcessingEvents_(false),
eventSetupDataToExcludeFromPrefetching_() {
auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
processDesc->addServices(defaultServices, forcedServices);
Expand Down Expand Up @@ -330,12 +329,11 @@ namespace edm {
shouldWeStop_(false),
fileModeNoMerge_(false),
exceptionMessageFiles_(),
exceptionMessageRuns_(),
exceptionMessageRuns_(false),
exceptionMessageLumis_(false),
forceLooperToEnd_(false),
looperBeginJobRun_(false),
forceESCacheClearOnNewRun_(false),
asyncStopRequestedWhileProcessingEvents_(false),
eventSetupDataToExcludeFromPrefetching_() {
init(processDesc, token, legacy);
}
Expand Down Expand Up @@ -847,72 +845,68 @@ namespace edm {
edm::LuminosityBlockNumber_t EventProcessor::nextLuminosityBlockID() { return input_->luminosityBlock(); }

EventProcessor::StatusCode EventProcessor::runToCompletion() {
StatusCode returnCode = epSuccess;
asyncStopStatusCodeFromProcessingEvents_ = epSuccess;
{
beginJob(); //make sure this was called
beginJob(); //make sure this was called

// make the services available
ServiceRegistry::Operate operate(serviceToken_);
// make the services available
ServiceRegistry::Operate operate(serviceToken_);

asyncStopRequestedWhileProcessingEvents_ = false;
try {
FilesProcessor fp(fileModeNoMerge_);
try {
FilesProcessor fp(fileModeNoMerge_);

convertException::wrap([&]() {
bool firstTime = true;
do {
if (not firstTime) {
prepareForNextLoop();
rewindInput();
} else {
firstTime = false;
}
startingNewLoop();
convertException::wrap([&]() {
bool firstTime = true;
do {
if (not firstTime) {
prepareForNextLoop();
rewindInput();
} else {
firstTime = false;
}
startingNewLoop();

auto trans = fp.processFiles(*this);
auto trans = fp.processFiles(*this);

fp.normalEnd();
fp.normalEnd();

if (deferredExceptionPtrIsSet_.load()) {
std::rethrow_exception(deferredExceptionPtr_);
}
if (trans != InputSource::IsStop) {
//problem with the source
doErrorStuff();
if (deferredExceptionPtrIsSet_.load()) {
std::rethrow_exception(deferredExceptionPtr_);
}
if (trans != InputSource::IsStop) {
//problem with the source
doErrorStuff();

throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
}
} while (not endOfLoop());
}); // convertException::wrap

} // Try block
catch (cms::Exception& e) {
if (exceptionMessageLumis_) {
std::string message(
"Another exception was caught while trying to clean up lumis after the primary fatal exception.");
e.addAdditionalInfo(message);
if (e.alreadyPrinted()) {
LogAbsolute("Additional Exceptions") << message;
throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
}
} while (not endOfLoop());
}); // convertException::wrap

} // Try block
catch (cms::Exception& e) {
if (exceptionMessageLumis_) {
std::string message(
"Another exception was caught while trying to clean up lumis after the primary fatal exception.");
e.addAdditionalInfo(message);
if (e.alreadyPrinted()) {
LogAbsolute("Additional Exceptions") << message;
}
if (!exceptionMessageRuns_.empty()) {
e.addAdditionalInfo(exceptionMessageRuns_);
if (e.alreadyPrinted()) {
LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
}
}
if (exceptionMessageRuns_) {
std::string message(
"Another exception was caught while trying to clean up runs after the primary fatal exception.");
e.addAdditionalInfo(message);
if (e.alreadyPrinted()) {
LogAbsolute("Additional Exceptions") << message;
}
if (!exceptionMessageFiles_.empty()) {
e.addAdditionalInfo(exceptionMessageFiles_);
if (e.alreadyPrinted()) {
LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
}
}
if (!exceptionMessageFiles_.empty()) {
e.addAdditionalInfo(exceptionMessageFiles_);
if (e.alreadyPrinted()) {
LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
}
throw;
}
throw;
}

return returnCode;
return epSuccess;
}

void EventProcessor::readFile() {
Expand Down Expand Up @@ -2026,7 +2020,7 @@ namespace edm {

void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }

void EventProcessor::setExceptionMessageRuns(std::string& message) { exceptionMessageRuns_ = message; }
void EventProcessor::setExceptionMessageRuns() { exceptionMessageRuns_ = true; }

void EventProcessor::setExceptionMessageLumis() { exceptionMessageLumis_ = true; }

Expand Down
4 changes: 1 addition & 3 deletions FWCore/Framework/src/TransitionProcessors.icc
Expand Up @@ -62,9 +62,7 @@ struct RunResources {
eventSetupForInstanceSucceeded_);
} catch (...) {
if (cleaningUpAfterException_ or not ep_.setDeferredException(std::current_exception())) {
std::string message(
"Another exception was caught while trying to clean up runs after the primary fatal exception.");
ep_.setExceptionMessageRuns(message);
ep_.setExceptionMessageRuns();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/test/MockEventProcessor.cc
Expand Up @@ -346,7 +346,7 @@ namespace edm {
}

void MockEventProcessor::setExceptionMessageFiles(std::string&) {}
void MockEventProcessor::setExceptionMessageRuns(std::string&) {}
void MockEventProcessor::setExceptionMessageRuns() {}
void MockEventProcessor::setExceptionMessageLumis() {}

bool MockEventProcessor::setDeferredException(std::exception_ptr) { return true; }
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/test/MockEventProcessor.h
Expand Up @@ -86,7 +86,7 @@ namespace edm {
bool shouldWeStop() const;

void setExceptionMessageFiles(std::string& message);
void setExceptionMessageRuns(std::string& message);
void setExceptionMessageRuns();
void setExceptionMessageLumis();

bool setDeferredException(std::exception_ptr);
Expand Down

0 comments on commit 74c3c51

Please sign in to comment.