From 893d74eae2d9a0ba728df2a0c74a278a683f1b2c Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Thu, 2 Dec 2021 11:06:35 +0100 Subject: [PATCH] [OCTRL-586] AliECS dump: include queues sizes --- Framework/Core/src/O2ControlHelpers.cxx | 30 +++++++++++++++---- .../test_FrameworkDataFlowToO2Control.cxx | 15 +++++++++- 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/Framework/Core/src/O2ControlHelpers.cxx b/Framework/Core/src/O2ControlHelpers.cxx index 06e00cb84d3d7..ffcb986f71b40 100644 --- a/Framework/Core/src/O2ControlHelpers.cxx +++ b/Framework/Core/src/O2ControlHelpers.cxx @@ -42,6 +42,8 @@ void dumpChannelBind(std::ostream& dumpOut, const T& channel, std::string indLev dumpOut << indLevel << indScheme << "transport: " << (channel.protocol == ChannelProtocol::IPC ? "shmem" : "zeromq") << "\n"; dumpOut << indLevel << indScheme << "addressing: " << (channel.protocol == ChannelProtocol::IPC ? "ipc" : "tcp") << "\n"; dumpOut << indLevel << indScheme << "rateLogging: \"{{ fmq_rate_logging }}\"\n"; + dumpOut << indLevel << indScheme << "sndBufSize: " << channel.sendBufferSize << "\n"; + dumpOut << indLevel << indScheme << "rcvBufSize: " << channel.recvBufferSize << "\n"; } template @@ -53,6 +55,8 @@ void dumpChannelConnect(std::ostream& dumpOut, const T& channel, const std::stri dumpOut << indLevel << indScheme << "transport: " << (channel.protocol == ChannelProtocol::IPC ? "shmem" : "zeromq") << "\n"; dumpOut << indLevel << indScheme << "target: \"{{ Parent().Path }}." << binderName << ":" << channel.name << "\"\n"; dumpOut << indLevel << indScheme << "rateLogging: \"{{ fmq_rate_logging }}\"\n"; + dumpOut << indLevel << indScheme << "sndBufSize: " << channel.sendBufferSize << "\n"; + dumpOut << indLevel << indScheme << "rcvBufSize: " << channel.recvBufferSize << "\n"; } struct RawChannel { @@ -62,6 +66,8 @@ struct RawChannel { std::string_view address; std::string_view rateLogging; std::string_view transport; + std::string_view sndBufSize; + std::string_view rcvBufSize; }; std::string rawChannelReference(std::string_view channelName, bool isUniqueChannel) @@ -81,17 +87,23 @@ void dumpRawChannelConnect(std::ostream& dumpOut, const RawChannel& channel, boo dumpOut << indLevel << indScheme << "transport: " << channel.transport << "\n"; if (preserveRawChannels) { dumpOut << indLevel << indScheme << "target: \"" << channel.address << "\"\n"; - LOG(info) << "This topology will connect to the channel '" << channel.name << "', which is most likely bound outside." + LOG(info) << "This workflow will connect to the channel '" << channel.name << "', which is most likely bound outside." << " Please make sure it is available under the address '" << channel.address << "' in the mother workflow or another subworkflow."; } else { auto channelRef = rawChannelReference(channel.name, isUniqueChannel); - LOG(info) << "This topology will connect to the channel '" << channel.name << "', which is most likely bound outside." + LOG(info) << "This workflow will connect to the channel '" << channel.name << "', which is most likely bound outside." << " Please make sure it is declared in the global channel space under the name '" << channelRef << "' in the mother workflow or another subworkflow."; dumpOut << indLevel << indScheme << "target: \"::" << channelRef << "\"\n"; } dumpOut << indLevel << indScheme << "rateLogging: \"{{ fmq_rate_logging }}\"\n"; + if (!channel.sndBufSize.empty()) { + dumpOut << indLevel << indScheme << "sndBufSize: " << channel.sndBufSize << "\n"; + } + if (!channel.rcvBufSize.empty()) { + dumpOut << indLevel << indScheme << "rcvBufSize: " << channel.rcvBufSize << "\n"; + } } void dumpRawChannelBind(std::ostream& dumpOut, const RawChannel& channel, bool isUniqueChannel, bool preserveRawChannels, std::string indLevel) @@ -102,19 +114,25 @@ void dumpRawChannelBind(std::ostream& dumpOut, const RawChannel& channel, bool i dumpOut << indLevel << indScheme << "addressing: " << (channel.address.find("ipc") != std::string_view::npos ? "ipc" : "tcp") << "\n"; dumpOut << indLevel << indScheme << "rateLogging: \"{{ fmq_rate_logging }}\"\n"; if (preserveRawChannels) { - LOG(info) << "This topology will bind a dangling channel '" << channel.name << "'" + LOG(info) << "This workflow will bind a dangling channel '" << channel.name << "'" << " with the address '" << channel.address << "'." << " Please make sure that another device connects to this channel elsewhere." << " Also, don't mind seeing the message twice, it will be addressed in future releases."; dumpOut << indLevel << indScheme << "target: \"" << channel.address << "\"\n"; } else { auto channelRef = rawChannelReference(channel.name, isUniqueChannel); - LOG(info) << "This topology will bind a dangling channel '" << channel.name << "'" + LOG(info) << "This workflow will bind a dangling channel '" << channel.name << "'" << " and declare it in the global channel space under the name '" << channelRef << "'." << " Please make sure that another device connects to this channel elsewhere." << " Also, don't mind seeing the message twice, it will be addressed in future releases."; dumpOut << indLevel << indScheme << "global: \"" << channelRef << "\"\n"; } + if (!channel.sndBufSize.empty()) { + dumpOut << indLevel << indScheme << "sndBufSize: " << channel.sndBufSize << "\n"; + } + if (!channel.rcvBufSize.empty()) { + dumpOut << indLevel << indScheme << "rcvBufSize: " << channel.rcvBufSize << "\n"; + } } std::string_view extractValueFromChannelConfig(std::string_view string, std::string_view token) @@ -158,7 +176,9 @@ std::vector extractRawChannels(const DeviceSpec& spec, const DeviceE extractValueFromChannelConfig(channelConfig, "method="), extractValueFromChannelConfig(channelConfig, "address="), extractValueFromChannelConfig(channelConfig, "rateLogging="), - extractValueFromChannelConfig(channelConfig, "transport=")}); + extractValueFromChannelConfig(channelConfig, "transport="), + extractValueFromChannelConfig(channelConfig, "sndBufSize="), + extractValueFromChannelConfig(channelConfig, "rcvBufSize=")}); } } } diff --git a/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx b/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx index bf79a3f0dcc74..936693091e2e5 100644 --- a/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx +++ b/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx @@ -35,7 +35,7 @@ WorkflowSpec defineDataProcessing() Outputs{OutputSpec{"TST", "A1"}, OutputSpec{"TST", "A2"}}, // A1 will be consumed twice, A2 is dangling AlgorithmSpec{}, // {ConfigParamSpec{"channel-config", VariantType::String, // raw input channel - "name=into_dpl,type=pull,method=connect,address=ipc:///tmp/pipe-into-dpl,transport=shmem,rateLogging=10", + "name=into_dpl,type=pull,method=connect,address=ipc:///tmp/pipe-into-dpl,transport=shmem,rateLogging=10,rcvBufSize=789", {"Out-of-band channel config"}}}}, {"B", // producer, no inputs Inputs{}, @@ -85,6 +85,7 @@ const auto expectedWorkflow = R"EXPECTED(name: testwf transport: shmem target: "::into_dpl-{{ it }}" rateLogging: "{{ fmq_rate_logging }}" + rcvBufSize: 789 task: load: testwf-A - name: "B" @@ -98,11 +99,15 @@ const auto expectedWorkflow = R"EXPECTED(name: testwf transport: shmem target: "{{ Parent().Path }}.A:from_A_to_C" rateLogging: "{{ fmq_rate_logging }}" + sndBufSize: 1 + rcvBufSize: 1 - name: from_B_to_C type: pull transport: shmem target: "{{ Parent().Path }}.B:from_B_to_C" rateLogging: "{{ fmq_rate_logging }}" + sndBufSize: 1 + rcvBufSize: 1 task: load: testwf-C - name: "D" @@ -112,6 +117,8 @@ const auto expectedWorkflow = R"EXPECTED(name: testwf transport: shmem target: "{{ Parent().Path }}.C:from_C_to_D" rateLogging: "{{ fmq_rate_logging }}" + sndBufSize: 1 + rcvBufSize: 1 bind: - name: outta_dpl type: push @@ -142,6 +149,8 @@ const std::vector expectedTasks{ transport: shmem addressing: ipc rateLogging: "{{ fmq_rate_logging }}" + sndBufSize: 1 + rcvBufSize: 1 command: shell: true log: "{{ log_task_output }}" @@ -213,6 +222,8 @@ const std::vector expectedTasks{ transport: shmem addressing: ipc rateLogging: "{{ fmq_rate_logging }}" + sndBufSize: 1 + rcvBufSize: 1 command: shell: true log: "{{ log_task_output }}" @@ -284,6 +295,8 @@ const std::vector expectedTasks{ transport: shmem addressing: ipc rateLogging: "{{ fmq_rate_logging }}" + sndBufSize: 1 + rcvBufSize: 1 command: shell: true log: "{{ log_task_output }}"