Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Framework/Core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ Data Sampling provides possibility to sample data in DPL workflows, basing on ce
### Usage

To use Data Sampling in a DPL workflow insert following lines to your code:
```
```cpp
#include "Framework/DataSampling.h"
using namespace o2::framework;
void customize(std::vector<CompletionPolicy>& policies)
Expand Down Expand Up @@ -510,23 +510,23 @@ Sampled data can be subscribed to by adding `InputSpecs` provided by `std::vecto

The following sampling conditions are available. When more than one is used, a positive decision is taken when all the conditions are fulfilled.
- **DataSamplingConditionRandom** - pseudo-randomly accepts specified fraction of incoming messages.
```
```json
{
"condition": "random",
"fraction": "0.1",
"seed": "22222"
}
```
- **DataSamplingConditionNConsecutive** - approves n consecutive samples in defined cycle. It assumes that timesliceID always increments by one.
```
```json
{
"condition": "nConsecutive",
"samplesNumber": "3",
"cycleSize": "100"
}
```
- **DataSamplingConditionPayloadSize** - approves messages having payload size within specified boundaries.
```
```json
{
"condition": "payloadSize",
"lowerLimit": "300",
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/DataSampling.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class DataSampling

private:
// Internal functions, used by GenerateInfrastructure()
static std::string dispatcherName();
static std::string createDispatcherName();
};

} // namespace framework
Expand Down
6 changes: 4 additions & 2 deletions Framework/Core/include/Framework/DataSamplingPolicy.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class DataSamplingPolicy
struct inputSpecHasher {
size_t operator()(const InputSpec& i) const
{
// 'Compressing' dataOrigin and dataDescription to 64 bits. SubSpecification is not taken into account,
// because sometimes we want to have subSpec-agnostic match.
return (static_cast<size_t>(i.description.itg[0]) << 32 |
static_cast<size_t>(i.description.itg[1])) ^
static_cast<size_t>(i.origin.itg[0]);
Expand Down Expand Up @@ -82,8 +84,8 @@ class DataSamplingPolicy
const std::string& getFairMQOutputChannel() const;
std::string getFairMQOutputChannelName() const;

static header::DataOrigin policyDataOrigin();
static header::DataDescription policyDataDescription(std::string policyName, size_t id);
static header::DataOrigin createPolicyDataOrigin();
static header::DataDescription createPolicyDataDescription(std::string policyName, size_t id);

private:
std::string mName;
Expand Down
6 changes: 3 additions & 3 deletions Framework/Core/src/DataSampling.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ namespace o2
namespace framework
{

std::string DataSampling::dispatcherName()
std::string DataSampling::createDispatcherName()
{
return std::string("Dispatcher"); //_") + getenv("HOSTNAME");
}

void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const std::string& policiesSource, size_t threads)
{
LOG(DEBUG) << "Generating Data Sampling infrastructure...";
Dispatcher dispatcher(dispatcherName(), policiesSource);
Dispatcher dispatcher(createDispatcherName(), policiesSource);
Options options;

std::unique_ptr<ConfigurationInterface> cfg = ConfigurationFactory::getConfiguration(policiesSource);
Expand Down Expand Up @@ -101,7 +101,7 @@ void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const std::str

void DataSampling::CustomizeInfrastructure(std::vector<CompletionPolicy>& policies)
{
CompletionPolicy dispatcherConsumesASAP = CompletionPolicyHelpers::defineByName(dispatcherName(), CompletionPolicy::CompletionOp::Consume);
CompletionPolicy dispatcherConsumesASAP = CompletionPolicyHelpers::defineByName(createDispatcherName(), CompletionPolicy::CompletionOp::Consume);
policies.push_back(dispatcherConsumesASAP);
}

Expand Down
8 changes: 4 additions & 4 deletions Framework/Core/src/DataSamplingPolicy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ void DataSamplingPolicy::configure(const ptree& config)

OutputSpec outputSpec{
{ dataHeaderConfig.second.get<std::string>("binding") },
policyDataOrigin(),
policyDataDescription(mName, outputId++),
createPolicyDataOrigin(),
createPolicyDataDescription(mName, outputId++),
mSubSpec
};

Expand Down Expand Up @@ -138,12 +138,12 @@ const header::DataHeader::SubSpecificationType DataSamplingPolicy::getSubSpec()
return mSubSpec;
}

header::DataOrigin DataSamplingPolicy::policyDataOrigin()
header::DataOrigin DataSamplingPolicy::createPolicyDataOrigin()
{
return header::DataOrigin("DS");
}

header::DataDescription DataSamplingPolicy::policyDataDescription(std::string policyName, size_t id)
header::DataDescription DataSamplingPolicy::createPolicyDataDescription(std::string policyName, size_t id)
{
if (policyName.size() > 14) {
LOG(WARNING) << "DataSamplingPolicy name '" << policyName << "' is longer than 14 characters, trimming in dataDescription.";
Expand Down