diff --git a/Framework/include/QualityControl/InfrastructureGenerator.h b/Framework/include/QualityControl/InfrastructureGenerator.h index 3493c9f2b4..693d66296a 100644 --- a/Framework/include/QualityControl/InfrastructureGenerator.h +++ b/Framework/include/QualityControl/InfrastructureGenerator.h @@ -158,18 +158,29 @@ class InfrastructureGenerator private: // Dedicated methods for creating each QC component to hide implementation details. - static void generateDataSamplingPolicyLocalProxy(framework::WorkflowSpec& workflow, - const std::string& policyName, - const framework::Inputs& inputSpecs, - const std::string& localMachine, - const std::string& localPort, - const std::string& control); - static void generateDataSamplingPolicyRemoteProxy(framework::WorkflowSpec& workflow, - const std::string& policyName, - const framework::Outputs& outputSpecs, - const std::string& localMachine, - const std::string& localPort, - const std::string& control); + static void generateDataSamplingPolicyLocalProxyBind(framework::WorkflowSpec& workflow, + const std::string& policyName, + const framework::Inputs& inputSpecs, + const std::string& localMachine, + const std::string& localPort, + const std::string& control); + static void generateDataSamplingPolicyRemoteProxyConnect(framework::WorkflowSpec& workflow, + const std::string& policyName, + const framework::Outputs& outputSpecs, + const std::string& localMachine, + const std::string& localPort, + const std::string& control); + static void generateDataSamplingPolicyLocalProxyConnect(framework::WorkflowSpec& workflow, + const std::string& policyName, + const framework::Inputs& inputSpecs, + const std::string& remoteMachine, + const std::string& remotePort, + const std::string& control); + static void generateDataSamplingPolicyRemoteProxyBind(framework::WorkflowSpec& workflow, + const std::string& policyName, + const framework::Outputs& outputSpecs, + const std::string& remotePort, + const std::string& control); static void generateLocalTaskLocalProxy(framework::WorkflowSpec& workflow, size_t id, std::string taskName, diff --git a/Framework/multinode-test.json.in b/Framework/multinode-test.json.in index 698b9c9eb0..b0958d20e1 100644 --- a/Framework/multinode-test.json.in +++ b/Framework/multinode-test.json.in @@ -57,7 +57,8 @@ "name": "sampling2" }, "taskParameters": {}, - "location": "remote" + "location": "remote", + "remoteMachine": "localhost" } }, "checks": { diff --git a/Framework/src/InfrastructureGenerator.cxx b/Framework/src/InfrastructureGenerator.cxx index cc5f971352..4a74c64465 100644 --- a/Framework/src/InfrastructureGenerator.cxx +++ b/Framework/src/InfrastructureGenerator.cxx @@ -58,13 +58,15 @@ namespace o2::quality_control::core uint16_t defaultPolicyPort = 42349; struct DataSamplingPolicySpec { - DataSamplingPolicySpec(std::string name, std::string control) : name(std::move(name)), control(std::move(control)) {} + DataSamplingPolicySpec(std::string name, std::string control, std::string remoteMachine = "") + : name(std::move(name)), control(std::move(control)), remoteMachine(std::move(remoteMachine)) {} bool operator<(const DataSamplingPolicySpec other) const { - return std::tie(name, control) < std::tie(other.name, other.control); + return std::tie(name, control, remoteMachine) < std::tie(other.name, other.control, other.remoteMachine); } std::string name; std::string control; + std::string remoteMachine; }; framework::WorkflowSpec InfrastructureGenerator::generateStandaloneInfrastructure(std::string configurationSource) @@ -107,7 +109,7 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configurationSource); WorkflowSpec workflow; - std::set samplingPoliciesUsed; + std::set samplingPoliciesForRemoteTasks; if (infrastructureSpec.tasks.empty()) { return workflow; @@ -146,7 +148,7 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co // Collecting Data Sampling Policies switch (taskSpec.dataSource.type) { case DataSourceType::DataSamplingPolicy: - samplingPoliciesUsed.insert({ taskSpec.dataSource.typeSpecificParams.at("name"), taskSpec.localControl }); + samplingPoliciesForRemoteTasks.insert({ taskSpec.dataSource.typeSpecificParams.at("name"), taskSpec.localControl, taskSpec.remoteMachine }); break; case DataSourceType::Direct: throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskSpec.taskName + " cannot use direct data sources"); @@ -156,17 +158,19 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co } } - if (!samplingPoliciesUsed.empty()) { + if (!samplingPoliciesForRemoteTasks.empty()) { auto dataSamplingTree = config->getRecursive("dataSamplingPolicies"); // Creating Data Sampling Policies proxies - for (const auto& [policyName, control] : samplingPoliciesUsed) { + for (const auto& [policyName, control, remoteMachine] : samplingPoliciesForRemoteTasks) { std::string port = std::to_string(DataSampling::PortForPolicy(dataSamplingTree, policyName).value_or(defaultPolicyPort)); Inputs inputSpecs = DataSampling::InputSpecsForPolicy(dataSamplingTree, policyName); - std::vector machines = DataSampling::MachinesForPolicy(dataSamplingTree, policyName); - for (const auto& machine : machines) { - if (machine == targetHost) { - generateDataSamplingPolicyLocalProxy(workflow, policyName, inputSpecs, machine, port, control); + + if (machines.empty() || std::find(machines.begin(), machines.end(), targetHost) != machines.end()) { + if (DataSampling::BindLocationForPolicy(dataSamplingTree, policyName) == "remote") { + generateDataSamplingPolicyLocalProxyConnect(workflow, policyName, inputSpecs, remoteMachine, port, control); + } else { + generateDataSamplingPolicyLocalProxyBind(workflow, policyName, inputSpecs, targetHost, port, control); } } } @@ -189,7 +193,7 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configurationSource); WorkflowSpec workflow; - std::set samplingPoliciesUsed; + std::set samplingPoliciesForRemoteTasks; for (const auto& taskSpec : infrastructureSpec.tasks) { if (!taskSpec.active) { @@ -219,7 +223,7 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur // Collecting Data Sampling Policies switch (taskSpec.dataSource.type) { case DataSourceType::DataSamplingPolicy: - samplingPoliciesUsed.insert({ taskSpec.dataSource.typeSpecificParams.at("name"), taskSpec.localControl }); + samplingPoliciesForRemoteTasks.insert({ taskSpec.dataSource.typeSpecificParams.at("name"), taskSpec.localControl }); break; case DataSourceType::Direct: throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskSpec.taskName + " cannot use direct data sources"); @@ -233,18 +237,22 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur } } - if (!samplingPoliciesUsed.empty()) { + if (!samplingPoliciesForRemoteTasks.empty()) { auto dataSamplingTree = config->getRecursive("dataSamplingPolicies"); // Creating Data Sampling Policies proxies - for (const auto& [policyName, control] : samplingPoliciesUsed) { - // todo now we have to generate one proxy per local machine and policy, because of the proxy limitations. - // Use one proxy per policy when it is possible. - + for (const auto& [policyName, control, remoteMachine] : samplingPoliciesForRemoteTasks) { + (void)remoteMachine; std::string port = std::to_string(DataSampling::PortForPolicy(dataSamplingTree, policyName).value_or(defaultPolicyPort)); Outputs outputSpecs = DataSampling::OutputSpecsForPolicy(dataSamplingTree, policyName); - std::vector machines = DataSampling::MachinesForPolicy(dataSamplingTree, policyName); - for (const auto& machine : machines) { - generateDataSamplingPolicyRemoteProxy(workflow, policyName, outputSpecs, machine, port, control); + if (DataSampling::BindLocationForPolicy(dataSamplingTree, policyName) == "remote") { + generateDataSamplingPolicyRemoteProxyBind(workflow, policyName, outputSpecs, port, control); + } else { + // todo now we have to generate one proxy per local machine and policy, because of the proxy limitations. + // Use one proxy per policy when it is possible. + std::vector localMachines = DataSampling::MachinesForPolicy(dataSamplingTree, policyName); + for (const auto& localMachine : localMachines) { + generateDataSamplingPolicyRemoteProxyConnect(workflow, policyName, outputSpecs, localMachine, port, control); + } } } } @@ -351,12 +359,12 @@ void InfrastructureGenerator::printVersion() ILOG(Info, Support) << "QC version " << o2::quality_control::core::Version::GetQcVersion().getString() << ENDM; } -void InfrastructureGenerator::generateDataSamplingPolicyLocalProxy(framework::WorkflowSpec& workflow, - const string& policyName, - const framework::Inputs& inputSpecs, - const std::string& localMachine, - const string& localPort, - const std::string& control) +void InfrastructureGenerator::generateDataSamplingPolicyLocalProxyBind(framework::WorkflowSpec& workflow, + const string& policyName, + const framework::Inputs& inputSpecs, + const std::string& localMachine, + const string& localPort, + const std::string& control) { std::string proxyName = policyName + "-proxy"; std::string channelName = policyName + "-" + localMachine; @@ -375,12 +383,12 @@ void InfrastructureGenerator::generateDataSamplingPolicyLocalProxy(framework::Wo workflow.back().labels.emplace_back(control == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel); } -void InfrastructureGenerator::generateDataSamplingPolicyRemoteProxy(framework::WorkflowSpec& workflow, - const std::string& policyName, - const Outputs& outputSpecs, - const std::string& localMachine, - const std::string& localPort, - const std::string& control) +void InfrastructureGenerator::generateDataSamplingPolicyRemoteProxyConnect(framework::WorkflowSpec& workflow, + const std::string& policyName, + const Outputs& outputSpecs, + const std::string& localMachine, + const std::string& localPort, + const std::string& control) { std::string channelName = policyName + "-" + localMachine; const std::string& proxyName = channelName; // channel name has to match proxy name @@ -396,6 +404,49 @@ void InfrastructureGenerator::generateDataSamplingPolicyRemoteProxy(framework::W workflow.back().labels.emplace_back(control == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel); } +void InfrastructureGenerator::generateDataSamplingPolicyLocalProxyConnect(framework::WorkflowSpec& workflow, + const string& policyName, + const framework::Inputs& inputSpecs, + const std::string& remoteMachine, + const string& remotePort, + const std::string& control) +{ + std::string proxyName = policyName + "-proxy"; + const std::string& channelName = policyName; + std::string channelConfig = "name=" + channelName + ",type=push,method=connect,address=tcp://" + remoteMachine + ":" + remotePort + + ",rateLogging=60,transport=zeromq"; + auto channelSelector = [channelName](InputSpec const&, const std::unordered_map>&) { + return channelName; + }; + + workflow.emplace_back( + specifyFairMQDeviceMultiOutputProxy( + proxyName.c_str(), + inputSpecs, + channelConfig.c_str(), + channelSelector)); + workflow.back().labels.emplace_back(control == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel); +} + +void InfrastructureGenerator::generateDataSamplingPolicyRemoteProxyBind(framework::WorkflowSpec& workflow, + const std::string& policyName, + const Outputs& outputSpecs, + const std::string& remotePort, + const std::string& control) +{ + std::string channelName = policyName; + const std::string& proxyName = channelName; // channel name has to match proxy name + + std::string channelConfig = "name=" + channelName + ",type=pull,method=bind,address=tcp://*:" + remotePort + ",rateLogging=60,transport=zeromq"; + + workflow.emplace_back(specifyExternalFairMQDeviceProxy( + proxyName.c_str(), + outputSpecs, + channelConfig.c_str(), + dplModelAdaptor())); + workflow.back().labels.emplace_back(control == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel); +} + void InfrastructureGenerator::generateLocalTaskLocalProxy(framework::WorkflowSpec& workflow, size_t id, std::string taskName, std::string remoteHost, std::string remotePort, const std::string& control) diff --git a/Framework/test/testInfrastructureGenerator.cxx b/Framework/test/testInfrastructureGenerator.cxx index 74d6360efc..fa29abed9f 100644 --- a/Framework/test/testInfrastructureGenerator.cxx +++ b/Framework/test/testInfrastructureGenerator.cxx @@ -88,7 +88,7 @@ BOOST_AUTO_TEST_CASE(qc_factory_remote_test) auto tcpclustProxy = std::find_if( workflow.begin(), workflow.end(), [](const DataProcessorSpec& d) { - return d.name == "tpcclust-o2flp1" && + return d.name == "tpcclust" && d.inputs.size() == 0 && d.outputs.size() == 1; }); diff --git a/doc/Advanced.md b/doc/Advanced.md index 3d6742280a..8004ff5d3f 100644 --- a/doc/Advanced.md +++ b/doc/Advanced.md @@ -194,14 +194,15 @@ List the local processing machines in the `localMachines` array. `remoteMachine` `localControl` parameter allows to properly configure QC with respect to the control software it is run with. It can be either `aliecs` (on FLPs) or `odc` (EPNs). It has no influence when running the software by hand. - One also may choose the merging mode - `delta` is the default and recommended (tasks are reset after each cycle, so they +One also may choose the merging mode - `delta` is the default and recommended (tasks are reset after each cycle, so they send only updates), but if it is not feasible, Mergers may expect `entire` objects - tasks are not reset, they always send entire objects and the latest versions are combined in Mergers. - With the `delta` mode, one can cheat by specifying just one local machine name and referencing only that one later. + With the `delta` mode, one can cheat by specifying just one local machine name and using only that one during execution. This is not possible with `entire` mode, because then Mergers need identifiable data sources to merge objects correctly. -In case of a remote task, choosing `"remote"` option for the `"location"` parameter is needed. Also, `localControl` -should be specified, so data samples can be correctly dispatched. +In case of a remote task, choosing `"remote"` option for the `"location"` parameter is needed. In standalone setups +and those controlled by ODC, one should also specify the `"remoteMachine"`, so sampled data reaches the right node. +Also, `"localControl"` should be specified to generate the correct AliECS workflow template. ```json "tasks": { @@ -215,13 +216,16 @@ should be specified, so data samples can be correctly dispatched. }, "taskParameters": {}, "location": "remote", - "localControl": "aliecs", + "remoteMachine": "qcnode", "":"not needed with AliECS", + "localControl": "aliecs", "":"aliecs is default, not needed in setups fully controlled by AliECS" } } ``` -In case the task is running remotely, one has to specify the machines where data should be published to external -machines (with remote tasks) and a local port number. Use separate ports for each Data Sampling Policy. +In case the task is running remotely, data should be sampled. The minimal-effort approach requires adding a port number + (see the example below). Use separate ports for each Data Sampling Policy. If the same configuration file will be used + on many nodes, but only some of them should apply a given sampling policy, one should also specify the list of + machines to match (or generalized aliases, e.g. "flp", "epn"). ```json { "dataSamplingPolicies": [ @@ -229,16 +233,18 @@ machines (with remote tasks) and a local port number. Use separate ports for eac { "id": "rnd-little", "active": "true", - "machines": [ "","needed only for remote QC tasks", + "machines": [ "","only needed when the policy should run on a subgroup of nodes", "localnode2" ], - "port": "30333", "":"not needed with AliECS", + "port": "30333", "":"compulsory on standalone and ODC setups, not needed with AliECS", ... } ] } ``` -/ +By default, the channel is bound on the QC Task side. If this is not what you need, add `"bindLocation" : "local"` in +the policy configuration (`"remote"` is the default value) and make sure to use valid host names. + 2. Make sure that the firewalls are properly configured. If your machines block incoming/outgoing connections by default, you can add these rules to the firewall (run as sudo). Consider enabling only concrete ports or a small range of those. @@ -262,6 +268,7 @@ systemctl disable firewalld # to disable permanently 3. Install the same version of the QC software on each of these nodes. We cannot guarantee that different QC versions will talk to each other without problems. Also, make sure the configuration file that you will use is the same everywhere. 4. Run each part of the workflow. In this example `o2-qc-run-producer` represents any DPL workflow, here it is just a process which produces some random data. +The `--host` argument is matched against the `machines` lists in the configuration files. ``` # On localnode1: o2-qc-run-producer | o2-qc --config json:/${QUALITYCONTROL_ROOT}/etc/multiNode.json --local --host localnode1 -b