Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Utilities/DataSampling/include/DataSampling/DataSampling.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,16 @@ class DataSampling
/// QC tasks.
/// \param policiesSource Path to configuration file.
/// \param threads Number of dispatcher threads, that will handle the data
static void GenerateInfrastructure(framework::WorkflowSpec& workflow, const std::string& policiesSource, size_t threads = 1);
/// \param host Host name. If the host or a policy machine list are empty, the policy will always be created.
static void GenerateInfrastructure(framework::WorkflowSpec& workflow, const std::string& policiesSource, size_t threads = 1, const std::string& host = "");

/// \brief Generates data sampling infrastructure.
/// \param workflow DPL workflow with already declared data processors which provide data desired by
/// QC tasks.
/// \param policiesSource boost::property_tree::ptree with the configuration
/// \param threads Number of dispatcher threads, that will handle the data
static void GenerateInfrastructure(framework::WorkflowSpec& workflow, boost::property_tree::ptree const& policies, size_t threads = 1);
/// \param host Host name. If the host or a policy machine list are empty, the policy will always be created.
static void GenerateInfrastructure(framework::WorkflowSpec& workflow, boost::property_tree::ptree const& policies, size_t threads = 1, const std::string& host = "");
/// \brief Configures dispatcher to consume any data immediately.
static void CustomizeInfrastructure(std::vector<framework::CompletionPolicy>&);
/// \brief Applies blocking/nonblocking data sampling configuration to the workflow.
Expand All @@ -114,7 +116,7 @@ class DataSampling
static std::vector<std::string> MachinesForPolicy(const std::string& policiesSource, const std::string& policyName);

private:
static void DoGenerateInfrastructure(Dispatcher&, framework::WorkflowSpec& workflow, boost::property_tree::ptree const& policies, size_t threads = 1);
static void DoGenerateInfrastructure(Dispatcher&, framework::WorkflowSpec& workflow, boost::property_tree::ptree const& policies, size_t threads = 1, const std::string& host = "");
// Internal functions, used by GenerateInfrastructure()
static std::string createDispatcherName();
};
Expand Down
23 changes: 15 additions & 8 deletions Utilities/DataSampling/src/DataSampling.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ std::string DataSampling::createDispatcherName()
return std::string("Dispatcher"); //_") + getenv("HOSTNAME");
}

void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const std::string& policiesSource, size_t threads)
void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const std::string& policiesSource, size_t threads, const std::string& host)
{
std::unique_ptr<ConfigurationInterface> cfg = ConfigurationFactory::getConfiguration(policiesSource);
if (cfg->getRecursive("").count("dataSamplingPolicies") == 0) {
Expand All @@ -45,26 +45,33 @@ void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const std::str
}
auto policiesTree = cfg->getRecursive("dataSamplingPolicies");
Dispatcher dispatcher(createDispatcherName(), policiesSource);
DataSampling::DoGenerateInfrastructure(dispatcher, workflow, policiesTree, threads);
DataSampling::DoGenerateInfrastructure(dispatcher, workflow, policiesTree, threads, host);
}

void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const boost::property_tree::ptree& policiesTree, size_t threads)
void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const boost::property_tree::ptree& policiesTree, size_t threads, const std::string& host)
{
Dispatcher dispatcher(createDispatcherName(), "");
DataSampling::DoGenerateInfrastructure(dispatcher, workflow, policiesTree, threads);
DataSampling::DoGenerateInfrastructure(dispatcher, workflow, policiesTree, threads, host);
}

