Skip to content

Commit

Permalink
Concurrently construct Source while making Modules
Browse files Browse the repository at this point in the history
Separated Source and module construction from finalizing internal data structures.
  • Loading branch information
Dr15Jones committed Jul 5, 2022
1 parent fc1c9be commit f014987
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 55 deletions.
2 changes: 2 additions & 0 deletions DataFormats/Provenance/interface/ProductRegistry.h
Expand Up @@ -70,6 +70,8 @@ namespace edm {
BranchDescription::MatchMode branchesMustMatch = BranchDescription::Permissive);

void updateFromInput(ProductList const& other);
// triggers callbacks for modules watching registration
void addFromInput(edm::ProductRegistry const&);

void updateFromInput(std::vector<BranchDescription> const& other);

Expand Down
14 changes: 14 additions & 0 deletions DataFormats/Provenance/src/ProductRegistry.cc
Expand Up @@ -218,6 +218,20 @@ namespace edm {
}
}

void ProductRegistry::addFromInput(edm::ProductRegistry const& other) {
throwIfFrozen();
for (auto const& prod : other.productList_) {
ProductList::iterator iter = productList_.find(prod.first);
if (iter == productList_.end()) {
productList_.insert(std::make_pair(prod.first, prod.second));
addCalled(prod.second, false);
} else {
assert(combinable(iter->second, prod.second));
iter->second.merge(prod.second);
}
}
}

