Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions Framework/Core/src/SendingPolicy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -28,49 +28,6 @@ namespace o2::framework
std::vector<SendingPolicy> SendingPolicy::createDefaultPolicies()
{
return {SendingPolicy{
.name = "dispatcher",
.matcher = [](DataProcessorSpec const& source, DataProcessorSpec const& dest, ConfigContext const&) {
if (source.name == "Dispatcher") {
return true;
}
// Check if any of the labels has "Dispatcher" as prefix
for (auto const& label : source.labels) {
if (label.value.find("Dispatcher") == 0) {
return true;
}
}
// Check if any of the destination's labels is "expendable"
for (auto const& label : dest.labels) {
if (label.value == "expendable") {
return true;
}
}
return false; },
.send = [](fair::mq::Parts& parts, ChannelIndex channelIndex, ServiceRegistryRef registry) {
auto &proxy = registry.get<FairMQDeviceProxy>();
OutputChannelInfo const& info = proxy.getOutputChannelInfo(channelIndex);
OutputChannelState& state = proxy.getOutputChannelState(channelIndex);
// Default timeout is 50ms.
// We count the number of consecutively dropped messages.
// If we have more than 10, we switch to a completely
// non-blocking approach.
int64_t timeout = 50;
if (state.droppedMessages == 10 + 1) {
LOG(warning) << "Failed to send 10 messages with 10ms timeout in a row, switching to completely non-blocking mode.";
}
if (state.droppedMessages == 0) {
timeout = 50;
}
if (state.droppedMessages > 10) {
timeout = 0;
}
int64_t result = info.channel.Send(parts, timeout);
if (result >= 0) {
state.droppedMessages = 0;
} else if (state.droppedMessages < std::numeric_limits<decltype(state.droppedMessages)>::max()) {
state.droppedMessages++;
} }},
SendingPolicy{
.name = "profiling",
.matcher = [](DataProcessorSpec const&, DataProcessorSpec const&, ConfigContext const&) { return getenv("DPL_DEBUG_MESSAGE_SIZE"); },
.send = [](fair::mq::Parts& parts, ChannelIndex channelIndex, ServiceRegistryRef registry) {
Expand Down