Skip to content

Commit

Permalink
Revert "Implement support for concurrent runs in the Framework"
Browse files Browse the repository at this point in the history
  • Loading branch information
perrotta committed Oct 5, 2022
1 parent e572ac7 commit a2d3721
Show file tree
Hide file tree
Showing 106 changed files with 1,931 additions and 3,068 deletions.
88 changes: 44 additions & 44 deletions FWCore/Framework/interface/EventProcessor.h
Expand Up @@ -35,7 +35,6 @@ configured in the user's main() function, and is set running.
#include "FWCore/Utilities/interface/get_underlying_safe.h"
#include "FWCore/Utilities/interface/propagate_const.h"

#include <atomic>
#include <map>
#include <memory>
#include <set>
Expand All @@ -57,7 +56,6 @@ namespace edm {
class WaitingTaskHolder;
class LuminosityBlockPrincipal;
class LuminosityBlockProcessingStatus;
class RunProcessingStatus;
class IOVSyncValue;

namespace eventsetup {
Expand Down Expand Up @@ -185,7 +183,14 @@ namespace edm {
// transition handling.

InputSource::ItemType nextTransitionType();
InputSource::ItemType lastTransitionType() const { return lastSourceTransition_; }
InputSource::ItemType lastTransitionType() const {
if (deferredExceptionPtrIsSet_) {
return InputSource::IsStop;
}
return lastSourceTransition_;
}
std::pair<edm::ProcessHistoryID, edm::RunNumber_t> nextRunID();
edm::LuminosityBlockNumber_t nextLuminosityBlockID();

void readFile();
bool fileBlockValid() { return fb_.get() != nullptr; }
Expand All @@ -208,35 +213,42 @@ namespace edm {
void inputProcessBlocks();
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded);

InputSource::ItemType processRuns();
void beginRunAsync(IOVSyncValue const&, WaitingTaskHolder);
void streamBeginRunAsync(unsigned int iStream,
std::shared_ptr<RunProcessingStatus>,
bool precedingTasksSucceeded,
WaitingTaskHolder);
void releaseBeginRunResources(unsigned int iStream);
void endRunAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const&);
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr<RunProcessingStatus>);
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex);
void endUnfinishedRun(bool cleaningUpAfterException);
void beginLumiAsync(IOVSyncValue const&, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
void continueLumiAsync(WaitingTaskHolder);
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const&);
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr<LuminosityBlockProcessingStatus>);
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex);
void endUnfinishedLumi(bool cleaningUpAfterException);
void beginRun(ProcessHistoryID const& phid,
RunNumber_t run,
bool& globalBeginSucceeded,
bool& eventSetupForInstanceSucceeded);
void endRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException);
void endUnfinishedRun(ProcessHistoryID const& phid,
RunNumber_t run,
bool globalBeginSucceeded,
bool cleaningUpAfterException,
bool eventSetupForInstanceSucceeded);

InputSource::ItemType processLumis(std::shared_ptr<void> const& iRunResource);
void endUnfinishedLumi();

void beginLumiAsync(edm::IOVSyncValue const& iSyncValue,
std::shared_ptr<void> const& iRunResource,
edm::WaitingTaskHolder iHolder);
void continueLumiAsync(edm::WaitingTaskHolder iHolder);

void handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder);
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus);
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex);
void readProcessBlock(ProcessBlockPrincipal&);
std::shared_ptr<RunPrincipal> readRun();
void readAndMergeRun(RunProcessingStatus&);
std::shared_ptr<LuminosityBlockPrincipal> readLuminosityBlock(std::shared_ptr<RunPrincipal> rp);
void readAndMergeLumi(LuminosityBlockProcessingStatus&);
std::pair<ProcessHistoryID, RunNumber_t> readRun();
std::pair<ProcessHistoryID, RunNumber_t> readAndMergeRun();
void readLuminosityBlock(LuminosityBlockProcessingStatus&);
int readAndMergeLumi(LuminosityBlockProcessingStatus&);
using ProcessBlockType = PrincipalCache::ProcessBlockType;
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType);
void writeRunAsync(WaitingTaskHolder, RunPrincipal const&, MergeableRunProductMetadata const*);
void clearRunPrincipal(RunProcessingStatus&);
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal&);
void clearLumiPrincipal(LuminosityBlockProcessingStatus&);
void writeRunAsync(WaitingTaskHolder,
ProcessHistoryID const& phid,
RunNumber_t run,
MergeableRunProductMetadata const*);
void deleteRunFromCache(ProcessHistoryID const& phid, RunNumber_t run);
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal& lumiPrincipal);
void deleteLumiFromCache(LuminosityBlockProcessingStatus&);

