From 76c5a203d88ce24b328f86383eb79dd7df92d37a Mon Sep 17 00:00:00 2001 From: Mohammad Al-Turany Date: Wed, 11 Nov 2015 11:12:47 +0100 Subject: [PATCH] Remove obsolete example "flp2epn-dynamic" all functionality and features are available in flp2epn-distributed --- devices/CMakeLists.txt | 1 - devices/flp2epn-dynamic/CMakeLists.txt | 70 ------- devices/flp2epn-dynamic/O2EPNex.cxx | 105 ---------- devices/flp2epn-dynamic/O2EPNex.h | 45 ---- devices/flp2epn-dynamic/O2FLPex.cxx | 193 ------------------ devices/flp2epn-dynamic/O2FLPex.h | 56 ----- .../flp2epn-dynamic/run/runEPN_dynamic.cxx | 170 --------------- .../flp2epn-dynamic/run/runFLP_dynamic.cxx | 176 ---------------- .../run/startFLP2EPN-dynamic.sh.in | 67 ------ 9 files changed, 883 deletions(-) delete mode 100644 devices/flp2epn-dynamic/CMakeLists.txt delete mode 100644 devices/flp2epn-dynamic/O2EPNex.cxx delete mode 100644 devices/flp2epn-dynamic/O2EPNex.h delete mode 100644 devices/flp2epn-dynamic/O2FLPex.cxx delete mode 100644 devices/flp2epn-dynamic/O2FLPex.h delete mode 100644 devices/flp2epn-dynamic/run/runEPN_dynamic.cxx delete mode 100644 devices/flp2epn-dynamic/run/runFLP_dynamic.cxx delete mode 100755 devices/flp2epn-dynamic/run/startFLP2EPN-dynamic.sh.in diff --git a/devices/CMakeLists.txt b/devices/CMakeLists.txt index 80aa79c20a410..1ca43ce47d1c2 100644 --- a/devices/CMakeLists.txt +++ b/devices/CMakeLists.txt @@ -1,5 +1,4 @@ add_subdirectory (flp2epn) -add_subdirectory (flp2epn-dynamic) add_subdirectory (flp2epn-distributed) if(ALIROOT) add_subdirectory (hough) diff --git a/devices/flp2epn-dynamic/CMakeLists.txt b/devices/flp2epn-dynamic/CMakeLists.txt deleted file mode 100644 index ea0fb17badfe6..0000000000000 --- a/devices/flp2epn-dynamic/CMakeLists.txt +++ /dev/null @@ -1,70 +0,0 @@ -set(INCLUDE_DIRECTORIES - ${CMAKE_SOURCE_DIR}/devices/flp2epn-dynamic -) - -set(SYSTEM_INCLUDE_DIRECTORIES - ${BASE_INCLUDE_DIRECTORIES} - ${Boost_INCLUDE_DIR} - ${FAIRROOT_INCLUDE_DIR} - ${ZMQ_INCLUDE_DIR} -) - -include_directories(${INCLUDE_DIRECTORIES}) -include_directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) - -configure_file( ${CMAKE_SOURCE_DIR}/devices/flp2epn-dynamic/run/startFLP2EPN-dynamic.sh.in ${CMAKE_BINARY_DIR}/bin/startFLP2EPN-dynamic.sh ) - -set(LINK_DIRECTORIES - ${Boost_LIBRARY_DIRS} - ${FAIRROOT_LIBRARY_DIR} - ${AlFa_DIR}/lib -) - -link_directories(${LINK_DIRECTORIES}) - -set(SRCS - O2FLPex.cxx - O2EPNex.cxx -) - -if(FAIRMQ_DEPENDENCIES) - set(DEPENDENCIES - ${DEPENDENCIES} - ${CMAKE_THREAD_LIBS_INIT} - ${FAIRMQ_DEPENDENCIES} - FairMQ - ) -else(FAIRMQ_DEPENDENCIES) - set(DEPENDENCIES - ${DEPENDENCIES} - ${CMAKE_THREAD_LIBS_INIT} - boost_date_time boost_thread boost_timer boost_system boost_program_options boost_chrono FairMQ - ) -endif(FAIRMQ_DEPENDENCIES) - -set(LIBRARY_NAME FLP2EPNex_dynamic) - -GENERATE_LIBRARY() - -Set(Exe_Names - ${Exe_Names} - testFLP_dynamic - testEPN_dynamic -) - -set(Exe_Source - run/runFLP_dynamic.cxx - run/runEPN_dynamic.cxx -) - -list(LENGTH Exe_Names _length) -math(EXPR _length ${_length}-1) - -ForEach(_file RANGE 0 ${_length}) - list(GET Exe_Names ${_file} _name) - list(GET Exe_Source ${_file} _src) - set(EXE_NAME ${_name}) - set(SRCS ${_src}) - set(DEPENDENCIES FLP2EPNex_dynamic) - GENERATE_EXECUTABLE() -EndForEach(_file RANGE 0 ${_length}) diff --git a/devices/flp2epn-dynamic/O2EPNex.cxx b/devices/flp2epn-dynamic/O2EPNex.cxx deleted file mode 100644 index b832141f98b84..0000000000000 --- a/devices/flp2epn-dynamic/O2EPNex.cxx +++ /dev/null @@ -1,105 +0,0 @@ -/** - * O2EPNex.cxx - * - * @since 2013-01-09 - * @author D. Klein, A. Rybalchenko, M.Al-Turany, C. Kouzinopoulos - */ - -#include -#include - -#include "O2EPNex.h" -#include "FairMQLogger.h" - -using namespace std; - -O2EPNex::O2EPNex() : - fHeartbeatIntervalInMs(5000) -{ -} - -O2EPNex::~O2EPNex() -{ -} - -void O2EPNex::Run() -{ - boost::posix_time::ptime referenceTime = boost::posix_time::microsec_clock::local_time(); - - // Set the time difference to fHeartbeatIntervalInMs to immediately send a heartbeat to the EPNs - int timeDif = fHeartbeatIntervalInMs; - string ownAddress = fChannels["data-in"].at(0).GetAddress(); - int ownAddressLength = strlen(ownAddress.c_str()); - - while (CheckCurrentState(RUNNING)) { - if (timeDif >= fHeartbeatIntervalInMs) { - referenceTime = boost::posix_time::microsec_clock::local_time(); - - for (int i = 0; i < fChannels["data-out"].size(); ++i) { - FairMQMessage* heartbeatMsg = fTransportFactory->CreateMessage(ownAddressLength); - memcpy(heartbeatMsg->GetData(), ownAddress.c_str(), ownAddressLength); - - fChannels["data-out"].at(i).Send(heartbeatMsg); - - delete heartbeatMsg; - } - } - - // Update the time difference - timeDif = (boost::posix_time::microsec_clock::local_time() - referenceTime).total_milliseconds(); - - // Receive payload - FairMQMessage* payloadMsg = fTransportFactory->CreateMessage(); - - if (fChannels["data-in"].at(0).Receive(payloadMsg, "no-block") > 0) { - int inputSize = payloadMsg->GetSize(); - int numInput = inputSize / sizeof(Content); - Content* input = reinterpret_cast(payloadMsg->GetData()); - - // for (int i = 0; i < numInput; ++i) { - // LOG(INFO) << (&input[i])->x << " " << (&input[i])->y << " " << (&input[i])->z << " " << (&input[i])->a << " " << (&input[i])->b; - // } - } - - delete payloadMsg; - } -} - -void O2EPNex::SetProperty(const int key, const string& value) -{ - switch (key) { - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -string O2EPNex::GetProperty(const int key, const string& default_/*= ""*/) -{ - switch (key) { - default: - return FairMQDevice::GetProperty(key, default_); - } -} - -void O2EPNex::SetProperty(const int key, const int value) -{ - switch (key) { - case HeartbeatIntervalInMs: - fHeartbeatIntervalInMs = value; - break; - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -int O2EPNex::GetProperty(const int key, const int default_/*= 0*/) -{ - switch (key) { - case HeartbeatIntervalInMs: - return fHeartbeatIntervalInMs; - default: - return FairMQDevice::GetProperty(key, default_); - } -} diff --git a/devices/flp2epn-dynamic/O2EPNex.h b/devices/flp2epn-dynamic/O2EPNex.h deleted file mode 100644 index 21debdb07840e..0000000000000 --- a/devices/flp2epn-dynamic/O2EPNex.h +++ /dev/null @@ -1,45 +0,0 @@ -/** - * O2EPNex.h - * - * @since 2013-01-09 - * @author D. Klein, A. Rybalchenko, M.Al-Turany - */ - -#ifndef O2EPNEX_H_ -#define O2EPNEX_H_ - -#include - -#include "FairMQDevice.h" - -struct Content { - int id; - double a; - double b; - int x; - int y; - int z; -}; - -class O2EPNex: public FairMQDevice -{ - public: - enum { - HeartbeatIntervalInMs = FairMQDevice::Last, - Last - }; - O2EPNex(); - virtual ~O2EPNex(); - - virtual void SetProperty(const int key, const std::string& value); - virtual std::string GetProperty(const int key, const std::string& default_ = ""); - virtual void SetProperty(const int key, const int value); - virtual int GetProperty(const int key, const int default_ = 0); - - protected: - int fHeartbeatIntervalInMs; - - virtual void Run(); -}; - -#endif diff --git a/devices/flp2epn-dynamic/O2FLPex.cxx b/devices/flp2epn-dynamic/O2FLPex.cxx deleted file mode 100644 index 64c306483add2..0000000000000 --- a/devices/flp2epn-dynamic/O2FLPex.cxx +++ /dev/null @@ -1,193 +0,0 @@ -/** - * O2FLPex.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko, M.Al-Turany, C. Kouzinopoulos - */ - -#include -#include /* srand, rand */ -#include /* time */ - -#include -#include -#include -#include -#include -#include - -#include "O2FLPex.h" -#include "FairMQLogger.h" - -using namespace std; - -O2FLPex::O2FLPex() : - fEventSize(10000), - fHeartbeatTimeoutInMs(20000) -{ -} - -O2FLPex::~O2FLPex() -{ -} - -void O2FLPex::Init() -{ - FairMQDevice::Init(); - - boost::posix_time::ptime nullTime; - - for (int i = 0; i < fChannels["data-out"].size(); ++i) { - fOutputHeartbeat.push_back(nullTime); - } -} - -bool O2FLPex::updateIPHeartbeat (string str) -{ - for (int i = 0; i < fChannels["data-out"].size(); ++i) { - if ( fChannels["data-out"].at(i).GetAddress() == str ) { - boost::posix_time::ptime currentHeartbeat = boost::posix_time::microsec_clock::local_time(); - boost::posix_time::ptime storedHeartbeat = GetProperty (OutputHeartbeat, storedHeartbeat, i); - - if (to_simple_string(storedHeartbeat) != "not-a-date-time") { - LOG(INFO) << "EPN " << i << " (" << str << ")" << " last seen " - << (currentHeartbeat - storedHeartbeat).total_milliseconds() - << " ms ago. Updating heartbeat..."; - } - else { - LOG(INFO) << "IP has no heartbeat associated. Adding heartbeat: " << currentHeartbeat; - } - - SetProperty (OutputHeartbeat, currentHeartbeat, i); - - return true; - } - } - LOG(ERROR) << "IP " << str << " unknown, not provided at execution time"; - - return false; -} - -void O2FLPex::Run() -{ - srand(time(NULL)); - - stringstream ss(fId); - - int Flp_id; - ss >> Flp_id; - - Content* payload = new Content[fEventSize]; - for (int i = 0; i < fEventSize; ++i) { - (&payload[i])->id = Flp_id; - (&payload[i])->x = rand() % 100 + 1; - (&payload[i])->y = rand() % 100 + 1; - (&payload[i])->z = rand() % 100 + 1; - (&payload[i])->a = (rand() % 100 + 1) / (rand() % 100 + 1); - (&payload[i])->b = (rand() % 100 + 1) / (rand() % 100 + 1); - // LOG(INFO) << (&payload[i])->id << " " << (&payload[i])->x << " " << (&payload[i])->y << " " << (&payload[i])->z << " " << (&payload[i])->a << " " << (&payload[i])->b; - } - - delete[] payload; - - while (CheckCurrentState(RUNNING)) { - // Receive heartbeat - FairMQMessage* heartbeatMsg = fTransportFactory->CreateMessage(); - - size_t heartbeatSize = fChannels["data-in"].at(0).Receive(heartbeatMsg, "no-block"); - - if (heartbeatSize > 0) { - std::string rpl = std::string (static_cast(heartbeatMsg->GetData()), heartbeatMsg->GetSize()); - updateIPHeartbeat (rpl); - } - - delete heartbeatMsg; - - // Send payload - for (int i = 0; i < fChannels["data-out"].size(); ++i) { - boost::posix_time::ptime currentHeartbeat = boost::posix_time::microsec_clock::local_time(); - boost::posix_time::ptime storedHeartbeat = GetProperty (OutputHeartbeat, storedHeartbeat, i); - - if (to_simple_string(storedHeartbeat) == "not-a-date-time" || - (currentHeartbeat - storedHeartbeat).total_milliseconds() > fHeartbeatTimeoutInMs) { - // LOG(INFO) << "EPN " << i << " has not send a heartbeat, or heartbeat too old"; - continue; - } - - // LOG(INFO) << "Pubishing payload to EPN " << i; - FairMQMessage* payloadMsg = fTransportFactory->CreateMessage(fEventSize * sizeof(Content)); - memcpy(payloadMsg->GetData(), payload, fEventSize * sizeof(Content)); - - fChannels["data-out"].at(i).Send(payloadMsg); - - delete payloadMsg; - } - } -} - -void O2FLPex::SetProperty(const int key, const string& value) -{ - switch (key) { - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -string O2FLPex::GetProperty(const int key, const string& default_/*= ""*/) -{ - switch (key) { - default: - return FairMQDevice::GetProperty(key, default_); - } -} - -void O2FLPex::SetProperty(const int key, const int value) -{ - switch (key) { - case EventSize: - fEventSize = value; - break; - case HeartbeatTimeoutInMs: - fHeartbeatTimeoutInMs = value; - break; - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -int O2FLPex::GetProperty(const int key, const int default_/*= 0*/) -{ - switch (key) { - case EventSize: - return fEventSize; - case HeartbeatTimeoutInMs: - return fHeartbeatTimeoutInMs; - default: - return FairMQDevice::GetProperty(key, default_); - } -} - -// Method for setting properties represented as a heartbeat. -void O2FLPex::SetProperty(const int key, const boost::posix_time::ptime value, const int slot /*= 0*/) -{ - switch (key) - { - case OutputHeartbeat: - fOutputHeartbeat.erase(fOutputHeartbeat.begin() + slot); - fOutputHeartbeat.insert(fOutputHeartbeat.begin() + slot, value); - break; - } -} - -// Method for getting properties represented as a heartbeat. -boost::posix_time::ptime O2FLPex::GetProperty(const int key, const boost::posix_time::ptime default_, const int slot /*= 0*/) -{ - switch (key) - { - case OutputHeartbeat: - return fOutputHeartbeat.at(slot); - } - assert(false); -} diff --git a/devices/flp2epn-dynamic/O2FLPex.h b/devices/flp2epn-dynamic/O2FLPex.h deleted file mode 100644 index 85a85679c956e..0000000000000 --- a/devices/flp2epn-dynamic/O2FLPex.h +++ /dev/null @@ -1,56 +0,0 @@ -/** - * O2FLPex.h - * - * @since 2014-02-24 - * @author A. Rybalchenko - */ - -#ifndef O2FLPEX_H_ -#define O2FLPEX_H_ - -#include - -#include "FairMQDevice.h" - -struct Content { - int id; - double a; - double b; - int x; - int y; - int z; -}; - -class O2FLPex: public FairMQDevice -{ - public: - enum { - InputFile = FairMQDevice::Last, - EventSize, - OutputHeartbeat, - HeartbeatTimeoutInMs, - Last - }; - O2FLPex(); - virtual ~O2FLPex(); - - virtual void SetProperty(const int key, const std::string& value); - virtual std::string GetProperty(const int key, const std::string& default_ = ""); - virtual void SetProperty(const int key, const int value); - virtual int GetProperty(const int key, const int default_ = 0); - virtual void SetProperty(const int key, const boost::posix_time::ptime value, const int slot = 0); - virtual boost::posix_time::ptime GetProperty(const int key, const boost::posix_time::ptime value, const int slot = 0); - - protected: - int fEventSize; - int fHeartbeatTimeoutInMs; - - virtual void Init(); - virtual void Run(); - - private: - std::vector fOutputHeartbeat; - bool updateIPHeartbeat (std::string str); -}; - -#endif diff --git a/devices/flp2epn-dynamic/run/runEPN_dynamic.cxx b/devices/flp2epn-dynamic/run/runEPN_dynamic.cxx deleted file mode 100644 index aba93315bd169..0000000000000 --- a/devices/flp2epn-dynamic/run/runEPN_dynamic.cxx +++ /dev/null @@ -1,170 +0,0 @@ -/** - * runEPN_dynamic.cxx - * - * @since 2013-01-21 - * @author D. Klein, A. Rybalchenko, M. Al-Turany, C. Kouzinopoulos - */ - -#include - -#include "boost/program_options.hpp" - -#include "FairMQLogger.h" -#include "O2EPNex.h" - -#ifdef NANOMSG - #include "FairMQTransportFactoryNN.h" -#else - #include "FairMQTransportFactoryZMQ.h" -#endif - -using namespace std; - -typedef struct DeviceOptions -{ - string id; - int ioThreads; - int numOutputs; - int heartbeatIntervalInMs; - string inputSocketType; - int inputBufSize; - string inputMethod; - string inputAddress; - vector outputSocketType; - vector outputBufSize; - vector outputMethod; - vector outputAddress; -} DeviceOptions_t; - -inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) -{ - if (_options == NULL) - throw std::runtime_error("Internal error: options' container is empty."); - - namespace bpo = boost::program_options; - bpo::options_description desc("Options"); - desc.add_options() - ("id", bpo::value()->required(), "Device ID") - ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") - ("num-outputs", bpo::value()->required(), "Number of EPN output sockets") - ("heartbeat-interval", bpo::value()->default_value(5000), "Heartbeat interval in milliseconds") - ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") - ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("input-method", bpo::value()->required(), "Input method: bind/connect") - ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") - ("output-socket-type", bpo::value< vector >()->required(), "Output socket type: pub/push") - ("output-buff-size", bpo::value< vector >()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("output-method", bpo::value< vector >()->required(), "Output method: bind/connect") - ("output-address", bpo::value< vector >()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") - ("help", "Print help messages"); - - bpo::variables_map vm; - bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); - - if ( vm.count("help") ) - { - LOG(INFO) << "EPN" << endl << desc; - return false; - } - - bpo::notify(vm); - - if ( vm.count("id") ) - _options->id = vm["id"].as(); - - if ( vm.count("io-threads") ) - _options->ioThreads = vm["io-threads"].as(); - - if ( vm.count("num-outputs") ) - _options->numOutputs = vm["num-outputs"].as(); - - if ( vm.count("heartbeat-interval") ) - _options->heartbeatIntervalInMs = vm["heartbeat-interval"].as(); - - if ( vm.count("input-socket-type") ) - _options->inputSocketType = vm["input-socket-type"].as(); - - if ( vm.count("input-buff-size") ) - _options->inputBufSize = vm["input-buff-size"].as(); - - if ( vm.count("input-method") ) - _options->inputMethod = vm["input-method"].as(); - - if ( vm.count("input-address") ) - _options->inputAddress = vm["input-address"].as(); - - if ( vm.count("output-socket-type") ) - _options->outputSocketType = vm["output-socket-type"].as< vector >(); - - if ( vm.count("output-buff-size") ) - _options->outputBufSize = vm["output-buff-size"].as< vector >(); - - if ( vm.count("output-method") ) - _options->outputMethod = vm["output-method"].as< vector >(); - - if ( vm.count("output-address") ) - _options->outputAddress = vm["output-address"].as< vector >(); - - return true; -} - -int main(int argc, char** argv) -{ - O2EPNex epn; - epn.CatchSignals(); - - DeviceOptions_t options; - try - { - if (!parse_cmd_line(argc, argv, &options)) - return 0; - } - catch (exception& e) - { - LOG(ERROR) << e.what(); - return 1; - } - - LOG(INFO) << "PID: " << getpid(); - -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif - - epn.SetTransport(transportFactory); - - epn.SetProperty(O2EPNex::Id, options.id); - epn.SetProperty(O2EPNex::NumIoThreads, options.ioThreads); - - epn.SetProperty(O2EPNex::HeartbeatIntervalInMs, options.heartbeatIntervalInMs); - - FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); - inputChannel.UpdateSndBufSize(options.inputBufSize); - inputChannel.UpdateRcvBufSize(options.inputBufSize); - inputChannel.UpdateRateLogging(1); - - epn.fChannels["data-in"].push_back(inputChannel); - - for (int i = 0; i < options.outputAddress.size(); ++i) - { - FairMQChannel outputChannel(options.outputSocketType.at(i), options.outputMethod.at(i), options.outputAddress.at(i)); - outputChannel.UpdateSndBufSize(options.outputBufSize.at(i)); - outputChannel.UpdateRcvBufSize(options.outputBufSize.at(i)); - outputChannel.UpdateRateLogging(1); - - epn.fChannels["data-out"].push_back(outputChannel); - } - - epn.ChangeState("INIT_DEVICE"); - epn.WaitForEndOfState("INIT_DEVICE"); - - epn.ChangeState("INIT_TASK"); - epn.WaitForEndOfState("INIT_TASK"); - - epn.ChangeState("RUN"); - epn.InteractiveStateLoop(); - - return 0; -} diff --git a/devices/flp2epn-dynamic/run/runFLP_dynamic.cxx b/devices/flp2epn-dynamic/run/runFLP_dynamic.cxx deleted file mode 100644 index 448302552dc35..0000000000000 --- a/devices/flp2epn-dynamic/run/runFLP_dynamic.cxx +++ /dev/null @@ -1,176 +0,0 @@ -/** - * runFLP_dynamic.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko, M. Al-Turany, C. Kouzinopoulos - */ - -#include - -#include "boost/program_options.hpp" - -#include "FairMQLogger.h" -#include "O2FLPex.h" - -#ifdef NANOMSG - #include "FairMQTransportFactoryNN.h" -#else - #include "FairMQTransportFactoryZMQ.h" -#endif - -using namespace std; - -typedef struct DeviceOptions -{ - string id; - int eventSize; - int ioThreads; - int numOutputs; - int heartbeatTimeoutInMs; - string inputSocketType; - int inputBufSize; - string inputMethod; - string inputAddress; - vector outputSocketType; - vector outputBufSize; - vector outputMethod; - vector outputAddress; -} DeviceOptions_t; - -inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) -{ - if (_options == NULL) - throw std::runtime_error("Internal error: options' container is empty."); - - namespace bpo = boost::program_options; - bpo::options_description desc("Options"); - desc.add_options() - ("id", bpo::value()->required(), "Device ID") - ("event-size", bpo::value()->default_value(1000), "Event size in bytes") - ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") - ("num-outputs", bpo::value()->required(), "Number of FLP output sockets") - ("heartbeat-timeout", bpo::value()->default_value(20000), "Heartbeat timeout in milliseconds") - ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") - ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("input-method", bpo::value()->required(), "Input method: bind/connect") - ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") - ("output-socket-type", bpo::value< vector >()->required(), "Output socket type: pub/push") - ("output-buff-size", bpo::value< vector >()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") - ("output-method", bpo::value< vector >()->required(), "Output method: bind/connect") - ("output-address", bpo::value< vector >()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") - ("help", "Print help messages"); - - bpo::variables_map vm; - bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); - - if ( vm.count("help") ) - { - LOG(INFO) << "FLP" << endl << desc; - return false; - } - - bpo::notify(vm); - - if ( vm.count("id") ) - _options->id = vm["id"].as(); - - if ( vm.count("event-size") ) - _options->eventSize = vm["event-size"].as(); - - if ( vm.count("io-threads") ) - _options->ioThreads = vm["io-threads"].as(); - - if ( vm.count("num-outputs") ) - _options->numOutputs = vm["num-outputs"].as(); - - if ( vm.count("heartbeat-timeout") ) - _options->heartbeatTimeoutInMs = vm["heartbeat-timeout"].as(); - - if ( vm.count("input-socket-type") ) - _options->inputSocketType = vm["input-socket-type"].as(); - - if ( vm.count("input-buff-size") ) - _options->inputBufSize = vm["input-buff-size"].as(); - - if ( vm.count("input-method") ) - _options->inputMethod = vm["input-method"].as(); - - if ( vm.count("input-address") ) - _options->inputAddress = vm["input-address"].as(); - - if ( vm.count("output-socket-type") ) - _options->outputSocketType = vm["output-socket-type"].as< vector >(); - - if ( vm.count("output-buff-size") ) - _options->outputBufSize = vm["output-buff-size"].as< vector >(); - - if ( vm.count("output-method") ) - _options->outputMethod = vm["output-method"].as< vector >(); - - if ( vm.count("output-address") ) - _options->outputAddress = vm["output-address"].as< vector >(); - - return true; -} - -int main(int argc, char** argv) -{ - O2FLPex flp; - flp.CatchSignals(); - - DeviceOptions_t options; - try - { - if (!parse_cmd_line(argc, argv, &options)) - return 0; - } - catch (exception& e) - { - LOG(ERROR) << e.what(); - return 1; - } - - LOG(INFO) << "PID: " << getpid(); - -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif - - flp.SetTransport(transportFactory); - - flp.SetProperty(O2FLPex::Id, options.id); - flp.SetProperty(O2FLPex::NumIoThreads, options.ioThreads); - flp.SetProperty(O2FLPex::EventSize, options.eventSize); - - flp.SetProperty(O2FLPex::HeartbeatTimeoutInMs, options.heartbeatTimeoutInMs); - - FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); - inputChannel.UpdateSndBufSize(options.inputBufSize); - inputChannel.UpdateRcvBufSize(options.inputBufSize); - inputChannel.UpdateRateLogging(1); - - flp.fChannels["data-in"].push_back(inputChannel); - - for (int i = 0; i < options.outputAddress.size(); ++i) - { - FairMQChannel outputChannel(options.outputSocketType.at(i), options.outputMethod.at(i), options.outputAddress.at(i)); - outputChannel.UpdateSndBufSize(options.outputBufSize.at(i)); - outputChannel.UpdateRcvBufSize(options.outputBufSize.at(i)); - outputChannel.UpdateRateLogging(1); - - flp.fChannels["data-out"].push_back(outputChannel); - } - - flp.ChangeState("INIT_DEVICE"); - flp.WaitForEndOfState("INIT_DEVICE"); - - flp.ChangeState("INIT_TASK"); - flp.WaitForEndOfState("INIT_TASK"); - - flp.ChangeState("RUN"); - flp.InteractiveStateLoop(); - - return 0; -} diff --git a/devices/flp2epn-dynamic/run/startFLP2EPN-dynamic.sh.in b/devices/flp2epn-dynamic/run/startFLP2EPN-dynamic.sh.in deleted file mode 100755 index 67a339f89862c..0000000000000 --- a/devices/flp2epn-dynamic/run/startFLP2EPN-dynamic.sh.in +++ /dev/null @@ -1,67 +0,0 @@ -#!/bin/bash - -buffSize="1000" # zeromq high-water mark is in messages -#buffSize="50000000" # nanomsg buffer size is in bytes - -FLP0="testFLP_dynamic" -FLP0+=" --id 0" -FLP0+=" --event-size 1000" -FLP0+=" --num-outputs 3" -FLP0+=" --heartbeat-timeout 20000" -FLP0+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5580" -FLP0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5560" -FLP0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5561" -FLP0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5562" -xterm -e @CMAKE_BINARY_DIR@/bin/$FLP0 & - -FLP1="testFLP_dynamic" -FLP1+=" --id 1" -FLP1+=" --event-size 1000" -FLP1+=" --num-outputs 3" -FLP1+=" --heartbeat-timeout 20000" -FLP1+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5581" -FLP1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5560" -FLP1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5561" -FLP1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5562" -xterm -e @CMAKE_BINARY_DIR@/bin/$FLP1 & - -FLP2="testFLP_dynamic" -FLP2+=" --id 2" -FLP2+=" --event-size 1000" -FLP2+=" --num-outputs 3" -FLP2+=" --heartbeat-timeout 20000" -FLP2+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5582" -FLP2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5560" -FLP2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5561" -FLP2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5562" -xterm -e @CMAKE_BINARY_DIR@/bin/$FLP2 & - -EPN0="testEPN_dynamic" -EPN0+=" --id EPN0" -EPN0+=" --num-outputs 3" -EPN0+=" --heartbeat-interval 5000" -EPN0+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5560" -EPN0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5580" -EPN0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5581" -EPN0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5582" -xterm -e @CMAKE_BINARY_DIR@/bin/$EPN0 & - -EPN1="testEPN_dynamic" -EPN1+=" --id EPN1" -EPN1+=" --num-outputs 3" -EPN1+=" --heartbeat-interval 5000" -EPN1+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5561" -EPN1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5580" -EPN1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5581" -EPN1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5582" -xterm -e @CMAKE_BINARY_DIR@/bin/$EPN1 & - -EPN2="testEPN_dynamic" -EPN2+=" --id EPN2" -EPN2+=" --num-outputs 3" -EPN2+=" --heartbeat-interval 5000" -EPN2+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5562" -EPN2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5580" -EPN2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5581" -EPN2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5582" -xterm -e @CMAKE_BINARY_DIR@/bin/$EPN2 &