diff --git a/Framework/Core/include/Framework/runDataProcessing.h b/Framework/Core/include/Framework/runDataProcessing.h index 9e47b6757b78b..853735ff13f52 100644 --- a/Framework/Core/include/Framework/runDataProcessing.h +++ b/Framework/Core/include/Framework/runDataProcessing.h @@ -127,7 +127,6 @@ int main(int argc, char** argv) UserCustomizationsHelper::userDefinedCustomization(workflowOptions, 0); workflowOptions.push_back(ConfigParamSpec{"readers", VariantType::Int64, 1ll, {"number of parallel readers to use"}}); workflowOptions.push_back(ConfigParamSpec{"pipeline", VariantType::String, "", {"override default pipeline size"}}); - workflowOptions.push_back(ConfigParamSpec{"dangling-outputs-policy", VariantType::String, "file", {"what to do with dangling outputs. file: write to file, fairmq: send to output proxy"}}); // options for AOD rate limiting workflowOptions.push_back(ConfigParamSpec{"aod-memory-rate-limit", VariantType::Int64, 0LL, {"Rate limit AOD processing based on memory"}}); @@ -139,6 +138,18 @@ int main(int argc, char** argv) workflowOptions.push_back(ConfigParamSpec{"aod-writer-ntfmerge", VariantType::Int, -1, {"Number of time frames to merge into one file"}}); workflowOptions.push_back(ConfigParamSpec{"aod-writer-keep", VariantType::String, "", {"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION:treename:col1/col2/..:filename"}}); + workflowOptions.push_back(ConfigParamSpec{"forwarding-policy", + VariantType::String, + "dangling", + {"Which messages to forward." + " dangling: dangling outputs," + " all: all messages"}}); + workflowOptions.push_back(ConfigParamSpec{"forwarding-destination", + VariantType::String, + "file", + {"Destination for forwarded messages." + " file: write to file," + " fairmq: send to output proxy"}}); std::vector channelPolicies; UserCustomizationsHelper::userDefinedCustomization(channelPolicies, 0); auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); diff --git a/Framework/Core/src/CommonDataProcessors.cxx b/Framework/Core/src/CommonDataProcessors.cxx index 9951143054ae9..5132375603223 100644 --- a/Framework/Core/src/CommonDataProcessors.cxx +++ b/Framework/Core/src/CommonDataProcessors.cxx @@ -480,7 +480,7 @@ DataProcessorSpec CommonDataProcessors::getGlobalFairMQSink(std::vector const& danglingOutputInputs) diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index bd5ed815c56c4..8e517cd2bccd4 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -468,23 +468,32 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end()); extraSpecs.clear(); - // file sink for notAOD dangling outputs - // select dangling outputs which are not of type AOD - std::vector outputsInputsDangling; + // Select dangling outputs which are not of type AOD + std::vector redirectedOutputsInputs; for (auto ii = 0u; ii < outputsInputs.size(); ii++) { - if ((outputTypes[ii] & DANGLING) == DANGLING && (outputTypes[ii] & ANALYSIS) == 0) { - outputsInputsDangling.emplace_back(outputsInputs[ii]); + if (ctx.options().get("forwarding-policy") == "none") { + continue; + } + // We forward to the output proxy all the inputs only if they are dangling + // or if the forwarding policy is "proxy". + if (!(outputTypes[ii] & DANGLING) && (ctx.options().get("forwarding-policy") != "all")) { + continue; + } + // AODs are skipped in any case. + if ((outputTypes[ii] & ANALYSIS)) { + continue; } + redirectedOutputsInputs.emplace_back(outputsInputs[ii]); } std::vector unmatched; - if (outputsInputsDangling.size() > 0 && ctx.options().get("dangling-outputs-policy") == "file") { - auto fileSink = CommonDataProcessors::getGlobalFileSink(outputsInputsDangling, unmatched); - if (unmatched.size() != outputsInputsDangling.size()) { + if (redirectedOutputsInputs.size() > 0 && ctx.options().get("forwarding-destination") == "file") { + auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched); + if (unmatched.size() != redirectedOutputsInputs.size()) { extraSpecs.push_back(fileSink); } - } else if (outputsInputsDangling.size() > 0 && ctx.options().get("dangling-outputs-policy") == "fairmq") { - auto fairMQSink = CommonDataProcessors::getGlobalFairMQSink(outputsInputsDangling); + } else if (redirectedOutputsInputs.size() > 0 && ctx.options().get("forwarding-destination") == "fairmq") { + auto fairMQSink = CommonDataProcessors::getGlobalFairMQSink(redirectedOutputsInputs); extraSpecs.push_back(fairMQSink); } if (unmatched.size() > 0) { diff --git a/Framework/Core/test/test_WorkflowHelpers.cxx b/Framework/Core/test/test_WorkflowHelpers.cxx index b3ccd86aac17b..9b9c36a171a12 100644 --- a/Framework/Core/test/test_WorkflowHelpers.cxx +++ b/Framework/Core/test/test_WorkflowHelpers.cxx @@ -29,8 +29,16 @@ std::unique_ptr makeEmptyConfigContext() // FIXME: Ugly... We need to fix ownership and make sure the ConfigContext // either owns or shares ownership of the registry. std::vector> retrievers; - static std::vector specs; - specs.push_back(ConfigParamSpec{"dangling-outputs-policy", VariantType::String, "file", {"what to do with dangling outputs. file: write to file, fairmq: send to output proxy"}}); + static std::vector specs = { + ConfigParamSpec{"forwarding-policy", + VariantType::String, + "dangling", + {""}}, + ConfigParamSpec{"forwarding-destination", + VariantType::String, + "file", + {"what to do with dangling outputs. file: write to file, fairmq: send to output proxy"}}, + }; specs.push_back(ConfigParamSpec{"aod-memory-rate-limit", VariantType::String, "0", {"rate"}}); auto store = std::make_unique(specs, std::move(retrievers)); store->preload();