diff --git a/examples/region/fairmq-start-ex-region.sh.in b/examples/region/fairmq-start-ex-region.sh.in index 882edea1a..56a3fd662 100755 --- a/examples/region/fairmq-start-ex-region.sh.in +++ b/examples/region/fairmq-start-ex-region.sh.in @@ -14,12 +14,12 @@ SAMPLER+=" --severity debug" SAMPLER+=" --msg-size $msgSize" # SAMPLER+=" --rate 10" SAMPLER+=" --transport shmem" -SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777" +SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992" xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & SINK="fairmq-ex-region-sink" SINK+=" --id sink1" SINK+=" --severity debug" SINK+=" --transport shmem" -SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777" +SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992" xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK & diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 66cc94c1f..15a696137 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -33,6 +33,7 @@ FairMQChannel::FairMQChannel() , fRcvBufSize(1000) , fSndKernelSize(0) , fRcvKernelSize(0) + , fLinger(500) , fRateLogging(1) , fName("") , fIsValid(false) @@ -53,6 +54,7 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str , fRcvBufSize(1000) , fSndKernelSize(0) , fRcvKernelSize(0) + , fLinger(500) , fRateLogging(1) , fName("") , fIsValid(false) @@ -73,6 +75,7 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, std::shared , fRcvBufSize(1000) , fSndKernelSize(0) , fRcvKernelSize(0) + , fLinger(500) , fRateLogging(1) , fName(name) , fIsValid(false) @@ -93,6 +96,7 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan) , fRcvBufSize(chan.fRcvBufSize) , fSndKernelSize(chan.fSndKernelSize) , fRcvKernelSize(chan.fRcvKernelSize) + , fLinger(chan.fLinger) , fRateLogging(chan.fRateLogging) , fName(chan.fName) , fIsValid(false) @@ -113,6 +117,7 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) fRcvBufSize = chan.fRcvBufSize; fSndKernelSize = chan.fSndKernelSize; fRcvKernelSize = chan.fRcvKernelSize; + fLinger = chan.fLinger; fRateLogging = chan.fRateLogging; fName = chan.fName; fIsValid = false; @@ -262,6 +267,20 @@ int FairMQChannel::GetRcvKernelSize() const } } +int FairMQChannel::GetLinger() const +{ + try + { + unique_lock lock(fChannelMutex); + return fLinger; + } + catch (exception& e) + { + LOG(error) << "Exception caught in FairMQChannel::GetLinger: " << e.what(); + exit(EXIT_FAILURE); + } +} + int FairMQChannel::GetRateLogging() const { try @@ -404,6 +423,22 @@ void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize) } } +void FairMQChannel::UpdateLinger(const int duration) +{ + try + { + unique_lock lock(fChannelMutex); + fIsValid = false; + fLinger = duration; + fModified = true; + } + catch (exception& e) + { + LOG(error) << "Exception caught in FairMQChannel::UpdateLinger: " << e.what(); + exit(EXIT_FAILURE); + } +} + void FairMQChannel::UpdateRateLogging(const int rateLogging) { try diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index fd1267df6..7f538b793 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -111,6 +111,10 @@ class FairMQChannel /// @return Returns socket kernel transmit receive buffer size (in bytes) int GetRcvKernelSize() const; + /// Get linger duration (in milliseconds) + /// @return Returns linger duration (in milliseconds) + int GetLinger() const; + /// Get socket rate logging interval (in seconds) /// @return Returns socket rate logging interval (in seconds) int GetRateLogging() const; @@ -147,6 +151,10 @@ class FairMQChannel /// @param rcvKernelSize Socket receive buffer size (in bytes) void UpdateRcvKernelSize(const int rcvKernelSize); + /// Set linger duration (in milliseconds) + /// @param duration linger duration (in milliseconds) + void UpdateLinger(const int duration); + /// Set socket rate logging interval (in seconds) /// @param rateLogging Socket rate logging interval (in seconds) void UpdateRateLogging(const int rateLogging); @@ -307,6 +315,7 @@ class FairMQChannel int fRcvBufSize; int fSndKernelSize; int fRcvKernelSize; + int fLinger; int fRateLogging; std::string fName; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 384eccfd1..eff2f84d6 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -295,6 +295,9 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch) } } + // set linger duration (how long socket should wait for outstanding transfers before shutdown) + ch.fSocket->SetOption("linger", &(ch.fLinger), sizeof(ch.fLinger)); + // set high water marks ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize)); ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); @@ -365,7 +368,7 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch) } endpoint += address; - LOG(debug) << "Attached channel " << ch.fName << " to " << endpoint << (bind ? " (bind) " : " (connect) "); + LOG(debug) << "Attached channel " << ch.fName << " to " << endpoint << (bind ? " (bind) " : " (connect) ") << "(" << ch.fType << ")"; // after the book keeping is done, exit in case of errors if (!success) @@ -914,8 +917,7 @@ void FairMQDevice::LogSocketRates() } t0 = t1; - this_thread::sleep_for(chrono::milliseconds(1000)); - // WaitFor(chrono::milliseconds(1000)); TODO: enable this when nanomsg linger is fixed + WaitFor(chrono::milliseconds(1000)); } } } @@ -950,13 +952,11 @@ void FairMQDevice::ResetWrapper() { CallStateChangeCallbacks(RESETTING_DEVICE); - Reset(); - - ChangeState(internal_IDLE); -} + for (auto& t : fTransports) + { + t.second->Reset(); + } -void FairMQDevice::Reset() -{ // iterate over the channels map for (auto& mi : fChannels) { @@ -967,6 +967,14 @@ void FairMQDevice::Reset() vi.fSocket.reset(); // destroy FairMQSocket } } + + Reset(); + + ChangeState(internal_IDLE); +} + +void FairMQDevice::Reset() +{ } const FairMQChannel& FairMQDevice::GetChannel(const string& channelName, const int index) const diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 72f9214c2..40a0ee57a 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -73,6 +73,7 @@ class FairMQTransportFactory virtual void Interrupt() = 0; virtual void Resume() = 0; + virtual void Reset() = 0; virtual ~FairMQTransportFactory() {}; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 451ff5040..5d0d33488 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -39,6 +39,7 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const str , fMessagesRx(0) , fSndTimeout(100) , fRcvTimeout(100) + , fLinger(500) { if (type == "router" || type == "dealer") { @@ -456,6 +457,13 @@ void FairMQSocketNN::SetOption(const string& option, const void* value, size_t v return; } + if (option == "linger") + { + int val = *(static_cast(const_cast(value))); + fLinger = val; + return; + } + int rc = nn_setsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize); if (rc < 0) { diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 06e0bff37..e27662e07 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -15,8 +15,12 @@ #include "FairMQSocket.h" #include "FairMQMessage.h" +class FairMQTransportFactoryNN; + class FairMQSocketNN : public FairMQSocket { + friend class FairMQTransportFactoryNN; + public: FairMQSocketNN(const std::string& type, const std::string& name, const std::string& id = ""); FairMQSocketNN(const FairMQSocketNN&) = delete; @@ -69,6 +73,7 @@ class FairMQSocketNN : public FairMQSocket int fSndTimeout; int fRcvTimeout; + int fLinger; int SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout); int ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout); diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index c840979f9..4bb31ca39 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -9,6 +9,9 @@ #include "FairMQTransportFactoryNN.h" #include +#include +#include +#include using namespace std; @@ -42,7 +45,9 @@ FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(FairMQUnmanagedRegionPt FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name) const { - return unique_ptr(new FairMQSocketNN(type, name, GetId())); + unique_ptr socket(new FairMQSocketNN(type, name, GetId())); + fSockets.push_back(socket.get()); + return socket; } FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const vector& channels) const @@ -75,6 +80,17 @@ fair::mq::Transport FairMQTransportFactoryNN::GetType() const return fTransportType; } +void FairMQTransportFactoryNN::Reset() +{ + auto result = max_element(fSockets.begin(), fSockets.end(), [](FairMQSocket* s1, FairMQSocket* s2) { + return static_cast(s1)->fLinger < static_cast(s2)->fLinger; + }); + if (result != fSockets.end()) { + this_thread::sleep_for(chrono::milliseconds(static_cast(*result)->fLinger)); + } + fSockets.clear(); +} + FairMQTransportFactoryNN::~FairMQTransportFactoryNN() { // nn_term(); diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index 000d1a2f0..09afdf9b7 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -43,9 +43,11 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory void Interrupt() override { FairMQSocketNN::Interrupt(); } void Resume() override { FairMQSocketNN::Resume(); } + void Reset() override; private: static fair::mq::Transport fTransportType; + mutable std::vector fSockets; }; #endif /* FAIRMQTRANSPORTFACTORYNN_H_ */ diff --git a/fairmq/ofi/TransportFactory.h b/fairmq/ofi/TransportFactory.h index 9f1e3ff53..9c43eb88a 100644 --- a/fairmq/ofi/TransportFactory.h +++ b/fairmq/ofi/TransportFactory.h @@ -51,6 +51,7 @@ class TransportFactory : public FairMQTransportFactory void Interrupt() override {} void Resume() override {} + void Reset() override {} private: mutable Context fContext; diff --git a/fairmq/options/FairMQParser.cxx b/fairmq/options/FairMQParser.cxx index abf277d96..0d44aaaed 100644 --- a/fairmq/options/FairMQParser.cxx +++ b/fairmq/options/FairMQParser.cxx @@ -153,6 +153,7 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQChannelMap& ch commonChannel.UpdateRcvBufSize(q.second.get("rcvBufSize", commonChannel.GetRcvBufSize())); commonChannel.UpdateSndKernelSize(q.second.get("sndKernelSize", commonChannel.GetSndKernelSize())); commonChannel.UpdateRcvKernelSize(q.second.get("rcvKernelSize", commonChannel.GetRcvKernelSize())); + commonChannel.UpdateLinger(q.second.get("linger", commonChannel.GetLinger())); commonChannel.UpdateRateLogging(q.second.get("rateLogging", commonChannel.GetRateLogging())); // temporary FairMQChannel container @@ -172,6 +173,7 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQChannelMap& ch LOG(debug) << "\trcvBufSize = " << commonChannel.GetRcvBufSize(); LOG(debug) << "\tsndKernelSize = " << commonChannel.GetSndKernelSize(); LOG(debug) << "\trcvKernelSize = " << commonChannel.GetRcvKernelSize(); + LOG(debug) << "\tlinger = " << commonChannel.GetLinger(); LOG(debug) << "\trateLogging = " << commonChannel.GetRateLogging(); for (int i = 0; i < numSockets; ++i) @@ -214,6 +216,7 @@ void SocketParser(const boost::property_tree::ptree& tree, vector channel.UpdateRcvBufSize(q.second.get("rcvBufSize", channel.GetRcvBufSize())); channel.UpdateSndKernelSize(q.second.get("sndKernelSize", channel.GetSndKernelSize())); channel.UpdateRcvKernelSize(q.second.get("rcvKernelSize", channel.GetRcvKernelSize())); + channel.UpdateLinger(q.second.get("linger", channel.GetLinger())); channel.UpdateRateLogging(q.second.get("rateLogging", channel.GetRateLogging())); LOG(debug) << "" << channelName << "[" << socketCounter << "]:"; @@ -225,6 +228,7 @@ void SocketParser(const boost::property_tree::ptree& tree, vector LOG(debug) << "\trcvBufSize = " << channel.GetRcvBufSize(); LOG(debug) << "\tsndKernelSize = " << channel.GetSndKernelSize(); LOG(debug) << "\trcvKernelSize = " << channel.GetRcvKernelSize(); + LOG(debug) << "\tlinger = " << channel.GetLinger(); LOG(debug) << "\trateLogging = " << channel.GetRateLogging(); channelList.push_back(channel); @@ -253,6 +257,7 @@ void SocketParser(const boost::property_tree::ptree& tree, vector LOG(debug) << "\trcvBufSize = " << channel.GetRcvBufSize(); LOG(debug) << "\tsndKernelSize = " << channel.GetSndKernelSize(); LOG(debug) << "\trcvKernelSize = " << channel.GetRcvKernelSize(); + LOG(debug) << "\tlinger = " << channel.GetLinger(); LOG(debug) << "\trateLogging = " << channel.GetRateLogging(); channelList.push_back(channel); diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 3b1f8e672..908a15414 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -268,6 +268,7 @@ void FairMQProgOptions::UpdateMQValues() string rcvBufSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvBufSize"; string sndKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".sndKernelSize"; string rcvKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvKernelSize"; + string lingerKey = "chans." + p.first + "." + to_string(index) + ".linger"; string rateLoggingKey = "chans." + p.first + "." + to_string(index) + ".rateLogging"; fChannelKeyMap[typeKey] = ChannelKey{p.first, index, "type"}; @@ -278,6 +279,7 @@ void FairMQProgOptions::UpdateMQValues() fChannelKeyMap[rcvBufSizeKey] = ChannelKey{p.first, index, "rcvBufSize"}; fChannelKeyMap[sndKernelSizeKey] = ChannelKey{p.first, index, "sndKernelSize"}; fChannelKeyMap[rcvKernelSizeKey] = ChannelKey{p.first, index, "rcvkernelSize"}; + fChannelKeyMap[lingerKey] = ChannelKey{p.first, index, "linger"}; fChannelKeyMap[rateLoggingKey] = ChannelKey{p.first, index, "rateLogging"}; UpdateVarMap(typeKey, channel.GetType()); @@ -288,6 +290,7 @@ void FairMQProgOptions::UpdateMQValues() UpdateVarMap(rcvBufSizeKey, channel.GetRcvBufSize()); UpdateVarMap(sndKernelSizeKey, channel.GetSndKernelSize()); UpdateVarMap(rcvKernelSizeKey, channel.GetRcvKernelSize()); + UpdateVarMap(lingerKey, channel.GetLinger()); UpdateVarMap(rateLoggingKey, channel.GetRateLogging()); index++; @@ -343,6 +346,12 @@ int FairMQProgOptions::UpdateChannelValue(const string& channelName, int index, return 0; } + if (member == "linger") + { + fFairMQChannelMap.at(channelName).at(index).UpdateLinger(val); + return 0; + } + if (member == "rateLogging") { fFairMQChannelMap.at(channelName).at(index).UpdateRateLogging(val); diff --git a/fairmq/options/FairMQSuboptParser.h b/fairmq/options/FairMQSuboptParser.h index b0d1b7530..7d6377edb 100644 --- a/fairmq/options/FairMQSuboptParser.h +++ b/fairmq/options/FairMQSuboptParser.h @@ -56,6 +56,7 @@ struct SUBOPT RCVBUFSIZE, // size of the receive queue SNDKERNELSIZE, RCVKERNELSIZE, + LINGER, RATELOGGING, // logging rate NUMSOCKETS, lastsocketkey @@ -71,6 +72,7 @@ struct SUBOPT /*[RCVBUFSIZE] = */ "rcvBufSize", /*[SNDKERNELSIZE] = */ "sndKernelSize", /*[RCVKERNELSIZE] = */ "rcvKernelSize", + /*[LINGER] = */ "linger", /*[RATELOGGING] = */ "rateLogging", /*[NUMSOCKETS] = */ "numSockets", nullptr diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index d98eeb7b4..ffc47ed18 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -51,6 +51,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory void Interrupt() override { FairMQSocketSHM::Interrupt(); } void Resume() override { FairMQSocketSHM::Resume(); } + void Reset() override {} ~FairMQTransportFactorySHM() override; diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 6a0d92093..b589f9d52 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -164,12 +164,11 @@ int FairMQSocketZMQ::SendImpl(FairMQMessagePtr& msg, const int flags, const int int FairMQSocketZMQ::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout) { - int nbytes = -1; int elapsed = 0; while (true) { - nbytes = zmq_msg_recv(static_cast(msg.get())->GetMessage(), fSocket, flags); + int nbytes = zmq_msg_recv(static_cast(msg.get())->GetMessage(), fSocket, flags); if (nbytes >= 0) { fBytesRx += nbytes; diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 276380138..6e156aaec 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -52,6 +52,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory void Interrupt() override { FairMQSocketZMQ::Interrupt(); } void Resume() override { FairMQSocketZMQ::Resume(); } + void Reset() override {} private: static fair::mq::Transport fTransportType;