Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preprocess EDAlias and SwitchProducer information for ConditionalTask #38730

Merged
merged 1 commit into from Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 15 additions & 9 deletions FWCore/Framework/interface/StreamSchedule.h
Expand Up @@ -114,6 +114,8 @@ namespace edm {
class PreallocationConfiguration;
class WaitingTaskHolder;

class ConditionalTaskHelper;

namespace service {
class TriggerNamesService;
}
Expand Down Expand Up @@ -257,6 +259,13 @@ namespace edm {

StreamContext const& context() const { return streamContext_; }

struct AliasInfo {
std::string friendlyClassName;
std::string instanceLabel;
std::string originalInstanceLabel;
std::string originalModuleLabel;
};

private:
//Sentry class to only send a signal if an
// exception occurs. An exception is identified
Expand Down Expand Up @@ -289,12 +298,6 @@ namespace edm {

void reportSkipped(EventPrincipal const& ep) const;

struct AliasInfo {
std::string friendlyClassName;
std::string instanceLabel;
std::string originalInstanceLabel;
std::string originalModuleLabel;
};
std::vector<Worker*> tryToPlaceConditionalModules(
Worker*,
std::unordered_set<std::string>& conditionalModules,
Expand All @@ -311,22 +314,25 @@ namespace edm {
std::string const& name,
bool ignoreFilters,
PathWorkers& out,
std::vector<std::string> const& endPathNames);
std::vector<std::string> const& endPathNames,
ConditionalTaskHelper const& conditionalTaskHelper);
void fillTrigPath(ParameterSet& proc_pset,
ProductRegistry& preg,
PreallocationConfiguration const* prealloc,
std::shared_ptr<ProcessConfiguration const> processConfiguration,
int bitpos,
std::string const& name,
TrigResPtr,
std::vector<std::string> const& endPathNames);
std::vector<std::string> const& endPathNames,
ConditionalTaskHelper const& conditionalTaskHelper);
void fillEndPath(ParameterSet& proc_pset,
ProductRegistry& preg,
PreallocationConfiguration const* prealloc,
std::shared_ptr<ProcessConfiguration const> processConfiguration,
int bitpos,
std::string const& name,
std::vector<std::string> const& endPathNames);
std::vector<std::string> const& endPathNames,
ConditionalTaskHelper const& conditionalTaskHelper);

void addToAllWorkers(Worker* w);

Expand Down
245 changes: 159 additions & 86 deletions FWCore/Framework/src/StreamSchedule.cc
Expand Up @@ -127,6 +127,22 @@ namespace edm {
branchToReadingWorker.insert(std::make_pair(branch, static_cast<Worker*>(nullptr)));
}
}

Worker* getWorker(std::string const& moduleLabel,
ParameterSet& proc_pset,
WorkerManager& workerManager,
ProductRegistry& preg,
PreallocationConfiguration const* prealloc,
std::shared_ptr<ProcessConfiguration const> processConfiguration) {
bool isTracked;
ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
if (modpset == nullptr) {
return nullptr;
}
assert(isTracked);

return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
}
} // namespace

