From d4a4ea14d208e4b135fe63122020c35d5855b2b5 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 18 Oct 2018 11:10:06 +0200 Subject: [PATCH] Add example/test for built-in devices --- examples/CMakeLists.txt | 1 + examples/builtin-devices/CMakeLists.txt | 50 ++++++ examples/builtin-devices/README.md | 4 + .../fairmq-start-ex-builtin-devices.sh.in | 93 +++++++++++ .../test-ex-builtin-devices.sh.in | 148 ++++++++++++++++++ fairmq/devices/FairMQBenchmarkSampler.cxx | 20 ++- fairmq/devices/FairMQBenchmarkSampler.h | 5 +- fairmq/devices/FairMQMerger.cxx | 6 +- fairmq/devices/FairMQMerger.h | 2 +- fairmq/devices/FairMQMultiplier.cxx | 4 +- fairmq/devices/FairMQMultiplier.h | 2 +- fairmq/devices/FairMQProxy.cxx | 4 +- fairmq/devices/FairMQProxy.h | 2 +- fairmq/devices/FairMQSink.h | 38 ++++- fairmq/devices/FairMQSplitter.cxx | 4 +- fairmq/devices/FairMQSplitter.h | 2 +- fairmq/run/runBenchmarkSampler.cxx | 4 +- fairmq/run/runMerger.cxx | 2 +- fairmq/run/runMultiplier.cxx | 2 +- fairmq/run/runProxy.cxx | 2 +- fairmq/run/runSink.cxx | 3 +- fairmq/run/runSplitter.cxx | 2 +- fairmq/run/startMQBenchmark.sh.in | 18 +-- 23 files changed, 370 insertions(+), 48 deletions(-) create mode 100644 examples/builtin-devices/CMakeLists.txt create mode 100644 examples/builtin-devices/README.md create mode 100755 examples/builtin-devices/fairmq-start-ex-builtin-devices.sh.in create mode 100755 examples/builtin-devices/test-ex-builtin-devices.sh.in diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 59f399242..5f4c76e87 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -8,6 +8,7 @@ add_subdirectory(1-1) add_subdirectory(1-n-1) +add_subdirectory(builtin-devices) add_subdirectory(copypush) add_subdirectory(dds) add_subdirectory(multipart) diff --git a/examples/builtin-devices/CMakeLists.txt b/examples/builtin-devices/CMakeLists.txt new file mode 100644 index 000000000..a7126c3a6 --- /dev/null +++ b/examples/builtin-devices/CMakeLists.txt @@ -0,0 +1,50 @@ + ################################################################################ + # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # # + # This software is distributed under the terms of the # + # GNU Lesser General Public Licence (LGPL) version 3, # + # copied verbatim in the file "LICENSE" # + ################################################################################ + +set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR}) +set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-builtin-devices.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh) + +# test + +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-builtin-devices.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh) + +add_test(NAME Example-Builtin-Devices-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh zeromq) +set_tests_properties(Example-Builtin-Devices-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached") + +if(BUILD_NANOMSG_TRANSPORT) + add_test(NAME Example-Builtin-Devices-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh nanomsg) + set_tests_properties(Example-Builtin-Devices-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached") +endif() + +add_test(NAME Example-Builtin-Devices-shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh shmem) +set_tests_properties(Example-Builtin-Devices-shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached") + +add_test(NAME Example-Builtin-Devices-zeromq-multipart COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh zeromq true 2) +set_tests_properties(Example-Builtin-Devices-zeromq-multipart PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached") + +if(BUILD_NANOMSG_TRANSPORT) + add_test(NAME Example-Builtin-Devices-nanomsg-multipart COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh nanomsg true 2) + set_tests_properties(Example-Builtin-Devices-nanomsg-multipart PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached") +endif() + +add_test(NAME Example-Builtin-Devices-shmem-multipart COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh shmem true 2) +set_tests_properties(Example-Builtin-Devices-shmem-multipart PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached") + +# install + +# configure run script with different executable paths for build and for install directories +set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}) +set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-builtin-devices.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install) + +install( + PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install + DESTINATION ${PROJECT_INSTALL_BINDIR} + RENAME fairmq-start-ex-builtin-devices.sh +) diff --git a/examples/builtin-devices/README.md b/examples/builtin-devices/README.md new file mode 100644 index 000000000..b8d62e7c4 --- /dev/null +++ b/examples/builtin-devices/README.md @@ -0,0 +1,4 @@ +Built-in devices +========================== + +This example demonstrates use of generic devices that are provided with FairMQ - BenchmarkSampler, Merger, Multiplier, Proxy, Sink, Splitter. They are all connected in one topology and transfer some dummy buffers generated by the BenchmarkSampler. diff --git a/examples/builtin-devices/fairmq-start-ex-builtin-devices.sh.in b/examples/builtin-devices/fairmq-start-ex-builtin-devices.sh.in new file mode 100755 index 000000000..7de1397e4 --- /dev/null +++ b/examples/builtin-devices/fairmq-start-ex-builtin-devices.sh.in @@ -0,0 +1,93 @@ +#!/bin/bash + +export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ + +transport="zeromq" +multipart="false" +numParts="1" + +if [[ $1 =~ ^[a-z]+$ ]]; then + transport=$1 +fi + +if [[ $2 =~ ^[a-z]+$ ]]; then + multipart=$2 +fi + +if [[ $3 =~ ^[0-9]+$ ]]; then + numParts=$3 +fi + +SAMPLER="fairmq-bsampler" +SAMPLER+=" --id bsampler1" +SAMPLER+=" --transport $transport" +SAMPLER+=" --control interactive" +SAMPLER+=" --severity debug" +SAMPLER+=" --msg-size 100000" +SAMPLER+=" --multipart $multipart" +SAMPLER+=" --num-parts $numParts" +SAMPLER+=" --msg-rate 100" +SAMPLER+=" --max-iterations 0" +SAMPLER+=" --out-channel data1" +SAMPLER+=" --channel-config name=data1,type=push,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5555" +xterm -geometry 90x20+0+175 -hold -e @FAIRMQ_BIN_DIR@/$SAMPLER & + +SPLITTER="fairmq-splitter" +SPLITTER+=" --id splitter" +SPLITTER+=" --transport $transport" +SPLITTER+=" --multipart $multipart" +SPLITTER+=" --in-channel data1" +SPLITTER+=" --out-channel data2" +SPLITTER+=" --channel-config name=data1,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5555" +SPLITTER+=" name=data2,type=push,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5556,address=tcp://localhost:5557" +xterm -geometry 90x20+0+475 -hold -e @FAIRMQ_BIN_DIR@/$SPLITTER & + +PROXY1="fairmq-proxy" +PROXY1+=" --id proxy1" +PROXY1+=" --transport $transport" +PROXY1+=" --multipart $multipart" +PROXY1+=" --in-channel data2" +PROXY1+=" --out-channel data3" +PROXY1+=" --channel-config name=data2,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5556" +PROXY1+=" name=data3,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5558" +xterm -geometry 90x20+550+175 -hold -e @FAIRMQ_BIN_DIR@/$PROXY1 & + +PROXY2="fairmq-proxy" +PROXY2+=" --id proxy2" +PROXY2+=" --transport $transport" +PROXY2+=" --multipart $multipart" +PROXY2+=" --in-channel data2" +PROXY2+=" --out-channel data3" +PROXY2+=" --channel-config name=data2,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5557" +PROXY2+=" name=data3,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5559" +xterm -geometry 90x20+550+475 -hold -e @FAIRMQ_BIN_DIR@/$PROXY2 & + +MERGER="fairmq-merger" +MERGER+=" --id merger" +MERGER+=" --transport $transport" +MERGER+=" --multipart $multipart" +MERGER+=" --in-channel data3" +MERGER+=" --out-channel data4" +MERGER+=" --channel-config name=data3,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5558,address=tcp://localhost:5559" +MERGER+=" name=data4,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5560" +xterm -geometry 90x20+1100+50 -hold -e @FAIRMQ_BIN_DIR@/$MERGER & + +MULTIPLIER="fairmq-multiplier" +MULTIPLIER+=" --id multiplier" +MULTIPLIER+=" --transport $transport" +MULTIPLIER+=" --multipart $multipart" +MULTIPLIER+=" --in-channel data4" +MULTIPLIER+=" --out-channel data5" +MULTIPLIER+=" --channel-config name=data4,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5560" +MULTIPLIER+=" name=data5,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5561,address=tcp://localhost:5561" +xterm -geometry 90x20+1100+350 -hold -e @FAIRMQ_BIN_DIR@/$MULTIPLIER & + +SINK="fairmq-sink" +SINK+=" --id sink1" +SINK+=" --transport $transport" +SINK+=" --severity debug" +SINK+=" --multipart $multipart" +SINK+=" --max-iterations 0" +SINK+=" --in-channel data5" +SINK+=" --channel-config name=data5,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5561" +xterm -geometry 90x20+1100+650 -hold -e @FAIRMQ_BIN_DIR@/$SINK & diff --git a/examples/builtin-devices/test-ex-builtin-devices.sh.in b/examples/builtin-devices/test-ex-builtin-devices.sh.in new file mode 100755 index 000000000..f1329b2b0 --- /dev/null +++ b/examples/builtin-devices/test-ex-builtin-devices.sh.in @@ -0,0 +1,148 @@ +#!/bin/bash + +export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ + +SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)" + +transport="zeromq" +multipart="false" +numParts="1" + +if [[ $1 =~ ^[a-z]+$ ]]; then + transport=$1 +fi + +if [[ $2 =~ ^[a-z]+$ ]]; then + multipart=$2 +fi + +if [[ $3 =~ ^[0-9]+$ ]]; then + numParts=$3 +fi + +# setup a trap to kill everything if the test fails/timeouts +trap 'kill -TERM $SAMPLER_PID; kill -TERM $SPLITTER_PID; kill -TERM $PROXY1_PID; kill -TERM $PROXY2_PID; kill -TERM $MERGER_PID; kill -TERM $MULTIPLIER_PID; kill -TERM $SINK_PID;' TERM + +SAMPLER="fairmq-bsampler" +SAMPLER+=" --id bsampler1" +SAMPLER+=" --session $SESSION" +SAMPLER+=" --transport $transport" +SAMPLER+=" --color false" +SAMPLER+=" --control static" +SAMPLER+=" --verbosity veryhigh" +SAMPLER+=" --severity debug" +SAMPLER+=" --msg-size 100000" +SAMPLER+=" --multipart $multipart" +SAMPLER+=" --num-parts $numParts" +SAMPLER+=" --msg-rate 1" +SAMPLER+=" --max-iterations 0" +SAMPLER+=" --out-channel data1" +SAMPLER+=" --channel-config name=data1,type=push,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5555" +@FAIRMQ_BIN_DIR@/$SAMPLER & +SAMPLER_PID=$! + +SPLITTER="fairmq-splitter" +SPLITTER+=" --id splitter" +SPLITTER+=" --session $SESSION" +SPLITTER+=" --transport $transport" +SPLITTER+=" --color false" +SPLITTER+=" --control static" +SPLITTER+=" --verbosity veryhigh" +SPLITTER+=" --multipart $multipart" +SPLITTER+=" --in-channel data1" +SPLITTER+=" --out-channel data2" +SPLITTER+=" --channel-config name=data1,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5555" +SPLITTER+=" name=data2,type=push,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5556,address=tcp://localhost:5557" +@FAIRMQ_BIN_DIR@/$SPLITTER & +SPLITTER_PID=$! + +PROXY1="fairmq-proxy" +PROXY1+=" --id proxy1" +PROXY1+=" --session $SESSION" +PROXY1+=" --transport $transport" +PROXY1+=" --color false" +PROXY1+=" --control static" +PROXY1+=" --verbosity veryhigh" +PROXY1+=" --multipart $multipart" +PROXY1+=" --in-channel data2" +PROXY1+=" --out-channel data3" +PROXY1+=" --channel-config name=data2,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5556" +PROXY1+=" name=data3,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5558" +@FAIRMQ_BIN_DIR@/$PROXY1 & +PROXY1_PID=$! + +PROXY2="fairmq-proxy" +PROXY2+=" --id proxy2" +PROXY2+=" --session $SESSION" +PROXY2+=" --transport $transport" +PROXY2+=" --color false" +PROXY2+=" --control static" +PROXY2+=" --verbosity veryhigh" +PROXY2+=" --multipart $multipart" +PROXY2+=" --in-channel data2" +PROXY2+=" --out-channel data3" +PROXY2+=" --channel-config name=data2,type=pull,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5557" +PROXY2+=" name=data3,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5559" +@FAIRMQ_BIN_DIR@/$PROXY2 & +PROXY2_PID=$! + +MERGER="fairmq-merger" +MERGER+=" --id merger" +MERGER+=" --session $SESSION" +MERGER+=" --transport $transport" +MERGER+=" --color false" +MERGER+=" --control static" +MERGER+=" --verbosity veryhigh" +MERGER+=" --multipart $multipart" +MERGER+=" --in-channel data3" +MERGER+=" --out-channel data4" +MERGER+=" --channel-config name=data3,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5558,address=tcp://localhost:5559" +MERGER+=" name=data4,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5560" +@FAIRMQ_BIN_DIR@/$MERGER & +MERGER_PID=$! + +MULTIPLIER="fairmq-multiplier" +MULTIPLIER+=" --id multiplier" +MULTIPLIER+=" --session $SESSION" +MULTIPLIER+=" --transport $transport" +MULTIPLIER+=" --color false" +MULTIPLIER+=" --control static" +MULTIPLIER+=" --verbosity veryhigh" +MULTIPLIER+=" --multipart $multipart" +MULTIPLIER+=" --in-channel data4" +MULTIPLIER+=" --out-channel data5" +MULTIPLIER+=" --channel-config name=data4,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5560" +MULTIPLIER+=" name=data5,type=push,method=connect,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5561,address=tcp://localhost:5561" +@FAIRMQ_BIN_DIR@/$MULTIPLIER & +MULTIPLIER_PID=$! + +SINK="fairmq-sink" +SINK+=" --id sink1" +SINK+=" --session $SESSION" +SINK+=" --transport $transport" +SINK+=" --color false" +SINK+=" --control static" +SINK+=" --verbosity veryhigh" +SINK+=" --severity debug" +SINK+=" --multipart $multipart" +SINK+=" --max-iterations 2" +SINK+=" --in-channel data5" +SINK+=" --channel-config name=data5,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=tcp://localhost:5561" +@FAIRMQ_BIN_DIR@/$SINK & +SINK_PID=$! + +wait $SINK_PID + +kill -SIGINT $SAMPLER_PID +kill -SIGINT $SPLITTER_PID +kill -SIGINT $PROXY1_PID +kill -SIGINT $PROXY2_PID +kill -SIGINT $MERGER_PID +kill -SIGINT $MULTIPLIER_PID + +wait $SAMPLER_PID +wait $SPLITTER_PID +wait $PROXY1_PID +wait $PROXY2_PID +wait $MERGER_PID +wait $MULTIPLIER_PID diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index 8e31025f0..080826421 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -18,7 +18,8 @@ using namespace std; FairMQBenchmarkSampler::FairMQBenchmarkSampler() - : fSameMessage(true) + : fMultipart(false) + , fNumParts(1) , fMsgSize(10000) , fMsgRate(0) , fNumIterations(0) @@ -33,8 +34,9 @@ FairMQBenchmarkSampler::~FairMQBenchmarkSampler() void FairMQBenchmarkSampler::InitTask() { - fSameMessage = fConfig->GetValue("same-msg"); - fMsgSize = fConfig->GetValue("msg-size"); + fMultipart = fConfig->GetValue("multipart"); + fNumParts = fConfig->GetValue("num-parts"); + fMsgSize = fConfig->GetValue("msg-size"); fMsgRate = fConfig->GetValue("msg-rate"); fMaxIterations = fConfig->GetValue("max-iterations"); fOutChannelName = fConfig->GetValue("out-channel"); @@ -54,12 +56,16 @@ void FairMQBenchmarkSampler::Run() while (CheckCurrentState(RUNNING)) { - if (fSameMessage) + if (fMultipart) { - FairMQMessagePtr msg(dataOutChannel.NewMessage()); - msg->Copy(*baseMsg); + FairMQParts parts; - if (dataOutChannel.Send(msg) >= 0) + for (size_t i = 0; i < fNumParts; ++i) + { + parts.AddPart(dataOutChannel.NewMessage(fMsgSize)); + } + + if (dataOutChannel.Send(parts) >= 0) { if (fMaxIterations > 0) { diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index f173c79f6..b031de71a 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -26,8 +26,9 @@ class FairMQBenchmarkSampler : public FairMQDevice virtual ~FairMQBenchmarkSampler(); protected: - bool fSameMessage; - int fMsgSize; + bool fMultipart; + size_t fNumParts; + size_t fMsgSize; std::atomic fMsgCounter; float fMsgRate; uint64_t fNumIterations; diff --git a/fairmq/devices/FairMQMerger.cxx b/fairmq/devices/FairMQMerger.cxx index c7cd3574b..2f32c989d 100644 --- a/fairmq/devices/FairMQMerger.cxx +++ b/fairmq/devices/FairMQMerger.cxx @@ -20,7 +20,7 @@ using namespace std; FairMQMerger::FairMQMerger() - : fMultipart(1) + : fMultipart(true) , fInChannelName("data-in") , fOutChannelName("data-out") { @@ -30,6 +30,8 @@ void FairMQMerger::RegisterChannelEndpoints() { RegisterChannelEndpoint(fInChannelName, 1, 10000); RegisterChannelEndpoint(fOutChannelName, 1, 1); + + PrintRegisteredChannels(); } FairMQMerger::~FairMQMerger() @@ -38,7 +40,7 @@ FairMQMerger::~FairMQMerger() void FairMQMerger::InitTask() { - fMultipart = fConfig->GetValue("multipart"); + fMultipart = fConfig->GetValue("multipart"); fInChannelName = fConfig->GetValue("in-channel"); fOutChannelName = fConfig->GetValue("out-channel"); } diff --git a/fairmq/devices/FairMQMerger.h b/fairmq/devices/FairMQMerger.h index 9a17edfee..0e24a03cf 100644 --- a/fairmq/devices/FairMQMerger.h +++ b/fairmq/devices/FairMQMerger.h @@ -26,7 +26,7 @@ class FairMQMerger : public FairMQDevice virtual ~FairMQMerger(); protected: - int fMultipart; + bool fMultipart; std::string fInChannelName; std::string fOutChannelName; diff --git a/fairmq/devices/FairMQMultiplier.cxx b/fairmq/devices/FairMQMultiplier.cxx index c40fa289a..ec18ff15d 100644 --- a/fairmq/devices/FairMQMultiplier.cxx +++ b/fairmq/devices/FairMQMultiplier.cxx @@ -14,7 +14,7 @@ using namespace std; FairMQMultiplier::FairMQMultiplier() - : fMultipart(1) + : fMultipart(true) , fNumOutputs(0) , fInChannelName() , fOutChannelNames() @@ -27,7 +27,7 @@ FairMQMultiplier::~FairMQMultiplier() void FairMQMultiplier::InitTask() { - fMultipart = fConfig->GetValue("multipart"); + fMultipart = fConfig->GetValue("multipart"); fInChannelName = fConfig->GetValue("in-channel"); fOutChannelNames = fConfig->GetValue>("out-channel"); fNumOutputs = fChannels.at(fOutChannelNames.at(0)).size(); diff --git a/fairmq/devices/FairMQMultiplier.h b/fairmq/devices/FairMQMultiplier.h index e87389e62..0e1513516 100644 --- a/fairmq/devices/FairMQMultiplier.h +++ b/fairmq/devices/FairMQMultiplier.h @@ -20,7 +20,7 @@ class FairMQMultiplier : public FairMQDevice virtual ~FairMQMultiplier(); protected: - int fMultipart; + bool fMultipart; int fNumOutputs; std::string fInChannelName; std::vector fOutChannelNames; diff --git a/fairmq/devices/FairMQProxy.cxx b/fairmq/devices/FairMQProxy.cxx index ee802e893..915ac509f 100644 --- a/fairmq/devices/FairMQProxy.cxx +++ b/fairmq/devices/FairMQProxy.cxx @@ -20,7 +20,7 @@ using namespace std; FairMQProxy::FairMQProxy() - : fMultipart(1) + : fMultipart(true) , fInChannelName() , fOutChannelName() { @@ -32,7 +32,7 @@ FairMQProxy::~FairMQProxy() void FairMQProxy::InitTask() { - fMultipart = fConfig->GetValue("multipart"); + fMultipart = fConfig->GetValue("multipart"); fInChannelName = fConfig->GetValue("in-channel"); fOutChannelName = fConfig->GetValue("out-channel"); } diff --git a/fairmq/devices/FairMQProxy.h b/fairmq/devices/FairMQProxy.h index 0f4699a7a..b2f432049 100644 --- a/fairmq/devices/FairMQProxy.h +++ b/fairmq/devices/FairMQProxy.h @@ -26,7 +26,7 @@ class FairMQProxy : public FairMQDevice virtual ~FairMQProxy(); protected: - int fMultipart; + bool fMultipart; std::string fInChannelName; std::string fOutChannelName; diff --git a/fairmq/devices/FairMQSink.h b/fairmq/devices/FairMQSink.h index f35c466a1..e3b9d0aaf 100644 --- a/fairmq/devices/FairMQSink.h +++ b/fairmq/devices/FairMQSink.h @@ -27,7 +27,8 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy { public: FairMQSink() - : fMaxIterations(0) + : fMultipart(false) + , fMaxIterations(0) , fNumIterations(0) , fInChannelName() {} @@ -36,12 +37,14 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy {} protected: + bool fMultipart; uint64_t fMaxIterations; uint64_t fNumIterations; std::string fInChannelName; virtual void InitTask() { + fMultipart = fConfig->GetValue("multipart"); fMaxIterations = fConfig->GetValue("max-iterations"); fInChannelName = fConfig->GetValue("in-channel"); } @@ -56,18 +59,39 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy while (CheckCurrentState(RUNNING)) { - FairMQMessagePtr msg(dataInChannel.NewMessage()); + if (fMultipart) + { + FairMQParts parts; - if (dataInChannel.Receive(msg) >= 0) + if (dataInChannel.Receive(parts) >= 0) + { + if (fMaxIterations > 0) + { + if (fNumIterations >= fMaxIterations) + { + LOG(info) << "Configured maximum number of iterations reached."; + break; + } + } + fNumIterations++; + } + } + else { - if (fMaxIterations > 0) + FairMQMessagePtr msg(dataInChannel.NewMessage()); + + if (dataInChannel.Receive(msg) >= 0) { - if (fNumIterations >= fMaxIterations) + if (fMaxIterations > 0) { - break; + if (fNumIterations >= fMaxIterations) + { + LOG(info) << "Configured maximum number of iterations reached."; + break; + } } + fNumIterations++; } - fNumIterations++; } } diff --git a/fairmq/devices/FairMQSplitter.cxx b/fairmq/devices/FairMQSplitter.cxx index 024352e5c..a8a87b438 100644 --- a/fairmq/devices/FairMQSplitter.cxx +++ b/fairmq/devices/FairMQSplitter.cxx @@ -20,7 +20,7 @@ using namespace std; FairMQSplitter::FairMQSplitter() - : fMultipart(1) + : fMultipart(true) , fNumOutputs(0) , fDirection(0) , fInChannelName() @@ -34,7 +34,7 @@ FairMQSplitter::~FairMQSplitter() void FairMQSplitter::InitTask() { - fMultipart = fConfig->GetValue("multipart"); + fMultipart = fConfig->GetValue("multipart"); fInChannelName = fConfig->GetValue("in-channel"); fOutChannelName = fConfig->GetValue("out-channel"); fNumOutputs = fChannels.at(fOutChannelName).size(); diff --git a/fairmq/devices/FairMQSplitter.h b/fairmq/devices/FairMQSplitter.h index ad59adf47..b51344b33 100644 --- a/fairmq/devices/FairMQSplitter.h +++ b/fairmq/devices/FairMQSplitter.h @@ -26,7 +26,7 @@ class FairMQSplitter : public FairMQDevice virtual ~FairMQSplitter(); protected: - int fMultipart; + bool fMultipart; int fNumOutputs; int fDirection; std::string fInChannelName; diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 7f5099685..7e822af95 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -16,7 +16,9 @@ void addCustomOptions(bpo::options_description& options) options.add_options() ("out-channel", bpo::value()->default_value("data"), "Name of the output channel") ("same-msg", bpo::value()->default_value(false), "Re-send the same message, or recreate for each iteration") - ("msg-size", bpo::value()->default_value(1000000), "Message size in bytes") + ("multipart", bpo::value()->default_value(false), "Handle multipart payloads") + ("num-parts", bpo::value()->default_value(1), "Number of parts to send. 1 will send single messages, not parts") + ("msg-size", bpo::value()->default_value(1000000), "Message size in bytes") ("max-iterations", bpo::value()->default_value(0), "Number of run iterations (0 - infinite)") ("msg-rate", bpo::value()->default_value(0), "Msg rate limit in maximum number of messages per second"); } diff --git a/fairmq/run/runMerger.cxx b/fairmq/run/runMerger.cxx index 6adf6183d..643d9aab0 100644 --- a/fairmq/run/runMerger.cxx +++ b/fairmq/run/runMerger.cxx @@ -16,7 +16,7 @@ void addCustomOptions(bpo::options_description& options) options.add_options() ("in-channel", bpo::value()->default_value("data-in"), "Name of the input channel") ("out-channel", bpo::value()->default_value("data-out"), "Name of the output channel") - ("multipart", bpo::value()->default_value(1), "Handle multipart payloads"); + ("multipart", bpo::value()->default_value(true), "Handle multipart payloads"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/fairmq/run/runMultiplier.cxx b/fairmq/run/runMultiplier.cxx index a635bef47..2bd7d5a28 100644 --- a/fairmq/run/runMultiplier.cxx +++ b/fairmq/run/runMultiplier.cxx @@ -16,7 +16,7 @@ void addCustomOptions(bpo::options_description& options) options.add_options() ("in-channel", bpo::value()->default_value("data-in"), "Name of the input channel") ("out-channel", bpo::value>()->multitoken(), "Names of the output channels") - ("multipart", bpo::value()->default_value(1), "Handle multipart payloads"); + ("multipart", bpo::value()->default_value(true), "Handle multipart payloads"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/fairmq/run/runProxy.cxx b/fairmq/run/runProxy.cxx index cb2453d08..2ef1c5d7c 100644 --- a/fairmq/run/runProxy.cxx +++ b/fairmq/run/runProxy.cxx @@ -16,7 +16,7 @@ void addCustomOptions(bpo::options_description& options) options.add_options() ("in-channel", bpo::value()->default_value("data-in"), "Name of the input channel") ("out-channel", bpo::value()->default_value("data-out"), "Name of the output channel") - ("multipart", bpo::value()->default_value(1), "Handle multipart payloads"); + ("multipart", bpo::value()->default_value(true), "Handle multipart payloads"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/fairmq/run/runSink.cxx b/fairmq/run/runSink.cxx index a30d2751e..eabe0f440 100644 --- a/fairmq/run/runSink.cxx +++ b/fairmq/run/runSink.cxx @@ -15,7 +15,8 @@ void addCustomOptions(bpo::options_description& options) { options.add_options() ("in-channel", bpo::value()->default_value("data"), "Name of the input channel") - ("max-iterations", bpo::value()->default_value(0), "Number of run iterations (0 - infinite)"); + ("max-iterations", bpo::value()->default_value(0), "Number of run iterations (0 - infinite)") + ("multipart", bpo::value()->default_value(false), "Handle multipart payloads"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/fairmq/run/runSplitter.cxx b/fairmq/run/runSplitter.cxx index 59af877db..cf0070685 100644 --- a/fairmq/run/runSplitter.cxx +++ b/fairmq/run/runSplitter.cxx @@ -16,7 +16,7 @@ void addCustomOptions(bpo::options_description& options) options.add_options() ("in-channel", bpo::value()->default_value("data-in"), "Name of the input channel") ("out-channel", bpo::value()->default_value("data-out"), "Name of the output channel") - ("multipart", bpo::value()->default_value(1), "Handle multipart payloads"); + ("multipart", bpo::value()->default_value(true), "Handle multipart payloads"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/fairmq/run/startMQBenchmark.sh.in b/fairmq/run/startMQBenchmark.sh.in index 86609931c..a3ca853b0 100755 --- a/fairmq/run/startMQBenchmark.sh.in +++ b/fairmq/run/startMQBenchmark.sh.in @@ -5,7 +5,6 @@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ maxIterations="0" msgSize="1000000" transport="zeromq" -sameMsg="true" affinity="false" affinitySamp="" affinitySink="" @@ -24,11 +23,7 @@ if [[ $3 =~ ^[a-z]+$ ]]; then fi if [[ $4 =~ ^[a-z]+$ ]]; then - sameMsg=$4 -fi - -if [[ $5 =~ ^[a-z]+$ ]]; then - affinity=$5 + affinity=$4 fi @@ -45,12 +40,6 @@ fi echo "transport: $transport" -if [ $sameMsg = "true" ]; then - echo "resend same message: yes, using Copy() method to resend the same message" -else - echo "resend same message: no, allocating each message separately" -fi - if [ $affinity = "true" ]; then affinitySamp="taskset -c 0" affinitySink="taskset -c 1" @@ -60,7 +49,7 @@ else fi echo "" -echo "Usage: startBenchmark [message size=1000000] [number of iterations=0] [transport=zeromq/nanomsg/shmem] [resend same message=true] [affinity=false]" +echo "Usage: startBenchmark [message size=1000000] [number of iterations=0] [transport=zeromq/nanomsg/shmem] [affinity=false]" SAMPLER="fairmq-bsampler" SAMPLER+=" --id bsampler1" @@ -69,7 +58,7 @@ SAMPLER+=" --id bsampler1" SAMPLER+=" --transport $transport" SAMPLER+=" --severity debug" SAMPLER+=" --msg-size $msgSize" -SAMPLER+=" --same-msg $sameMsg" +SAMPLER+=" --num-parts 1" # SAMPLER+=" --msg-rate 1000" SAMPLER+=" --max-iterations $maxIterations" SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:5555" @@ -84,6 +73,7 @@ SINK+=" --id sink1" #SINK+=" --control static" SINK+=" --transport $transport" SINK+=" --severity debug" +SINK+=" --multipart false" SINK+=" --max-iterations $maxIterations" SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:5555" xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK &