Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions Framework/Core/src/O2ControlHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ void dumpChannelBind(std::ostream& dumpOut, const T& channel, std::string indLev
dumpOut << indLevel << indScheme << "transport: " << (channel.protocol == ChannelProtocol::IPC ? "shmem" : "zeromq") << "\n";
dumpOut << indLevel << indScheme << "addressing: " << (channel.protocol == ChannelProtocol::IPC ? "ipc" : "tcp") << "\n";
dumpOut << indLevel << indScheme << "rateLogging: \"{{ fmq_rate_logging }}\"\n";
dumpOut << indLevel << indScheme << "sndBufSize: " << channel.sendBufferSize << "\n";
dumpOut << indLevel << indScheme << "rcvBufSize: " << channel.recvBufferSize << "\n";
}

template <typename T>
Expand All @@ -53,6 +55,8 @@ void dumpChannelConnect(std::ostream& dumpOut, const T& channel, const std::stri
dumpOut << indLevel << indScheme << "transport: " << (channel.protocol == ChannelProtocol::IPC ? "shmem" : "zeromq") << "\n";
dumpOut << indLevel << indScheme << "target: \"{{ Parent().Path }}." << binderName << ":" << channel.name << "\"\n";
dumpOut << indLevel << indScheme << "rateLogging: \"{{ fmq_rate_logging }}\"\n";
dumpOut << indLevel << indScheme << "sndBufSize: " << channel.sendBufferSize << "\n";
dumpOut << indLevel << indScheme << "rcvBufSize: " << channel.recvBufferSize << "\n";
}