// -----------------------------
Expand All @@ -135,6 +151,116 @@ namespace edm {

// -----------------------------

class ConditionalTaskHelper {
public:
using AliasInfo = StreamSchedule::AliasInfo;

ConditionalTaskHelper(ParameterSet& proc_pset,
ProductRegistry& preg,
PreallocationConfiguration const* prealloc,
std::shared_ptr<ProcessConfiguration const> processConfiguration,
WorkerManager& workerManager,
std::vector<std::string> const& trigPathNames) {
std::unordered_set<std::string> allConditionalMods;
for (auto const& pathName : trigPathNames) {
auto const modnames = proc_pset.getParameter<vstring>(pathName);

//Pull out ConditionalTask modules
auto itCondBegin = std::find(modnames.begin(), modnames.end(), "#");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to find a way to share code between here and line 677

if (itCondBegin == modnames.end())
continue;

//the last entry should be ignored since it is required to be "@"
allConditionalMods.insert(itCondBegin + 1, std::prev(modnames.end()));
}

for (auto const& cond : allConditionalMods) {
//force the creation of the conditional modules so alias check can work
(void)getWorker(cond, proc_pset, workerManager, preg, prealloc, processConfiguration);
}

fillAliasMap(proc_pset, allConditionalMods);
makortel marked this conversation as resolved.
Show resolved Hide resolved
processSwitchEDAliases(proc_pset, preg, *processConfiguration, allConditionalMods);

//find branches created by the conditional modules
for (auto const& prod : preg.productList()) {
if (allConditionalMods.find(prod.first.moduleLabel()) != allConditionalMods.end()) {
conditionalModsBranches_.emplace(prod.first.moduleLabel(), &prod.second);
}
}
}

std::multimap<std::string, AliasInfo> const& aliasMap() const { return aliasMap_; }

std::multimap<std::string, edm::BranchDescription const*> conditionalModuleBranches(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this become a std::multimap<std::string_view, ...> to avoid possible copies? Also, what about std::multi_unordered_map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A hurdle with the std::string_view is that the element from this is used as a key for std::unordered_map<std::string>::find(), and before C++20 that key must be std::string (with C++20 anything that can be compared against std::string would work).

std::unordered_set<std::string> const& conditionalmods) const {
std::multimap<std::string, edm::BranchDescription const*> ret;
for (auto const& mod : conditionalmods) {
auto range = conditionalModsBranches_.equal_range(mod);
ret.insert(range.first, range.second);
}
return ret;
}

private:
void fillAliasMap(ParameterSet const& proc_pset, std::unordered_set<std::string> const& allConditionalMods) {
auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
std::string const star("*");
for (auto const& alias : aliases) {
auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
auto aliasedToModuleLabels = info.getParameterNames();
for (auto const& mod : aliasedToModuleLabels) {
if (not mod.empty() and mod[0] != '@' and allConditionalMods.find(mod) != allConditionalMods.end()) {
auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
for (auto const& aliasPSet : aliasVPSet) {
std::string type = star;
std::string instance = star;
std::string originalInstance = star;
if (aliasPSet.exists("type")) {
type = aliasPSet.getParameter<std::string>("type");
}
if (aliasPSet.exists("toProductInstance")) {
instance = aliasPSet.getParameter<std::string>("toProductInstance");
}
if (aliasPSet.exists("fromProductInstance")) {
originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
}

aliasMap_.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
}
}
}
}
}

void processSwitchEDAliases(ParameterSet const& proc_pset,
ProductRegistry& preg,
ProcessConfiguration const& processConfiguration,
std::unordered_set<std::string> const& allConditionalMods) {
auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
std::vector<std::string> switchEDAliases;
for (auto const& module : all_modules) {
auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
for (auto const& case_label : all_cases) {
auto range = aliasMap_.equal_range(case_label);
if (range.first != range.second) {
switchEDAliases.push_back(case_label);
}
}
}
}
detail::processEDAliases(
switchEDAliases, allConditionalMods, proc_pset, processConfiguration.processName(), preg);
}

std::multimap<std::string, AliasInfo> aliasMap_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about std::unordered_multimap? Tends to have a smaller footprint and runs faster.

std::multimap<std::string, edm::BranchDescription const*> conditionalModsBranches_;
};

// -----------------------------

