Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions Framework/include/QualityControl/InfrastructureGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,29 @@ class InfrastructureGenerator
private:
// Dedicated methods for creating each QC component to hide implementation details.

static void generateDataSamplingPolicyLocalProxy(framework::WorkflowSpec& workflow,
const std::string& policyName,
const framework::Inputs& inputSpecs,
const std::string& localMachine,
const std::string& localPort,
const std::string& control);
static void generateDataSamplingPolicyRemoteProxy(framework::WorkflowSpec& workflow,
const std::string& policyName,
const framework::Outputs& outputSpecs,
const std::string& localMachine,
const std::string& localPort,
const std::string& control);
static void generateDataSamplingPolicyLocalProxyBind(framework::WorkflowSpec& workflow,
const std::string& policyName,
const framework::Inputs& inputSpecs,
const std::string& localMachine,
const std::string& localPort,
const std::string& control);
static void generateDataSamplingPolicyRemoteProxyConnect(framework::WorkflowSpec& workflow,
const std::string& policyName,
const framework::Outputs& outputSpecs,
const std::string& localMachine,
const std::string& localPort,
const std::string& control);
static void generateDataSamplingPolicyLocalProxyConnect(framework::WorkflowSpec& workflow,
const std::string& policyName,
const framework::Inputs& inputSpecs,
const std::string& remoteMachine,
const std::string& remotePort,
const std::string& control);
static void generateDataSamplingPolicyRemoteProxyBind(framework::WorkflowSpec& workflow,
const std::string& policyName,
const framework::Outputs& outputSpecs,
const std::string& remotePort,
const std::string& control);
static void generateLocalTaskLocalProxy(framework::WorkflowSpec& workflow,
size_t id,
std::string taskName,
Expand Down
3 changes: 2 additions & 1 deletion Framework/multinode-test.json.in
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
"name": "sampling2"
},
"taskParameters": {},
"location": "remote"
"location": "remote",
"remoteMachine": "localhost"
}
},
"checks": {
Expand Down
115 changes: 83 additions & 32 deletions Framework/src/InfrastructureGenerator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ namespace o2::quality_control::core
uint16_t defaultPolicyPort = 42349;

struct DataSamplingPolicySpec {
DataSamplingPolicySpec(std::string name, std::string control) : name(std::move(name)), control(std::move(control)) {}
DataSamplingPolicySpec(std::string name, std::string control, std::string remoteMachine = "")
: name(std::move(name)), control(std::move(control)), remoteMachine(std::move(remoteMachine)) {}
bool operator<(const DataSamplingPolicySpec other) const
{
return std::tie(name, control) < std::tie(other.name, other.control);
return std::tie(name, control, remoteMachine) < std::tie(other.name, other.control, other.remoteMachine);
}
std::string name;
std::string control;
std::string remoteMachine;
};

framework::WorkflowSpec InfrastructureGenerator::generateStandaloneInfrastructure(std::string configurationSource)
Expand Down Expand Up @@ -107,7 +109,7 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co
auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configurationSource);

WorkflowSpec workflow;
std::set<DataSamplingPolicySpec> samplingPoliciesUsed;
std::set<DataSamplingPolicySpec> samplingPoliciesForRemoteTasks;

if (infrastructureSpec.tasks.empty()) {
return workflow;
Expand Down Expand Up @@ -146,7 +148,7 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co
// Collecting Data Sampling Policies
switch (taskSpec.dataSource.type) {
case DataSourceType::DataSamplingPolicy:
samplingPoliciesUsed.insert({ taskSpec.dataSource.typeSpecificParams.at("name"), taskSpec.localControl });
samplingPoliciesForRemoteTasks.insert({ taskSpec.dataSource.typeSpecificParams.at("name"), taskSpec.localControl, taskSpec.remoteMachine });
break;
case DataSourceType::Direct:
throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskSpec.taskName + " cannot use direct data sources");
Expand All @@ -156,17 +158,19 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co
}
}