void ProductRegistry::setUnscheduledProducts(std::set<std::string> const& unscheduledLabels) {
throwIfFrozen();

Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/InputSource.h
Expand Up @@ -170,7 +170,7 @@ namespace edm {

/// switch to a different ProductRegistry.
void switchTo(std::shared_ptr<ProductRegistry> iOther) { productRegistry_ = iOther; }

/// Accessor for maximum number of events to be read.
/// -1 is used for unlimited.
int maxEvents() const { return maxEvents_; }
Expand Down
19 changes: 13 additions & 6 deletions FWCore/Framework/interface/Schedule.h
Expand Up @@ -131,16 +131,23 @@ namespace edm {
Schedule(ParameterSet& proc_pset,
service::TriggerNamesService const& tns,
ProductRegistry& pregistry,
BranchIDListHelper& branchIDListHelper,
ProcessBlockHelperBase&,
ThinnedAssociationsHelper& thinnedAssociationsHelper,
SubProcessParentageHelper const* subProcessParentageHelper,
ExceptionToActionTable const& actions,
std::shared_ptr<ActivityRegistry> areg,
std::shared_ptr<ProcessConfiguration> processConfiguration,
bool hasSubprocesses,
std::shared_ptr<ProcessConfiguration const> processConfiguration,
PreallocationConfiguration const& config,
ProcessContext const* processContext);
void finishSetup(ParameterSet& proc_pset,
service::TriggerNamesService const& tns,
ProductRegistry& preg,
BranchIDListHelper& branchIDListHelper,
ProcessBlockHelperBase& processBlockHelper,
ThinnedAssociationsHelper& thinnedAssociationsHelper,
SubProcessParentageHelper const* subProcessParentageHelper,
std::shared_ptr<ActivityRegistry> areg,
std::shared_ptr<ProcessConfiguration> processConfiguration,
bool hasSubprocesses,
PreallocationConfiguration const& prealloc,
ProcessContext const* processContext);

void processOneEventAsync(WaitingTaskHolder iTask,
unsigned int iStreamID,
Expand Down
25 changes: 25 additions & 0 deletions FWCore/Framework/interface/ScheduleItems.h
Expand Up @@ -26,6 +26,9 @@ namespace edm {
class StreamID;
class PreallocationConfiguration;
class SubProcessParentageHelper;
namespace service {
class TriggerNamesService;
}

struct ScheduleItems {
ScheduleItems();
Expand Down Expand Up @@ -54,6 +57,28 @@ namespace edm {
ProcessContext const*,
ProcessBlockHelperBase& processBlockHelper);

class MadeModules {
friend struct ScheduleItems;
explicit MadeModules(std::unique_ptr<Schedule> iSched) : m_schedule(std::move(iSched)) {}

std::unique_ptr<Schedule> m_schedule;

public:
MadeModules() = delete;
};

MadeModules initModules(ParameterSet& parameterSet,
service::TriggerNamesService const& tns,
PreallocationConfiguration const& iAllocConfig,
ProcessContext const*);
std::unique_ptr<Schedule> finishSchedule(MadeModules,
ParameterSet& parameterSet,
service::TriggerNamesService const& tns,
bool hasSubprocesses,
PreallocationConfiguration const& iAllocConfig,
ProcessContext const*,
ProcessBlockHelperBase& processBlockHelper);

std::shared_ptr<SignallingProductRegistry const> preg() const { return get_underlying_safe(preg_); }
std::shared_ptr<SignallingProductRegistry>& preg() { return get_underlying_safe(preg_); }
std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
Expand Down
1 change: 0 additions & 1 deletion FWCore/Framework/interface/StreamSchedule.h
Expand Up @@ -172,7 +172,6 @@ namespace edm {
service::TriggerNamesService const& tns,
PreallocationConfiguration const& prealloc,
ProductRegistry& pregistry,
BranchIDListHelper& branchIDListHelper,
ExceptionToActionTable const& actions,
std::shared_ptr<ActivityRegistry> areg,
std::shared_ptr<ProcessConfiguration const> processConfiguration,
Expand Down
65 changes: 49 additions & 16 deletions FWCore/Framework/src/EventProcessor.cc
Expand Up @@ -39,6 +39,7 @@
#include "FWCore/Framework/interface/TransitionInfoTypes.h"
#include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
#include "FWCore/Framework/interface/globalTransitionAsync.h"
#include "FWCore/Framework/interface/TriggerNamesService.h"

#include "FWCore/MessageLogger/interface/MessageLogger.h"

Expand Down Expand Up @@ -120,7 +121,8 @@ namespace edm {
namespace chain = waiting_task::chain;

// ---------------------------------------------------------------
std::unique_ptr<InputSource> makeInput(ParameterSet& params,
std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
ParameterSet& params,
CommonParams const& common,
std::shared_ptr<ProductRegistry> preg,
std::shared_ptr<BranchIDListHelper> branchIDListHelper,
Expand Down Expand Up @@ -164,7 +166,7 @@ namespace edm {
main_input->getParameter<std::string>("@module_type"),
"source",
processConfiguration.get(),
ModuleDescription::getUniqueID());
moduleIndex);

InputSourceDescription isdesc(md,
preg,
Expand Down Expand Up @@ -488,20 +490,51 @@ namespace edm {

processBlockHelper_ = std::make_shared<ProcessBlockHelper>();

// initialize the input source
input_ = makeInput(*parameterSet,
*common,
items.preg(),
items.branchIDListHelper(),
get_underlying_safe(processBlockHelper_),
items.thinnedAssociationsHelper(),
items.actReg_,
items.processConfiguration(),
preallocations_);

// initialize the Schedule
schedule_ =
items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_, *processBlockHelper_);
std::optional<ScheduleItems::MadeModules> madeModules;
{
//setup input and modules concurrently
tbb::task_group group;

// initialize the input source
auto tempReg = std::make_shared<ProductRegistry>();
auto sourceID = ModuleDescription::getUniqueID();

group.run([&, this]() {
// initialize the Schedule
ServiceRegistry::Operate operate(serviceToken_);
auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
madeModules = items.initModules(*parameterSet, tns, preallocations_, &processContext_);
});

group.run([&, this, tempReg]() {
ServiceRegistry::Operate operate(serviceToken_);
input_ = makeInput(sourceID,
*parameterSet,
*common,
/*items.preg(),*/ tempReg,
items.branchIDListHelper(),
get_underlying_safe(processBlockHelper_),
items.thinnedAssociationsHelper(),
items.actReg_,
items.processConfiguration(),
preallocations_);
});

group.wait();
items.preg()->addFromInput(*tempReg);
input_->switchTo(items.preg());

{
auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
schedule_ = items.finishSchedule(std::move(*madeModules),
*parameterSet,
tns,
hasSubProcesses,
preallocations_,
&processContext_,
*processBlockHelper_);
}
}

// set the data members
act_table_ = std::move(items.act_table_);
Expand Down
51 changes: 34 additions & 17 deletions FWCore/Framework/src/Schedule.cc
Expand Up @@ -65,12 +65,13 @@ namespace edm {
// Here we make the trigger results inserter directly. This should
// probably be a utility in the WorkerRegistry or elsewhere.

std::shared_ptr<TriggerResultInserter> makeInserter(ParameterSet& proc_pset,
PreallocationConfiguration const& iPrealloc,
ProductRegistry& preg,
ExceptionToActionTable const& actions,
std::shared_ptr<ActivityRegistry> areg,
std::shared_ptr<ProcessConfiguration> processConfiguration) {
std::shared_ptr<TriggerResultInserter> makeInserter(
ParameterSet& proc_pset,
PreallocationConfiguration const& iPrealloc,
ProductRegistry& preg,
ExceptionToActionTable const& actions,
std::shared_ptr<ActivityRegistry> areg,
std::shared_ptr<ProcessConfiguration const> processConfiguration) {
ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths");
trig_pset->registerIt();

Expand Down Expand Up @@ -112,7 +113,7 @@ namespace edm {
PreallocationConfiguration const& iPrealloc,
ProductRegistry& preg,
std::shared_ptr<ActivityRegistry> areg,
std::shared_ptr<ProcessConfiguration> processConfiguration,
std::shared_ptr<ProcessConfiguration const> processConfiguration,
std::string const& moduleTypeName) {
ParameterSet pset;
pset.addParameter<std::string>("@module_type", moduleTypeName);
Expand Down Expand Up @@ -478,14 +479,9 @@ namespace edm {
Schedule::Schedule(ParameterSet& proc_pset,
service::TriggerNamesService const& tns,
ProductRegistry& preg,
BranchIDListHelper& branchIDListHelper,
ProcessBlockHelperBase& processBlockHelper,
ThinnedAssociationsHelper& thinnedAssociationsHelper,
SubProcessParentageHelper const* subProcessParentageHelper,
ExceptionToActionTable const& actions,
std::shared_ptr<ActivityRegistry> areg,
std::shared_ptr<ProcessConfiguration> processConfiguration,
bool hasSubprocesses,
std::shared_ptr<ProcessConfiguration const> processConfiguration,
PreallocationConfiguration const& prealloc,
ProcessContext const* processContext)
: //Only create a resultsInserter if there is a trigger path
Expand Down Expand Up @@ -525,7 +521,6 @@ namespace edm {
tns,
prealloc,
preg,
branchIDListHelper,
actions,
areg,
processConfiguration,
Expand Down Expand Up @@ -567,9 +562,24 @@ namespace edm {
areg,
processConfiguration,
processContext);
}

void Schedule::finishSetup(ParameterSet& proc_pset,
service::TriggerNamesService const& tns,
ProductRegistry& preg,
BranchIDListHelper& branchIDListHelper,
ProcessBlockHelperBase& processBlockHelper,
ThinnedAssociationsHelper& thinnedAssociationsHelper,
SubProcessParentageHelper const* subProcessParentageHelper,
std::shared_ptr<ActivityRegistry> areg,
std::shared_ptr<ProcessConfiguration> processConfiguration,
bool hasSubprocesses,
PreallocationConfiguration const& prealloc,
ProcessContext const* processContext) {
//TriggerResults is not in the top level ParameterSet so the call to
// reduceParameterSet would fail to find it. Just remove it up front.
const std::string kTriggerResults("TriggerResults");

std::set<std::string> usedModuleLabels;
for (auto const& worker : allWorkers()) {
if (worker->description()->moduleLabel() != kTriggerResults) {
Expand All @@ -586,9 +596,16 @@ namespace edm {

// At this point all BranchDescriptions are created. Mark now the
// ones of unscheduled workers to be on-demand.
if (nUnscheduledModules > 0) {
std::set<std::string> unscheduledModules(modulesToUse.begin(), modulesToUse.begin() + nUnscheduledModules);
preg.setUnscheduledProducts(unscheduledModules);
{
auto const& unsched = streamSchedules_[0]->unscheduledWorkers();
if (not unsched.empty()) {
std::set<std::string> unscheduledModules;
std::transform(unsched.begin(),
unsched.end(),
std::insert_iterator<std::set<std::string>>(unscheduledModules, unscheduledModules.begin()),
[](auto worker) { return worker->description()->moduleLabel(); });
preg.setUnscheduledProducts(unscheduledModules);
}
}

processSwitchProducers(proc_pset, processConfiguration->processName(), preg);
Expand Down
61 changes: 48 additions & 13 deletions FWCore/Framework/src/ScheduleItems.cc
Expand Up @@ -137,18 +137,53 @@ namespace edm {
PreallocationConfiguration const& config,
ProcessContext const* processContext,
ProcessBlockHelperBase& processBlockHelper) {
return std::make_unique<Schedule>(parameterSet,
ServiceRegistry::instance().get<service::TriggerNamesService>(),
*preg_,
*branchIDListHelper_,
processBlockHelper,
*thinnedAssociationsHelper_,
subProcessParentageHelper_ ? subProcessParentageHelper_.get() : nullptr,
*act_table_,
actReg_,
processConfiguration(),
hasSubprocesses,
config,
processContext);
auto& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
auto ret = std::make_unique<Schedule>(
parameterSet, tns, *preg_, *act_table_, actReg_, processConfiguration(), config, processContext);
ret->finishSetup(parameterSet,
tns,
*preg_,
*branchIDListHelper_,
processBlockHelper,
*thinnedAssociationsHelper_,
subProcessParentageHelper_ ? subProcessParentageHelper_.get() : nullptr,
actReg_,
processConfiguration(),
hasSubprocesses,
config,
processContext);
return ret;
}

ScheduleItems::MadeModules ScheduleItems::initModules(ParameterSet& parameterSet,
service::TriggerNamesService const& tns,
PreallocationConfiguration const& config,
ProcessContext const* processContext) {
return MadeModules(std::make_unique<Schedule>(
parameterSet, tns, *preg_, *act_table_, actReg_, processConfiguration(), config, processContext));
}

std::unique_ptr<Schedule> ScheduleItems::finishSchedule(MadeModules madeModules,
ParameterSet& parameterSet,
service::TriggerNamesService const& tns,
bool hasSubprocesses,
PreallocationConfiguration const& config,
ProcessContext const* processContext,
ProcessBlockHelperBase& processBlockHelper) {
auto sched = std::move(madeModules.m_schedule);
sched->finishSetup(parameterSet,
tns,
*preg_,
*branchIDListHelper_,
processBlockHelper,
*thinnedAssociationsHelper_,
subProcessParentageHelper_ ? subProcessParentageHelper_.get() : nullptr,
actReg_,
processConfiguration(),
hasSubprocesses,
config,
processContext);
return sched;
}

} // namespace edm
1 change: 0 additions & 1 deletion FWCore/Framework/src/StreamSchedule.cc
Expand Up @@ -144,7 +144,6 @@ namespace edm {
service::TriggerNamesService const& tns,
PreallocationConfiguration const& prealloc,
ProductRegistry& preg,
BranchIDListHelper& branchIDListHelper,
ExceptionToActionTable const& actions,
std::shared_ptr<ActivityRegistry> areg,
std::shared_ptr<ProcessConfiguration const> processConfiguration,
Expand Down

0 comments on commit f014987

Please sign in to comment.