bool shouldWeStop() const;

Expand All @@ -253,14 +265,9 @@ namespace edm {
// init() is used by only by constructors
void init(std::shared_ptr<ProcessDesc>& processDesc, ServiceToken const& token, serviceregistry::ServiceLegacy);

void readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
void readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus>, WaitingTaskHolder);

void handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);

bool readNextEventForStream(WaitingTaskHolder const&, unsigned int iStreamIndex, LuminosityBlockProcessingStatus&);
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus& iLumiStatus);

void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex);
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex);

//read the next event using Stream iStreamIndex
void readEvent(unsigned int iStreamIndex);
Expand Down Expand Up @@ -291,7 +298,6 @@ namespace edm {
std::shared_ptr<EDLooperBase>& looper() { return get_underlying_safe(looper_); }

void throwAboutModulesRequiringLuminosityBlockSynchronization() const;
void warnAboutModulesRequiringRunSynchronization() const;
void warnAboutLegacyModules() const;
//------------------------------------------------------------------
//
Expand All @@ -310,7 +316,7 @@ namespace edm {
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
ServiceToken serviceToken_;
edm::propagate_const<std::unique_ptr<InputSource>> input_;
InputSource::ItemType lastSourceTransition_ = InputSource::IsInvalid;
InputSource::ItemType lastSourceTransition_;
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController>> espController_;
edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider>> esp_;
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_;
Expand All @@ -321,13 +327,8 @@ namespace edm {
MergeableRunProductProcesses mergeableRunProductProcesses_;
edm::propagate_const<std::unique_ptr<Schedule>> schedule_;
std::vector<edm::SerialTaskQueue> streamQueues_;
SerialTaskQueue streamQueuesInserter_;
std::unique_ptr<edm::LimitedTaskQueue> runQueue_;
std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
std::vector<std::shared_ptr<RunProcessingStatus>> streamRunStatus_;
std::shared_ptr<RunProcessingStatus> exceptionRunStatus_;
std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;
std::atomic<unsigned int> streamRunActive_{0}; //works as guard for streamRunStatus
std::atomic<unsigned int> streamLumiActive_{0}; //works as guard for streamLumiStatus

std::vector<std::string> branchesToDeleteEarly_;
Expand Down Expand Up @@ -367,7 +368,6 @@ namespace edm {

bool printDependencies_ = false;
bool deleteNonConsumedUnscheduledModules_ = true;
bool firstItemAfterLumiMerge_ = true;
}; // class EventProcessor

//--------------------------------------------------------------------
Expand Down
17 changes: 2 additions & 15 deletions FWCore/Framework/interface/EventSetupsController.h
Expand Up @@ -93,27 +93,14 @@ namespace edm {
unsigned int maxConcurrentIOVs = 0,
bool dumpOptions = false);

// The main purpose of this function is to call eventSetupForInstanceAsync. It might
// be called immediately or we might need to wait until all the currently active
// IOVs end. If there is an exception, then a signal is emitted and the exception
// is propagated.
void runOrQueueEventSetupForInstanceAsync(IOVSyncValue const&,
WaitingTaskHolder& taskToStartAfterIOVInit,
WaitingTaskList& endIOVWaitingTasks,
std::vector<std::shared_ptr<const EventSetupImpl>>&,
edm::SerialTaskQueue& queueWhichWaitsForIOVsToFinish,
ActivityRegistry*,
bool iForceCacheClear = false);

// Pass in an IOVSyncValue to let the EventSetup system know which run and lumi
// need to be processed and prepare IOVs for it (also could be a time or only a run).
// Pass in a WaitingTaskHolder that allows the EventSetup to communicate when all
// the IOVs are ready to process this IOVSyncValue. Note this preparation is often
// done in asynchronous tasks and the function might return before all the preparation
// is complete.
// Pass in endIOVWaitingTasks, additions to this WaitingTaskList allow the lumi or
// run to notify the EventSetup system when a lumi or run transition is done and no
// longer needs its EventSetup IOVs.
// Pass in endIOVWaitingTasks, additions to this WaitingTaskList allow the lumi to notify
// the EventSetup when the lumi is done and no longer needs its EventSetup IOVs.
// Pass in a vector of EventSetupImpl that gets filled and is used to give clients
// of EventSetup access to the EventSetup system such that for each record the IOV
// associated with this IOVSyncValue will be used. The first element of the vector
Expand Down
7 changes: 1 addition & 6 deletions FWCore/Framework/interface/GlobalSchedule.h
Expand Up @@ -156,7 +156,6 @@ namespace edm {
std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
ProcessContext const* processContext_;
unsigned int numberOfConcurrentLumis_;
};

template <typename T>
Expand Down Expand Up @@ -214,11 +213,7 @@ namespace edm {
}
iHolder.doneWaiting(excpt);
});
unsigned int managerIndex = principal.index();
if constexpr (T::branchType_ == InRun) {
managerIndex += numberOfConcurrentLumis_;
}
WorkerManager& workerManager = workerManagers_[managerIndex];
WorkerManager& workerManager = workerManagers_[principal.index()];
workerManager.resetAll();

ParentContext parentContext(globalContext.get());
Expand Down
7 changes: 4 additions & 3 deletions FWCore/Framework/interface/MergeableRunProductMetadata.h
Expand Up @@ -12,8 +12,9 @@ Most of the information here is associated with the current
run being processed. Most of it is cleared when a new run
is started. If multiple runs are being processed concurrently,
then there will be an object instantiated for each concurrent
run. The primary RunPrincipal for the current run owns the
object.
run. (At the present time concurrent runs are not possible, but
there plans to implement that in the future). The primary
RunPrincipal for the current run owns the object.
This class gets information from the input file from the
StoredMergeableRunProductMetadata object and IndexIntoFile object.
Expand All @@ -25,7 +26,7 @@ to use in later processing steps.
If there are SubProcesses, they use the same object as the top
level process because they share the same input.
There is a TWIKI page on the Framework page of the Software
There will be a TWIKI page on the Framework page of the Software
Guide which explains the details about how this works. There
are significant limitations related to what the Framework does
and does not do managing mergeable run products.
Expand Down
3 changes: 0 additions & 3 deletions FWCore/Framework/interface/OccurrenceTraits.h
Expand Up @@ -399,7 +399,6 @@ namespace edm {
using MyPrincipal = ProcessBlockPrincipal;
using TransitionInfoType = ProcessBlockTransitionInfo;
using Context = GlobalContext;
static BranchType constexpr branchType_ = InProcess;
static bool constexpr isEvent_ = false;
static Transition constexpr transition_ = Transition::BeginProcessBlock;

Expand Down Expand Up @@ -437,7 +436,6 @@ namespace edm {
using MyPrincipal = ProcessBlockPrincipal;
using TransitionInfoType = ProcessBlockTransitionInfo;
using Context = GlobalContext;
static BranchType constexpr branchType_ = InProcess;
static bool constexpr isEvent_ = false;
static Transition constexpr transition_ = Transition::AccessInputProcessBlock;

Expand Down Expand Up @@ -475,7 +473,6 @@ namespace edm {
using MyPrincipal = ProcessBlockPrincipal;
using TransitionInfoType = ProcessBlockTransitionInfo;
using Context = GlobalContext;
static BranchType constexpr branchType_ = InProcess;
static bool constexpr isEvent_ = false;
static Transition constexpr transition_ = Transition::EndProcessBlock;

Expand Down
75 changes: 64 additions & 11 deletions FWCore/Framework/interface/PrincipalCache.h
Expand Up @@ -2,26 +2,46 @@
#define FWCore_Framework_PrincipalCache_h

/*
Contains smart pointers to the RunPrincipals,
LuminosityBlockPrincipals, EventPrincipals,
and ProcessBlockPrincipals. It keeps the
objects alive so they can be reused as
necessary.
Contains a shared pointer to the RunPrincipal,
LuminosityBlockPrincipal, and EventPrincipal.
Manages merging of run and luminosity block
principals when there is more than one principal
from the same run or luminosity block and having
the same reduced ProcessHistoryID.
The EventPrincipal is reused each event and is created
by the EventProcessor or SubProcess which contains
an object of this type as a data member.
The RunPrincipal and LuminosityBlockPrincipal is
created by the InputSource each time a different
run or luminosity block is encountered.
Performs checks that process history IDs or runs and
lumis, run numbers, and luminosity numbers are consistent.
Original Author: W. David Dagenhart
*/

#include "DataFormats/Provenance/interface/ProcessHistoryID.h"
#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
#include "DataFormats/Provenance/interface/RunID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockID.h"

#include "FWCore/Utilities/interface/ReusableObjectHolder.h"

#include <memory>
#include <vector>
#include <cassert>

namespace edm {

class ProcessBlockPrincipal;
class RunPrincipal;
class LuminosityBlockPrincipal;
class EventPrincipal;
class RunAuxiliary;
class LuminosityBlockAuxiliary;
class ProductRegistry;
class PreallocationConfiguration;

Expand All @@ -39,28 +59,61 @@ namespace edm {
return processBlockType == ProcessBlockType::Input ? *inputProcessBlockPrincipal_ : *processBlockPrincipal_;
}

std::shared_ptr<RunPrincipal> getAvailableRunPrincipalPtr();
RunPrincipal& runPrincipal(ProcessHistoryID const& phid, RunNumber_t run) const;
std::shared_ptr<RunPrincipal> const& runPrincipalPtr(ProcessHistoryID const& phid, RunNumber_t run) const;
RunPrincipal& runPrincipal() const;
std::shared_ptr<RunPrincipal> const& runPrincipalPtr() const;
bool hasRunPrincipal() const { return bool(runPrincipal_); }

std::shared_ptr<LuminosityBlockPrincipal> getAvailableLumiPrincipalPtr();

EventPrincipal& eventPrincipal(unsigned int iStreamIndex) const { return *(eventPrincipals_[iStreamIndex]); }

void merge(std::shared_ptr<RunAuxiliary> aux, std::shared_ptr<ProductRegistry const> reg);

void setNumberOfConcurrentPrincipals(PreallocationConfiguration const&);
void insert(std::unique_ptr<ProcessBlockPrincipal>);
void insertForInput(std::unique_ptr<ProcessBlockPrincipal>);
void insert(std::unique_ptr<RunPrincipal>);
void insert(std::unique_ptr<LuminosityBlockPrincipal>);
void insert(std::shared_ptr<EventPrincipal>);
void insert(std::shared_ptr<RunPrincipal> rp);
void insert(std::unique_ptr<LuminosityBlockPrincipal> lbp);
void insert(std::shared_ptr<EventPrincipal> ep);

void adjustEventsToNewProductRegistry(std::shared_ptr<ProductRegistry const>);
void deleteRun(ProcessHistoryID const& phid, RunNumber_t run);

void adjustEventsToNewProductRegistry(std::shared_ptr<ProductRegistry const> reg);

void adjustIndexesAfterProductRegistryAddition();

void setProcessHistoryRegistry(ProcessHistoryRegistry const& phr) { processHistoryRegistry_ = &phr; }

void preReadFile();

private:
void throwRunMissing() const;
void throwLumiMissing() const;

// These are explicitly cleared when finished with the processblock, run,
// lumi, or event
std::unique_ptr<ProcessBlockPrincipal> processBlockPrincipal_;
std::unique_ptr<ProcessBlockPrincipal> inputProcessBlockPrincipal_;
edm::ReusableObjectHolder<RunPrincipal> runHolder_;
std::shared_ptr<RunPrincipal> runPrincipal_;
edm::ReusableObjectHolder<LuminosityBlockPrincipal> lumiHolder_;
std::vector<std::shared_ptr<EventPrincipal>> eventPrincipals_;

// This is just an accessor to the registry owned by the input source.
ProcessHistoryRegistry const* processHistoryRegistry_; // We don't own this

// These are intentionally not cleared so that when inserting
// the next principal the conversion from full ProcessHistoryID_
// to reduced ProcessHistoryID_ is still in memory and does
// not need to be recalculated if the ID does not change. I
// expect that very often these ID's will not change from one
// principal to the next and a good amount of CPU can be saved
// by not recalculating.
ProcessHistoryID inputProcessHistoryID_;
ProcessHistoryID reducedInputProcessHistoryID_;
RunNumber_t run_;
LuminosityBlockNumber_t lumi_;
};
} // namespace edm

Expand Down

0 comments on commit a2d3721

Please sign in to comment.