if (!samplingPoliciesUsed.empty()) {
if (!samplingPoliciesForRemoteTasks.empty()) {
auto dataSamplingTree = config->getRecursive("dataSamplingPolicies");
// Creating Data Sampling Policies proxies
for (const auto& [policyName, control] : samplingPoliciesUsed) {
for (const auto& [policyName, control, remoteMachine] : samplingPoliciesForRemoteTasks) {
std::string port = std::to_string(DataSampling::PortForPolicy(dataSamplingTree, policyName).value_or(defaultPolicyPort));
Inputs inputSpecs = DataSampling::InputSpecsForPolicy(dataSamplingTree, policyName);

std::vector<std::string> machines = DataSampling::MachinesForPolicy(dataSamplingTree, policyName);
for (const auto& machine : machines) {
if (machine == targetHost) {
generateDataSamplingPolicyLocalProxy(workflow, policyName, inputSpecs, machine, port, control);

if (machines.empty() || std::find(machines.begin(), machines.end(), targetHost) != machines.end()) {
if (DataSampling::BindLocationForPolicy(dataSamplingTree, policyName) == "remote") {
generateDataSamplingPolicyLocalProxyConnect(workflow, policyName, inputSpecs, remoteMachine, port, control);
} else {
generateDataSamplingPolicyLocalProxyBind(workflow, policyName, inputSpecs, targetHost, port, control);
}
}
}
Expand All @@ -189,7 +193,7 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configurationSource);

WorkflowSpec workflow;
std::set<DataSamplingPolicySpec> samplingPoliciesUsed;
std::set<DataSamplingPolicySpec> samplingPoliciesForRemoteTasks;

for (const auto& taskSpec : infrastructureSpec.tasks) {
if (!taskSpec.active) {
Expand Down Expand Up @@ -219,7 +223,7 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
// Collecting Data Sampling Policies
switch (taskSpec.dataSource.type) {
case DataSourceType::DataSamplingPolicy:
samplingPoliciesUsed.insert({ taskSpec.dataSource.typeSpecificParams.at("name"), taskSpec.localControl });
samplingPoliciesForRemoteTasks.insert({ taskSpec.dataSource.typeSpecificParams.at("name"), taskSpec.localControl });
break;
case DataSourceType::Direct:
throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskSpec.taskName + " cannot use direct data sources");
Expand All @@ -233,18 +237,22 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
}
}

if (!samplingPoliciesUsed.empty()) {
if (!samplingPoliciesForRemoteTasks.empty()) {
auto dataSamplingTree = config->getRecursive("dataSamplingPolicies");
// Creating Data Sampling Policies proxies
for (const auto& [policyName, control] : samplingPoliciesUsed) {
// todo now we have to generate one proxy per local machine and policy, because of the proxy limitations.
// Use one proxy per policy when it is possible.

for (const auto& [policyName, control, remoteMachine] : samplingPoliciesForRemoteTasks) {
(void)remoteMachine;
std::string port = std::to_string(DataSampling::PortForPolicy(dataSamplingTree, policyName).value_or(defaultPolicyPort));
Outputs outputSpecs = DataSampling::OutputSpecsForPolicy(dataSamplingTree, policyName);
std::vector<std::string> machines = DataSampling::MachinesForPolicy(dataSamplingTree, policyName);
for (const auto& machine : machines) {
generateDataSamplingPolicyRemoteProxy(workflow, policyName, outputSpecs, machine, port, control);
if (DataSampling::BindLocationForPolicy(dataSamplingTree, policyName) == "remote") {
generateDataSamplingPolicyRemoteProxyBind(workflow, policyName, outputSpecs, port, control);
} else {
// todo now we have to generate one proxy per local machine and policy, because of the proxy limitations.
// Use one proxy per policy when it is possible.
std::vector<std::string> localMachines = DataSampling::MachinesForPolicy(dataSamplingTree, policyName);
for (const auto& localMachine : localMachines) {
generateDataSamplingPolicyRemoteProxyConnect(workflow, policyName, outputSpecs, localMachine, port, control);
}
}
}
}
Expand Down Expand Up @@ -351,12 +359,12 @@ void InfrastructureGenerator::printVersion()
ILOG(Info, Support) << "QC version " << o2::quality_control::core::Version::GetQcVersion().getString() << ENDM;
}

void InfrastructureGenerator::generateDataSamplingPolicyLocalProxy(framework::WorkflowSpec& workflow,
const string& policyName,
const framework::Inputs& inputSpecs,
const std::string& localMachine,
const string& localPort,
const std::string& control)
void InfrastructureGenerator::generateDataSamplingPolicyLocalProxyBind(framework::WorkflowSpec& workflow,
const string& policyName,
const framework::Inputs& inputSpecs,
const std::string& localMachine,
const string& localPort,
const std::string& control)
{
std::string proxyName = policyName + "-proxy";
std::string channelName = policyName + "-" + localMachine;
Expand All @@ -375,12 +383,12 @@ void InfrastructureGenerator::generateDataSamplingPolicyLocalProxy(framework::Wo
workflow.back().labels.emplace_back(control == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
}

void InfrastructureGenerator::generateDataSamplingPolicyRemoteProxy(framework::WorkflowSpec& workflow,
const std::string& policyName,
const Outputs& outputSpecs,
const std::string& localMachine,
const std::string& localPort,
const std::string& control)
void InfrastructureGenerator::generateDataSamplingPolicyRemoteProxyConnect(framework::WorkflowSpec& workflow,
const std::string& policyName,
const Outputs& outputSpecs,
const std::string& localMachine,
const std::string& localPort,
const std::string& control)
{
std::string channelName = policyName + "-" + localMachine;
const std::string& proxyName = channelName; // channel name has to match proxy name
Expand All @@ -396,6 +404,49 @@ void InfrastructureGenerator::generateDataSamplingPolicyRemoteProxy(framework::W
workflow.back().labels.emplace_back(control == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
}

void InfrastructureGenerator::generateDataSamplingPolicyLocalProxyConnect(framework::WorkflowSpec& workflow,
const string& policyName,
const framework::Inputs& inputSpecs,
const std::string& remoteMachine,
const string& remotePort,
const std::string& control)
{
std::string proxyName = policyName + "-proxy";
const std::string& channelName = policyName;
std::string channelConfig = "name=" + channelName + ",type=push,method=connect,address=tcp://" + remoteMachine + ":" + remotePort +
",rateLogging=60,transport=zeromq";
auto channelSelector = [channelName](InputSpec const&, const std::unordered_map<std::string, std::vector<FairMQChannel>>&) {
return channelName;
};

workflow.emplace_back(
specifyFairMQDeviceMultiOutputProxy(
proxyName.c_str(),
inputSpecs,
channelConfig.c_str(),
channelSelector));
workflow.back().labels.emplace_back(control == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
}

void InfrastructureGenerator::generateDataSamplingPolicyRemoteProxyBind(framework::WorkflowSpec& workflow,
const std::string& policyName,
const Outputs& outputSpecs,
const std::string& remotePort,
const std::string& control)
{
std::string channelName = policyName;
const std::string& proxyName = channelName; // channel name has to match proxy name

std::string channelConfig = "name=" + channelName + ",type=pull,method=bind,address=tcp://*:" + remotePort + ",rateLogging=60,transport=zeromq";

workflow.emplace_back(specifyExternalFairMQDeviceProxy(
proxyName.c_str(),
outputSpecs,
channelConfig.c_str(),
dplModelAdaptor()));
workflow.back().labels.emplace_back(control == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
}

void InfrastructureGenerator::generateLocalTaskLocalProxy(framework::WorkflowSpec& workflow, size_t id,
std::string taskName, std::string remoteHost,
std::string remotePort, const std::string& control)
Expand Down
2 changes: 1 addition & 1 deletion Framework/test/testInfrastructureGenerator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ BOOST_AUTO_TEST_CASE(qc_factory_remote_test)
auto tcpclustProxy = std::find_if(
workflow.begin(), workflow.end(),
[](const DataProcessorSpec& d) {
return d.name == "tpcclust-o2flp1" &&
return d.name == "tpcclust" &&
d.inputs.size() == 0 &&
d.outputs.size() == 1;
});
Expand Down
Loading