StreamSchedule::StreamSchedule(
std::shared_ptr<TriggerResultInserter> inserter,
std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
Expand Down Expand Up @@ -166,10 +292,21 @@ namespace edm {
std::vector<std::string> const& pathNames = tns.getTrigPaths();
std::vector<std::string> const& endPathNames = tns.getEndPaths();

ConditionalTaskHelper conditionalTaskHelper(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't quite remember, is the one StreamSchedule or one per stream? If the latter, then maybe ConditionalTaskHelper could be shared by them all? Depends on if workerManager_ is shared.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is indeed one StreamSchedule per stream, but each them have their own workerManager_. Maybe it/them could be shared, but it sounds like a potentially big task.

proc_pset, preg, &prealloc, processConfiguration, workerManager_, pathNames);

int trig_bitpos = 0;
trig_paths_.reserve(pathNames.size());
for (auto const& trig_name : pathNames) {
fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
fillTrigPath(proc_pset,
preg,
&prealloc,
processConfiguration,
trig_bitpos,
trig_name,
results(),
endPathNames,
conditionalTaskHelper);
++trig_bitpos;
hasPath = true;
}
Expand All @@ -186,7 +323,8 @@ namespace edm {
int bitpos = 0;
end_paths_.reserve(endPathNames.size());
for (auto const& end_path_name : endPathNames) {
fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
fillEndPath(
proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames, conditionalTaskHelper);
++bitpos;
}

Expand Down Expand Up @@ -380,22 +518,6 @@ namespace edm {
}
}

static Worker* getWorker(std::string const& moduleLabel,
makortel marked this conversation as resolved.
Show resolved Hide resolved
ParameterSet& proc_pset,
WorkerManager& workerManager,
ProductRegistry& preg,
PreallocationConfiguration const* prealloc,
std::shared_ptr<ProcessConfiguration const> processConfiguration) {
bool isTracked;
ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
if (modpset == nullptr) {
return nullptr;
}
assert(isTracked);

return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
}

std::vector<Worker*> StreamSchedule::tryToPlaceConditionalModules(
Worker* worker,
std::unordered_set<std::string>& conditionalModules,
Expand Down Expand Up @@ -544,7 +666,8 @@ namespace edm {
std::string const& pathName,
bool ignoreFilters,
PathWorkers& out,
std::vector<std::string> const& endPathNames) {
std::vector<std::string> const& endPathNames,
ConditionalTaskHelper const& conditionalTaskHelper) {
vstring modnames = proc_pset.getParameter<vstring>(pathName);
PathWorkers tmpworkers;

Expand All @@ -565,67 +688,7 @@ namespace edm {
conditionalmods = std::unordered_set<std::string>(
std::make_move_iterator(itCondBegin + 1), std::make_move_iterator(modnames.begin() + modnames.size() - 1));

for (auto const& cond : conditionalmods) {
makortel marked this conversation as resolved.
Show resolved Hide resolved
//force the creation of the conditional modules so alias check can work
(void)getWorker(cond, proc_pset, workerManager_, preg, prealloc, processConfiguration);
}
//find aliases
{
auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
std::string const star("*");
for (auto const& alias : aliases) {
auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
auto aliasedToModuleLabels = info.getParameterNames();
for (auto const& mod : aliasedToModuleLabels) {
if (not mod.empty() and mod[0] != '@' and conditionalmods.find(mod) != conditionalmods.end()) {
auto aliasVPSet = info.getParameter<std::vector<edm::ParameterSet>>(mod);
for (auto const& aliasPSet : aliasVPSet) {
std::string type = star;
std::string instance = star;
std::string originalInstance = star;
if (aliasPSet.exists("type")) {
type = aliasPSet.getParameter<std::string>("type");
}
if (aliasPSet.exists("toProductInstance")) {
instance = aliasPSet.getParameter<std::string>("toProductInstance");
}
if (aliasPSet.exists("fromProductInstance")) {
originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
}

aliasMap.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
}
}
}
}
}
//find SwitchProducers whose cases are aliases
{
auto const& all_modules = proc_pset.getParameter<std::vector<std::string>>("@all_modules");
std::vector<std::string> switchEDAliases;
for (auto const& module : all_modules) {
auto const& mod_pset = proc_pset.getParameter<edm::ParameterSet>(module);
if (mod_pset.getParameter<std::string>("@module_type") == "SwitchProducer") {
auto const& all_cases = mod_pset.getParameter<std::vector<std::string>>("@all_cases");
for (auto const& case_label : all_cases) {
auto range = aliasMap.equal_range(case_label);
if (range.first != range.second) {
switchEDAliases.push_back(case_label);
}
}
}
}
detail::processEDAliases(
switchEDAliases, conditionalmods, proc_pset, processConfiguration->processName(), preg);
}
{
//find branches created by the conditional modules
for (auto const& prod : preg.productList()) {
if (conditionalmods.find(prod.first.moduleLabel()) != conditionalmods.end()) {
conditionalModsBranches.emplace(prod.first.moduleLabel(), &prod.second);
}
}
}
conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
}
modnames.erase(itCondBegin, modnames.end());

Expand Down Expand Up @@ -680,8 +743,14 @@ namespace edm {
runConcurrently = false;
}

auto condModules = tryToPlaceConditionalModules(
worker, conditionalmods, conditionalModsBranches, aliasMap, proc_pset, preg, prealloc, processConfiguration);
auto condModules = tryToPlaceConditionalModules(worker,
conditionalmods,
conditionalModsBranches,
conditionalTaskHelper.aliasMap(),
proc_pset,
preg,
prealloc,
processConfiguration);
for (auto condMod : condModules) {
tmpworkers.emplace_back(
condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
Expand All @@ -701,9 +770,11 @@ namespace edm {
int bitpos,
std::string const& name,
TrigResPtr trptr,
std::vector<std::string> const& endPathNames) {
std::vector<std::string> const& endPathNames,
ConditionalTaskHelper const& conditionalTaskHelper) {
PathWorkers tmpworkers;
fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
fillWorkers(
proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames, conditionalTaskHelper);

// an empty path will cause an extra bit that is not used
if (!tmpworkers.empty()) {
Expand All @@ -730,9 +801,11 @@ namespace edm {
std::shared_ptr<ProcessConfiguration const> processConfiguration,
int bitpos,
std::string const& name,
std::vector<std::string> const& endPathNames) {
std::vector<std::string> const& endPathNames,
ConditionalTaskHelper const& conditionalTaskHelper) {
PathWorkers tmpworkers;
fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
fillWorkers(
proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames, conditionalTaskHelper);

if (!tmpworkers.empty()) {
//EndPaths are not supposed to stop if SkipEvent type exception happens
Expand Down