struct RawChannel {
Expand All @@ -62,6 +66,8 @@ struct RawChannel {
std::string_view address;
std::string_view rateLogging;
std::string_view transport;
std::string_view sndBufSize;
std::string_view rcvBufSize;
};

std::string rawChannelReference(std::string_view channelName, bool isUniqueChannel)
Expand All @@ -81,17 +87,23 @@ void dumpRawChannelConnect(std::ostream& dumpOut, const RawChannel& channel, boo
dumpOut << indLevel << indScheme << "transport: " << channel.transport << "\n";
if (preserveRawChannels) {
dumpOut << indLevel << indScheme << "target: \"" << channel.address << "\"\n";
LOG(info) << "This topology will connect to the channel '" << channel.name << "', which is most likely bound outside."
LOG(info) << "This workflow will connect to the channel '" << channel.name << "', which is most likely bound outside."
<< " Please make sure it is available under the address '" << channel.address
<< "' in the mother workflow or another subworkflow.";
} else {
auto channelRef = rawChannelReference(channel.name, isUniqueChannel);
LOG(info) << "This topology will connect to the channel '" << channel.name << "', which is most likely bound outside."
LOG(info) << "This workflow will connect to the channel '" << channel.name << "', which is most likely bound outside."
<< " Please make sure it is declared in the global channel space under the name '" << channelRef
<< "' in the mother workflow or another subworkflow.";
dumpOut << indLevel << indScheme << "target: \"::" << channelRef << "\"\n";
}
dumpOut << indLevel << indScheme << "rateLogging: \"{{ fmq_rate_logging }}\"\n";
if (!channel.sndBufSize.empty()) {
dumpOut << indLevel << indScheme << "sndBufSize: " << channel.sndBufSize << "\n";
}
if (!channel.rcvBufSize.empty()) {
dumpOut << indLevel << indScheme << "rcvBufSize: " << channel.rcvBufSize << "\n";
}
}

void dumpRawChannelBind(std::ostream& dumpOut, const RawChannel& channel, bool isUniqueChannel, bool preserveRawChannels, std::string indLevel)
Expand All @@ -102,19 +114,25 @@ void dumpRawChannelBind(std::ostream& dumpOut, const RawChannel& channel, bool i
dumpOut << indLevel << indScheme << "addressing: " << (channel.address.find("ipc") != std::string_view::npos ? "ipc" : "tcp") << "\n";
dumpOut << indLevel << indScheme << "rateLogging: \"{{ fmq_rate_logging }}\"\n";
if (preserveRawChannels) {
LOG(info) << "This topology will bind a dangling channel '" << channel.name << "'"
LOG(info) << "This workflow will bind a dangling channel '" << channel.name << "'"
<< " with the address '" << channel.address << "'."
<< " Please make sure that another device connects to this channel elsewhere."
<< " Also, don't mind seeing the message twice, it will be addressed in future releases.";
dumpOut << indLevel << indScheme << "target: \"" << channel.address << "\"\n";
} else {
auto channelRef = rawChannelReference(channel.name, isUniqueChannel);
LOG(info) << "This topology will bind a dangling channel '" << channel.name << "'"
LOG(info) << "This workflow will bind a dangling channel '" << channel.name << "'"
<< " and declare it in the global channel space under the name '" << channelRef << "'."
<< " Please make sure that another device connects to this channel elsewhere."
<< " Also, don't mind seeing the message twice, it will be addressed in future releases.";
dumpOut << indLevel << indScheme << "global: \"" << channelRef << "\"\n";
}
if (!channel.sndBufSize.empty()) {
dumpOut << indLevel << indScheme << "sndBufSize: " << channel.sndBufSize << "\n";
}
if (!channel.rcvBufSize.empty()) {
dumpOut << indLevel << indScheme << "rcvBufSize: " << channel.rcvBufSize << "\n";
}
}

std::string_view extractValueFromChannelConfig(std::string_view string, std::string_view token)
Expand Down Expand Up @@ -158,7 +176,9 @@ std::vector<RawChannel> extractRawChannels(const DeviceSpec& spec, const DeviceE
extractValueFromChannelConfig(channelConfig, "method="),
extractValueFromChannelConfig(channelConfig, "address="),
extractValueFromChannelConfig(channelConfig, "rateLogging="),
extractValueFromChannelConfig(channelConfig, "transport=")});
extractValueFromChannelConfig(channelConfig, "transport="),
extractValueFromChannelConfig(channelConfig, "sndBufSize="),
extractValueFromChannelConfig(channelConfig, "rcvBufSize=")});
}
}
}
Expand Down
15 changes: 14 additions & 1 deletion Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ WorkflowSpec defineDataProcessing()
Outputs{OutputSpec{"TST", "A1"}, OutputSpec{"TST", "A2"}}, // A1 will be consumed twice, A2 is dangling
AlgorithmSpec{}, //
{ConfigParamSpec{"channel-config", VariantType::String, // raw input channel
"name=into_dpl,type=pull,method=connect,address=ipc:///tmp/pipe-into-dpl,transport=shmem,rateLogging=10",
"name=into_dpl,type=pull,method=connect,address=ipc:///tmp/pipe-into-dpl,transport=shmem,rateLogging=10,rcvBufSize=789",
{"Out-of-band channel config"}}}},
{"B", // producer, no inputs
Inputs{},
Expand Down Expand Up @@ -85,6 +85,7 @@ const auto expectedWorkflow = R"EXPECTED(name: testwf
transport: shmem
target: "::into_dpl-{{ it }}"
rateLogging: "{{ fmq_rate_logging }}"
rcvBufSize: 789
task:
load: testwf-A
- name: "B"
Expand All @@ -98,11 +99,15 @@ const auto expectedWorkflow = R"EXPECTED(name: testwf
transport: shmem
target: "{{ Parent().Path }}.A:from_A_to_C"
rateLogging: "{{ fmq_rate_logging }}"
sndBufSize: 1
rcvBufSize: 1
- name: from_B_to_C
type: pull
transport: shmem
target: "{{ Parent().Path }}.B:from_B_to_C"
rateLogging: "{{ fmq_rate_logging }}"
sndBufSize: 1
rcvBufSize: 1
task:
load: testwf-C
- name: "D"
Expand All @@ -112,6 +117,8 @@ const auto expectedWorkflow = R"EXPECTED(name: testwf
transport: shmem
target: "{{ Parent().Path }}.C:from_C_to_D"
rateLogging: "{{ fmq_rate_logging }}"
sndBufSize: 1
rcvBufSize: 1
bind:
- name: outta_dpl
type: push
Expand Down Expand Up @@ -142,6 +149,8 @@ const std::vector expectedTasks{
transport: shmem
addressing: ipc
rateLogging: "{{ fmq_rate_logging }}"
sndBufSize: 1
rcvBufSize: 1
command:
shell: true
log: "{{ log_task_output }}"
Expand Down Expand Up @@ -213,6 +222,8 @@ const std::vector expectedTasks{
transport: shmem
addressing: ipc
rateLogging: "{{ fmq_rate_logging }}"
sndBufSize: 1
rcvBufSize: 1
command:
shell: true
log: "{{ log_task_output }}"
Expand Down Expand Up @@ -284,6 +295,8 @@ const std::vector expectedTasks{
transport: shmem
addressing: ipc
rateLogging: "{{ fmq_rate_logging }}"
sndBufSize: 1
rcvBufSize: 1
command:
shell: true
log: "{{ log_task_output }}"
Expand Down