Open
Description
I'm experiencing an unusual issue. Under high load, I'm seeing the first frame of a message that isn't the routing id sometimes vanish. I'm doing a blocking synchronous client using zmq_immediate in conjunction with zmq_sndtimeo. So, if the message can't be sent it returns false after a few tries. Something like this
bool cppzmq_client(zmq::context_t* context, bool dowhile)
{
const int recvtimeout = 4000;
static std::atomic<int> id = 0;
zmq::socket_t m_socket = {*context, ZMQ_DEALER};
m_socket.set(zmq::sockopt::sndhwm, 0);
m_socket.set(zmq::sockopt::routing_id, std::to_string(id++));
m_socket.set(zmq::sockopt::immediate, 1);
m_socket.set(zmq::sockopt::rcvtimeo, recvtimeout);
zmq::multipart_t message;
message.pushstr({});
message.push_back(zmq::message_t{std::string{"stuff"}});
message.push_back(zmq::message_t{std::string{"more stuff"}});
message.push_back(zmq::message_t{std::string{"even more stuff"}});
bool succeeded = false;
const int sendtimeout = 1000;
m_socket.set(zmq::sockopt::sndtimeo, sendtimeout);
m_socket.connect("ipc://@/zmq-client");
int retry_count = 0;
do {
try {
succeeded = message.send(m_socket);
} catch (const std::exception& e) {
if (zmq_errno() != EINTR && zmq_errno() != EAGAIN) {
throw;
}
}
retry_count++;
std::cout << "Retry count: " << retry_count << std::endl;
} while (!succeeded && retry_count < 20);
if(!succeeded) {
std::cerr << "Failed to send message to broker: " << zmq_strerror(zmq_errno()) << std::endl;
return false;
}
zmq::multipart_t result;
bool read_succeeded = result.recv(m_socket);
return read_succeeded && result.peekstr(0) != "MISSING_FRAME";
}
In the send() function, its popping off the first part of the message, but it never restores the first message to the mulitpart_t if the read fails, which leads to retries failing
bool send(socket_ref socket, int flags = 0)
{
flags &= ~(ZMQ_SNDMORE);
bool more = size() > 0;
while (more) {
message_t message = pop();
more = size() > 0;
#ifdef ZMQ_CPP11
if (!socket.send(message, static_cast<send_flags>(
(more ? ZMQ_SNDMORE : 0) | flags)))
return false;
#else
if (!socket.send(message, (more ? ZMQ_SNDMORE : 0) | flags))
return false;
#endif
}
clear();
return true;
}
Would it be possible to restore the popped message so a retry can occur?
Metadata
Metadata
Assignees
Labels
No labels