diff --git a/Framework/Core/README.md b/Framework/Core/README.md index 27d08ef02df3b..cca1a13e2242a 100644 --- a/Framework/Core/README.md +++ b/Framework/Core/README.md @@ -475,7 +475,7 @@ Data Sampling provides possibility to sample data in DPL workflows, basing on ce ### Usage To use Data Sampling in a DPL workflow insert following lines to your code: -``` +```cpp #include "Framework/DataSampling.h" using namespace o2::framework; void customize(std::vector& policies) @@ -510,7 +510,7 @@ Sampled data can be subscribed to by adding `InputSpecs` provided by `std::vecto The following sampling conditions are available. When more than one is used, a positive decision is taken when all the conditions are fulfilled. - **DataSamplingConditionRandom** - pseudo-randomly accepts specified fraction of incoming messages. -``` +```json { "condition": "random", "fraction": "0.1", @@ -518,7 +518,7 @@ The following sampling conditions are available. When more than one is used, a p } ``` - **DataSamplingConditionNConsecutive** - approves n consecutive samples in defined cycle. It assumes that timesliceID always increments by one. -``` +```json { "condition": "nConsecutive", "samplesNumber": "3", @@ -526,7 +526,7 @@ The following sampling conditions are available. When more than one is used, a p } ``` - **DataSamplingConditionPayloadSize** - approves messages having payload size within specified boundaries. -``` +```json { "condition": "payloadSize", "lowerLimit": "300", diff --git a/Framework/Core/include/Framework/DataSampling.h b/Framework/Core/include/Framework/DataSampling.h index f399da343f7b9..86f9983af0924 100644 --- a/Framework/Core/include/Framework/DataSampling.h +++ b/Framework/Core/include/Framework/DataSampling.h @@ -84,7 +84,7 @@ class DataSampling private: // Internal functions, used by GenerateInfrastructure() - static std::string dispatcherName(); + static std::string createDispatcherName(); }; } // namespace framework diff --git a/Framework/Core/include/Framework/DataSamplingPolicy.h b/Framework/Core/include/Framework/DataSamplingPolicy.h index 008c84c37818f..9fbd9374fb1d2 100644 --- a/Framework/Core/include/Framework/DataSamplingPolicy.h +++ b/Framework/Core/include/Framework/DataSamplingPolicy.h @@ -40,6 +40,8 @@ class DataSamplingPolicy struct inputSpecHasher { size_t operator()(const InputSpec& i) const { + // 'Compressing' dataOrigin and dataDescription to 64 bits. SubSpecification is not taken into account, + // because sometimes we want to have subSpec-agnostic match. return (static_cast(i.description.itg[0]) << 32 | static_cast(i.description.itg[1])) ^ static_cast(i.origin.itg[0]); @@ -82,8 +84,8 @@ class DataSamplingPolicy const std::string& getFairMQOutputChannel() const; std::string getFairMQOutputChannelName() const; - static header::DataOrigin policyDataOrigin(); - static header::DataDescription policyDataDescription(std::string policyName, size_t id); + static header::DataOrigin createPolicyDataOrigin(); + static header::DataDescription createPolicyDataDescription(std::string policyName, size_t id); private: std::string mName; diff --git a/Framework/Core/src/DataSampling.cxx b/Framework/Core/src/DataSampling.cxx index 54ea9a2dc69de..88c1b4bae792e 100644 --- a/Framework/Core/src/DataSampling.cxx +++ b/Framework/Core/src/DataSampling.cxx @@ -29,7 +29,7 @@ namespace o2 namespace framework { -std::string DataSampling::dispatcherName() +std::string DataSampling::createDispatcherName() { return std::string("Dispatcher"); //_") + getenv("HOSTNAME"); } @@ -37,7 +37,7 @@ std::string DataSampling::dispatcherName() void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const std::string& policiesSource, size_t threads) { LOG(DEBUG) << "Generating Data Sampling infrastructure..."; - Dispatcher dispatcher(dispatcherName(), policiesSource); + Dispatcher dispatcher(createDispatcherName(), policiesSource); Options options; std::unique_ptr cfg = ConfigurationFactory::getConfiguration(policiesSource); @@ -101,7 +101,7 @@ void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const std::str void DataSampling::CustomizeInfrastructure(std::vector& policies) { - CompletionPolicy dispatcherConsumesASAP = CompletionPolicyHelpers::defineByName(dispatcherName(), CompletionPolicy::CompletionOp::Consume); + CompletionPolicy dispatcherConsumesASAP = CompletionPolicyHelpers::defineByName(createDispatcherName(), CompletionPolicy::CompletionOp::Consume); policies.push_back(dispatcherConsumesASAP); } diff --git a/Framework/Core/src/DataSamplingPolicy.cxx b/Framework/Core/src/DataSamplingPolicy.cxx index 40db241f8b4d1..651d013974b23 100644 --- a/Framework/Core/src/DataSamplingPolicy.cxx +++ b/Framework/Core/src/DataSamplingPolicy.cxx @@ -66,8 +66,8 @@ void DataSamplingPolicy::configure(const ptree& config) OutputSpec outputSpec{ { dataHeaderConfig.second.get("binding") }, - policyDataOrigin(), - policyDataDescription(mName, outputId++), + createPolicyDataOrigin(), + createPolicyDataDescription(mName, outputId++), mSubSpec }; @@ -138,12 +138,12 @@ const header::DataHeader::SubSpecificationType DataSamplingPolicy::getSubSpec() return mSubSpec; } -header::DataOrigin DataSamplingPolicy::policyDataOrigin() +header::DataOrigin DataSamplingPolicy::createPolicyDataOrigin() { return header::DataOrigin("DS"); } -header::DataDescription DataSamplingPolicy::policyDataDescription(std::string policyName, size_t id) +header::DataDescription DataSamplingPolicy::createPolicyDataDescription(std::string policyName, size_t id) { if (policyName.size() > 14) { LOG(WARNING) << "DataSamplingPolicy name '" << policyName << "' is longer than 14 characters, trimming in dataDescription.";