void DataSampling::DoGenerateInfrastructure(Dispatcher& dispatcher, WorkflowSpec& workflow, const boost::property_tree::ptree& policiesTree, size_t threads)
void DataSampling::DoGenerateInfrastructure(Dispatcher& dispatcher, WorkflowSpec& workflow, const boost::property_tree::ptree& policiesTree, size_t threads, const std::string& host)
{
LOG(DEBUG) << "Generating Data Sampling infrastructure...";

for (auto&& policyConfig : policiesTree) {

std::unique_ptr<DataSamplingPolicy> policy;

// We don't want the Dispatcher to exit due to one faulty Policy
try {
dispatcher.registerPolicy(std::make_unique<DataSamplingPolicy>(DataSamplingPolicy::fromConfiguration(policyConfig.second)));
auto policy = DataSamplingPolicy::fromConfiguration(policyConfig.second);
std::vector<std::string> machines;
if (policyConfig.second.count("machines") > 0) {
for (const auto& machine : policyConfig.second.get_child("machines")) {
machines.emplace_back(machine.second.get<std::string>(""));
}
}
if (host.empty() || machines.empty() || std::find(machines.begin(), machines.end(), host) != machines.end()) {
dispatcher.registerPolicy(std::make_unique<DataSamplingPolicy>(std::move(policy)));
}
} catch (const std::exception& ex) {
LOG(WARN) << "Could not load the Data Sampling Policy '"
<< policyConfig.second.get_optional<std::string>("id").value_or("") << "', because: " << ex.what();
Expand Down
76 changes: 76 additions & 0 deletions Utilities/DataSampling/test/test_DataSampling.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,82 @@ BOOST_AUTO_TEST_CASE(MultinodeUtilities)
auto machines = DataSampling::MachinesForPolicy(configFilePath, "tpcraw");
BOOST_CHECK_EQUAL(machines.size(), 2);
}
{
// empty host -> match any policy
WorkflowSpec workflow;
DataSampling::GenerateInfrastructure(workflow, configFilePath, 1, "");

auto disp = std::find_if(workflow.begin(), workflow.end(),
[](const DataProcessorSpec& d) {
return d.name.find("Dispatcher") != std::string::npos;
});
BOOST_REQUIRE(disp != workflow.end());

auto input1 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
[](const InputSpec& in) {
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("CLUSTERS"), 0) && in.lifetime == Lifetime::Timeframe;
});
BOOST_CHECK(input1 != disp->inputs.end());
auto input2 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
[](const InputSpec& in) {
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("CLUSTERS_P"), 0) && in.lifetime == Lifetime::Timeframe;
});
BOOST_CHECK(input2 != disp->inputs.end());
auto input3 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
[](const InputSpec& in) {
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("RAWDATA"), 0) && in.lifetime == Lifetime::Timeframe;
});
BOOST_CHECK(input3 != disp->inputs.end());
}
{
// mismatching host -> create only policies with empty machines list
WorkflowSpec workflow;
DataSampling::GenerateInfrastructure(workflow, configFilePath, 1, "mismatching host");

auto disp = std::find_if(workflow.begin(), workflow.end(),
[](const DataProcessorSpec& d) {
return d.name.find("Dispatcher") != std::string::npos;
});
BOOST_REQUIRE(disp != workflow.end());

auto input1 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
[](const InputSpec& in) {
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("CLUSTERS"), 0) && in.lifetime == Lifetime::Timeframe;
});
BOOST_CHECK(input1 != disp->inputs.end());
auto input2 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
[](const InputSpec& in) {
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("CLUSTERS_P"), 0) && in.lifetime == Lifetime::Timeframe;
});
BOOST_CHECK(input2 != disp->inputs.end());
}
{
// matching host -> create policies with empty machines list and the ones which match
WorkflowSpec workflow;
DataSampling::GenerateInfrastructure(workflow, configFilePath, 1, "machineA");

auto disp = std::find_if(workflow.begin(), workflow.end(),
[](const DataProcessorSpec& d) {
return d.name.find("Dispatcher") != std::string::npos;
});
BOOST_REQUIRE(disp != workflow.end());

auto input1 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
[](const InputSpec& in) {
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("CLUSTERS"), 0) && in.lifetime == Lifetime::Timeframe;
});
BOOST_CHECK(input1 != disp->inputs.end());
auto input2 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
[](const InputSpec& in) {
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("CLUSTERS_P"), 0) && in.lifetime == Lifetime::Timeframe;
});
BOOST_CHECK(input2 != disp->inputs.end());
auto input3 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
[](const InputSpec& in) {
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("RAWDATA"), 0) && in.lifetime == Lifetime::Timeframe;
});
BOOST_CHECK(input3 != disp->inputs.end());
}
}

BOOST_AUTO_TEST_CASE(DataSamplingEmptyConfig)
Expand Down