From f24dee33c21cd452d7e7e6eebc47fa360d378282 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 11 Nov 2021 11:16:18 +0100 Subject: [PATCH] Add configurable default snd/rcv timeout --- fairmq/Channel.cxx | 10 ++++ fairmq/Channel.h | 112 ++++++++++++++++++++++------------------ fairmq/Device.h | 102 ++++++++++++++++++------------------ fairmq/JSONParser.cxx | 4 ++ fairmq/Socket.h | 11 ++++ fairmq/SuboptParser.cxx | 4 ++ 6 files changed, 141 insertions(+), 102 deletions(-) diff --git a/fairmq/Channel.cxx b/fairmq/Channel.cxx index 6b3a26a9c..6b47916cb 100644 --- a/fairmq/Channel.cxx +++ b/fairmq/Channel.cxx @@ -39,6 +39,8 @@ constexpr int Channel::DefaultSndBufSize; constexpr int Channel::DefaultRcvBufSize; constexpr int Channel::DefaultSndKernelSize; constexpr int Channel::DefaultRcvKernelSize; +constexpr int Channel::DefaultSndTimeoutMs; +constexpr int Channel::DefaultRcvTimeoutMs; constexpr int Channel::DefaultLinger; constexpr int Channel::DefaultRateLogging; constexpr int Channel::DefaultPortRangeMin; @@ -73,6 +75,8 @@ Channel::Channel(string name, string type, string method, string address, shared , fRcvBufSize(DefaultRcvBufSize) , fSndKernelSize(DefaultSndKernelSize) , fRcvKernelSize(DefaultRcvKernelSize) + , fSndTimeoutMs(DefaultSndTimeoutMs) + , fRcvTimeoutMs(DefaultRcvTimeoutMs) , fLinger(DefaultLinger) , fRateLogging(DefaultRateLogging) , fPortRangeMin(DefaultPortRangeMin) @@ -97,6 +101,8 @@ Channel::Channel(const string& name, int index, const Properties& properties) fRcvBufSize = GetPropertyOrDefault(properties, string(prefix + "rcvBufSize"), DefaultRcvBufSize); fSndKernelSize = GetPropertyOrDefault(properties, string(prefix + "sndKernelSize"), DefaultSndKernelSize); fRcvKernelSize = GetPropertyOrDefault(properties, string(prefix + "rcvKernelSize"), DefaultRcvKernelSize); + fSndTimeoutMs = GetPropertyOrDefault(properties, string(prefix + "sndTimeoutMs"), DefaultSndTimeoutMs); + fRcvTimeoutMs = GetPropertyOrDefault(properties, string(prefix + "rcvTimeoutMs"), DefaultRcvTimeoutMs); fLinger = GetPropertyOrDefault(properties, string(prefix + "linger"), DefaultLinger); fRateLogging = GetPropertyOrDefault(properties, string(prefix + "rateLogging"), DefaultRateLogging); fPortRangeMin = GetPropertyOrDefault(properties, string(prefix + "portRangeMin"), DefaultPortRangeMin); @@ -120,6 +126,8 @@ Channel::Channel(const Channel& chan, string newName) , fRcvBufSize(chan.fRcvBufSize) , fSndKernelSize(chan.fSndKernelSize) , fRcvKernelSize(chan.fRcvKernelSize) + , fSndTimeoutMs(chan.fSndTimeoutMs) + , fRcvTimeoutMs(chan.fRcvTimeoutMs) , fLinger(chan.fLinger) , fRateLogging(chan.fRateLogging) , fPortRangeMin(chan.fPortRangeMin) @@ -146,6 +154,8 @@ Channel& Channel::operator=(const Channel& chan) fRcvBufSize = chan.fRcvBufSize; fSndKernelSize = chan.fSndKernelSize; fRcvKernelSize = chan.fRcvKernelSize; + fSndTimeoutMs = chan.fSndTimeoutMs; + fRcvTimeoutMs = chan.fRcvTimeoutMs; fLinger = chan.fLinger; fRateLogging = chan.fRateLogging; fPortRangeMin = chan.fPortRangeMin; diff --git a/fairmq/Channel.h b/fairmq/Channel.h index 272e7b7a1..35e06c18f 100644 --- a/fairmq/Channel.h +++ b/fairmq/Channel.h @@ -166,6 +166,14 @@ class Channel /// @return Returns socket kernel transmit receive buffer size (in bytes) int GetRcvKernelSize() const { return fRcvKernelSize; } + /// Get socket default send timeout (in ms) + /// @return Returns socket default send timeout (in ms) + int GetSndTimeout() const { return fSndTimeoutMs; } + + /// Get socket default receive timeout (in ms) + /// @return Returns socket default receive timeout (in ms) + int GetRcvTimeout() const { return fRcvTimeoutMs; } + /// Get linger duration (in milliseconds) /// @return Returns linger duration (in milliseconds) int GetLinger() const { return fLinger; } @@ -230,6 +238,14 @@ class Channel /// @param rcvKernelSize Socket receive buffer size (in bytes) void UpdateRcvKernelSize(int rcvKernelSize) { fRcvKernelSize = rcvKernelSize; Invalidate(); } + /// Set socket default send timeout (in ms) + /// @param sndTimeoutMs Socket default send timeout (in ms) + void UpdateSndTimeout(int sndTimeoutMs) { fSndTimeoutMs = sndTimeoutMs; Invalidate(); } + + /// Set socket default receive timeout (in ms) + /// @param rcvTimeoutMs Socket default receive timeout (in ms) + void UpdateRcvTimeout(int rcvTimeoutMs) { fRcvTimeoutMs = rcvTimeoutMs; Invalidate(); } + /// Set linger duration (in milliseconds) /// @param duration linger duration (in milliseconds) void UpdateLinger(int duration) { fLinger = duration; Invalidate(); } @@ -267,62 +283,52 @@ class Channel /// invalidates the channel (requires validation to be used again). void Invalidate() { fValid = false; } - /// Sends a message to the socket queue. - /// @param msg Constant reference of unique_ptr to a Message - /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Send(MessagePtr& msg, int sndTimeoutInMs = -1) - { - CheckSendCompatibility(msg); - return fSocket->Send(msg, sndTimeoutInMs); - } - - /// Receives a message from the socket queue. - /// @param msg Constant reference of unique_ptr to a Message - /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Receive(MessagePtr& msg, int rcvTimeoutInMs = -1) - { - CheckReceiveCompatibility(msg); - return fSocket->Receive(msg, rcvTimeoutInMs); - } - - /// Send a vector of messages - /// @param msgVec message vector reference - /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Send(std::vector& msgVec, int sndTimeoutInMs = -1) + /// Send message(s) to the socket queue. + /// @param m reference to MessagePtr/Parts/vector + /// @param sndTimeoutMs send timeout in ms. + /// -1 will wait forever (or until interrupt (e.g. via state change)), + /// 0 will not wait (return immediately if cannot send). + /// If not provided, default timeout will be taken. + /// @return Number of bytes that have been queued, + /// TransferCode::timeout if timed out, + /// TransferCode::error if there was an error, + /// TransferCode::interrupted if interrupted (e.g. by requested state change) + template + std::enable_if_t::value, int64_t> + Send(M& m, Timeout&&... sndTimeoutMs) { - CheckSendCompatibility(msgVec); - return fSocket->Send(msgVec, sndTimeoutInMs); - } + static_assert(sizeof...(sndTimeoutMs) <= 1, "Send called with too many arguments"); - /// Receive a vector of messages - /// @param msgVec message vector reference - /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Receive(std::vector& msgVec, int rcvTimeoutInMs = -1) - { - CheckReceiveCompatibility(msgVec); - return fSocket->Receive(msgVec, rcvTimeoutInMs); + CheckSendCompatibility(m); + int t = fSndTimeoutMs; + if constexpr (sizeof...(sndTimeoutMs) == 1) { + t = {sndTimeoutMs...}; + } + return fSocket->Send(m, t); } - /// Send Parts - /// @param parts Parts reference - /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Send(Parts& parts, int sndTimeoutInMs = -1) + /// Receive message(s) from the socket queue. + /// @param m reference to MessagePtr/Parts/vector + /// @param rcvTimeoutMs receive timeout in ms. + /// -1 will wait forever (or until interrupt (e.g. via state change)), + /// 0 will not wait (return immediately if cannot receive). + /// If not provided, default timeout will be taken. + /// @return Number of bytes that have been received, + /// TransferCode::timeout if timed out, + /// TransferCode::error if there was an error, + /// TransferCode::interrupted if interrupted (e.g. by requested state change) + template + std::enable_if_t::value, int64_t> + Receive(M& m, Timeout&&... rcvTimeoutMs) { - return Send(parts.fParts, sndTimeoutInMs); - } + static_assert(sizeof...(rcvTimeoutMs) <= 1, "Receive called with too many arguments"); - /// Receive Parts - /// @param parts Parts reference - /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Receive(Parts& parts, int rcvTimeoutInMs = -1) - { - return Receive(parts.fParts, rcvTimeoutInMs); + CheckReceiveCompatibility(m); + int t = fRcvTimeoutMs; + if constexpr (sizeof...(rcvTimeoutMs) == 1) { + t = {rcvTimeoutMs...}; + } + return fSocket->Receive(m, t); } unsigned long GetBytesTx() const { return fSocket->GetBytesTx(); } @@ -366,6 +372,8 @@ class Channel static constexpr int DefaultRcvBufSize = 1000; static constexpr int DefaultSndKernelSize = 0; static constexpr int DefaultRcvKernelSize = 0; + static constexpr int DefaultSndTimeoutMs = -1; + static constexpr int DefaultRcvTimeoutMs = -1; static constexpr int DefaultLinger = 500; static constexpr int DefaultRateLogging = 1; static constexpr int DefaultPortRangeMin = 22000; @@ -385,6 +393,8 @@ class Channel int fRcvBufSize; int fSndKernelSize; int fRcvKernelSize; + int fSndTimeoutMs; + int fRcvTimeoutMs; int fLinger; int fRateLogging; int fPortRangeMin; @@ -414,6 +424,7 @@ class Channel } } + void CheckSendCompatibility(Parts& parts) { CheckSendCompatibility(parts.fParts); } void CheckSendCompatibility(std::vector& msgVec) { for (auto& msg : msgVec) { @@ -443,6 +454,7 @@ class Channel } } + void CheckReceiveCompatibility(Parts& parts) { CheckReceiveCompatibility(parts.fParts); } void CheckReceiveCompatibility(std::vector& msgVec) { for (auto& msg : msgVec) { diff --git a/fairmq/Device.h b/fairmq/Device.h index 6ec0d6c33..933fc0c2e 100644 --- a/fairmq/Device.h +++ b/fairmq/Device.h @@ -81,72 +81,70 @@ class Device Deserializer().Deserialize(msg, std::forward(data), std::forward(args)...); } - /// Shorthand method to send `msg` on `chan` at index `i` - /// @param msg message reference + /// Send `m` on `chan` at index `i` + /// @param m reference to MessagePtr/Parts/vector /// @param chan channel name /// @param i channel index - /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via - /// state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, - /// TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by - /// requested state change) - int64_t Send(MessagePtr& msg, - const std::string& channel, - const int index = 0, - int sndTimeoutInMs = -1) + /// @return Number of queued bytes, + /// TransferCode::timeout if timed out, + /// TransferCode::error if there was an error, + /// TransferCode::interrupted if interrupted (e.g. by requested state change) + template + std::enable_if_t::value, int64_t> + Send(M& m, const std::string& channel, const int index = 0) { - return GetChannel(channel, index).Send(msg, sndTimeoutInMs); + return GetChannel(channel, index).Send(m); } - /// Shorthand method to receive `msg` on `chan` at index `i` - /// @param msg message reference + /// Receive `m` on `chan` at index `i` + /// @param m reference to MessagePtr/Parts/vector /// @param chan channel name /// @param i channel index - /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. - /// via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferCode::timeout if timed out, - /// TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by - /// requested state change) - int64_t Receive(MessagePtr& msg, - const std::string& channel, - const int index = 0, - int rcvTimeoutInMs = -1) + /// @return Number of received bytes, + /// TransferCode::timeout if timed out, + /// TransferCode::error if there was an error, + /// TransferCode::interrupted if interrupted (e.g. by requested state change) + template + std::enable_if_t::value, int64_t> + Receive(M& m, const std::string& channel, const int index = 0) { - return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs); + return GetChannel(channel, index).Receive(m); } - /// Shorthand method to send Parts on `chan` at index `i` - /// @param parts parts reference + /// Send `m` on `chan` at index `i` + /// @param m reference to MessagePtr/Parts/vector /// @param chan channel name /// @param i channel index - /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via - /// state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, - /// TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by - /// requested state change) - int64_t Send(Parts& parts, - const std::string& channel, - const int index = 0, - int sndTimeoutInMs = -1) - { - return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs); - } - - /// Shorthand method to receive Parts on `chan` at index `i` - /// @param parts parts reference + /// @param sndTimeoutMs send timeout in ms, + /// -1 will wait forever (or until interrupt (e.g. via state change)), + /// 0 will not wait (return immediately if cannot send) + /// @return Number of queued bytes, + /// TransferCode::timeout if timed out, + /// TransferCode::error if there was an error, + /// TransferCode::interrupted if interrupted (e.g. by requested state change) + template + std::enable_if_t::value, int64_t> + Send(M& m, const std::string& channel, const int index, int sndTimeoutMs) + { + return GetChannel(channel, index).Send(m, sndTimeoutMs); + } + + /// Receive `m` on `chan` at index `i` + /// @param m reference to MessagePtr/Parts/vector /// @param chan channel name /// @param i channel index - /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. - /// via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferCode::timeout if timed out, - /// TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by - /// requested state change) - int64_t Receive(Parts& parts, - const std::string& channel, - const int index = 0, - int rcvTimeoutInMs = -1) - { - return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs); + /// @param rcvTimeoutMs receive timeout in ms, + /// -1 will wait forever (or until interrupt (e.g. via state change), + /// 0 will not wait (return immediately if cannot receive) + /// @return Number of received bytes, + /// TransferCode::timeout if timed out, + /// TransferCode::error if there was an error, + /// TransferCode::interrupted if interrupted (e.g. by requested state change) + template + std::enable_if_t::value, int64_t> + Receive(M& m, const std::string& channel, const int index, int rcvTimeoutMs) + { + return GetChannel(channel, index).Receive(m, rcvTimeoutMs); } /// @brief Getter for default transport factory diff --git a/fairmq/JSONParser.cxx b/fairmq/JSONParser.cxx index b7f5a4f1e..2341ad374 100644 --- a/fairmq/JSONParser.cxx +++ b/fairmq/JSONParser.cxx @@ -96,6 +96,8 @@ void ChannelParser(const ptree& tree, fair::mq::Properties& properties) commonProperties.emplace("rcvBufSize", cn.second.get("rcvBufSize", FairMQChannel::DefaultRcvBufSize)); commonProperties.emplace("sndKernelSize", cn.second.get("sndKernelSize", FairMQChannel::DefaultSndKernelSize)); commonProperties.emplace("rcvKernelSize", cn.second.get("rcvKernelSize", FairMQChannel::DefaultRcvKernelSize)); + commonProperties.emplace("sndTimeoutMs", cn.second.get("sndTimeoutMs", FairMQChannel::DefaultSndTimeoutMs)); + commonProperties.emplace("rcvTimeoutMs", cn.second.get("rcvTimeoutMs", FairMQChannel::DefaultRcvTimeoutMs)); commonProperties.emplace("linger", cn.second.get("linger", FairMQChannel::DefaultLinger)); commonProperties.emplace("rateLogging", cn.second.get("rateLogging", FairMQChannel::DefaultRateLogging)); commonProperties.emplace("portRangeMin", cn.second.get("portRangeMin", FairMQChannel::DefaultPortRangeMin)); @@ -146,6 +148,8 @@ void SubChannelParser(const ptree& channelTree, fair::mq::Properties& properties newProperties["rcvBufSize"] = sn.second.get("rcvBufSize", boost::any_cast(commonProperties.at("rcvBufSize"))); newProperties["sndKernelSize"] = sn.second.get("sndKernelSize", boost::any_cast(commonProperties.at("sndKernelSize"))); newProperties["rcvKernelSize"] = sn.second.get("rcvKernelSize", boost::any_cast(commonProperties.at("rcvKernelSize"))); + newProperties["sndTimeoutMs"] = sn.second.get("sndTimeoutMs", boost::any_cast(commonProperties.at("sndTimeoutMs"))); + newProperties["rcvTimeoutMs"] = sn.second.get("rcvTimeoutMs", boost::any_cast(commonProperties.at("rcvTimeoutMs"))); newProperties["linger"] = sn.second.get("linger", boost::any_cast(commonProperties.at("linger"))); newProperties["rateLogging"] = sn.second.get("rateLogging", boost::any_cast(commonProperties.at("rateLogging"))); newProperties["portRangeMin"] = sn.second.get("portRangeMin", boost::any_cast(commonProperties.at("portRangeMin"))); diff --git a/fairmq/Socket.h b/fairmq/Socket.h index 5c529558f..459d7f550 100644 --- a/fairmq/Socket.h +++ b/fairmq/Socket.h @@ -10,9 +10,12 @@ #define FAIR_MQ_SOCKET_H #include +#include + #include #include #include +#include #include namespace fair::mq { @@ -27,6 +30,12 @@ enum class TransferCode : int interrupted = -3 }; +template +struct is_transferrable : std::disjunction, + std::is_same>, + std::is_same> +{}; + struct Socket { Socket() = default; @@ -45,6 +54,8 @@ struct Socket virtual int64_t Receive(MessagePtr& msg, int timeout = -1) = 0; virtual int64_t Send(std::vector>& msgVec, int timeout = -1) = 0; virtual int64_t Receive(std::vector>& msgVec, int timeout = -1) = 0; + virtual int64_t Send(Parts& parts, int timeout = -1) { return Send(parts.fParts, timeout); } + virtual int64_t Receive(Parts& parts, int timeout = -1) { return Receive(parts.fParts, timeout); } [[deprecated("Use Socket::~Socket() instead.")]] virtual void Close() = 0; diff --git a/fairmq/SuboptParser.cxx b/fairmq/SuboptParser.cxx index ff47ee7db..1d48563c0 100644 --- a/fairmq/SuboptParser.cxx +++ b/fairmq/SuboptParser.cxx @@ -38,6 +38,8 @@ enum channelOptionKeyIds RCVBUFSIZE, // size of the receive queue SNDKERNELSIZE, RCVKERNELSIZE, + SNDTIMEOUTMS, + RCVTIMEOUTMS, LINGER, RATELOGGING, // logging rate PORTRANGEMIN, @@ -57,6 +59,8 @@ constexpr static const char* channelOptionKeys[] = { /*[RCVBUFSIZE] = */ "rcvBufSize", /*[SNDKERNELSIZE] = */ "sndKernelSize", /*[RCVKERNELSIZE] = */ "rcvKernelSize", + /*[SNDTIMEOUTMS] = */ "sndTimeoutMs", + /*[RCVTIMEOUTMS] = */ "rcvTimeoutMs", /*[LINGER] = */ "linger", /*[RATELOGGING] = */ "rateLogging", /*[PORTRANGEMIN] = */ "portRangeMin",