Dropped first frame on failed send #659

Open
@diehard2

I'm experiencing an unusual issue. Under high load, the first frame of a message that isn't the routing id sometimes vanishes. I'm writing a blocking synchronous client using ZMQ_IMMEDIATE in conjunction with ZMQ_SNDTIMEO, so if the message can't be sent, the function returns false after a few tries. Something like this:

bool cppzmq_client(zmq::context_t* context, bool dowhile)
{

  const int recvtimeout = 4000;
  static std::atomic<int> id{0};

  zmq::socket_t m_socket = {*context, ZMQ_DEALER};
  m_socket.set(zmq::sockopt::sndhwm, 0);
  m_socket.set(zmq::sockopt::routing_id, std::to_string(id++));
  m_socket.set(zmq::sockopt::immediate, 1);
  m_socket.set(zmq::sockopt::rcvtimeo, recvtimeout);

  zmq::multipart_t message;
  message.pushstr({});
  message.push_back(zmq::message_t{std::string{"stuff"}});
  message.push_back(zmq::message_t{std::string{"more stuff"}});
  message.push_back(zmq::message_t{std::string{"even more stuff"}});

  bool succeeded = false;
  const int sendtimeout = 1000;
  m_socket.set(zmq::sockopt::sndtimeo, sendtimeout);
  m_socket.connect("ipc://@/zmq-client");
  int retry_count = 0;
  do {
    try {
      succeeded = message.send(m_socket);
    } catch (const std::exception& e) {
      if (zmq_errno() != EINTR && zmq_errno() != EAGAIN) {
        throw;
      }
    }
    retry_count++;
    std::cout << "Retry count: " << retry_count << std::endl;
  } while (!succeeded && retry_count < 20);

  if(!succeeded) {
    std::cerr << "Failed to send message to broker: " << zmq_strerror(zmq_errno()) << std::endl;
    return false;
  }

  zmq::multipart_t result;
  bool read_succeeded = result.recv(m_socket);
  return read_succeeded && result.peekstr(0) != "MISSING_FRAME";
}

In the send() function, it's popping off the first part of the message, but it never restores that part to the multipart_t if the send fails, which leads to retries failing because the message is now missing its first frame.

    bool send(socket_ref socket, int flags = 0)
    {
        flags &= ~(ZMQ_SNDMORE);
        bool more = size() > 0;
        while (more) {
            message_t message = pop();
            more = size() > 0;
#ifdef ZMQ_CPP11
            if (!socket.send(message, static_cast<send_flags>(
                                        (more ? ZMQ_SNDMORE : 0) | flags)))
                return false;
#else
            if (!socket.send(message, (more ? ZMQ_SNDMORE : 0) | flags))
                return false;
#endif
        }
        clear();
        return true;
    }

Would it be possible to restore the popped message so a retry can occur?
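
To make the request concrete, here is a minimal sketch of the behaviour I have in mind. It is not cppzmq code: Frames and SendFn are hypothetical stand-ins for multipart_t and socket.send(), and send_restoring is a made-up name. It also assumes that a failed send leaves the frame itself untouched, so it can simply be pushed back onto the front.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for zmq::multipart_t: an ordered queue of frames.
using Frames = std::deque<std::string>;

// Hypothetical stand-in for socket.send(): returns false when the frame
// could not be queued (for example on EAGAIN after the send timeout).
using SendFn = std::function<bool(const std::string& frame, bool more)>;

// Sends frames front to back. If a frame cannot be sent, it is put back
// at the front so the caller can retry with the message unchanged.
bool send_restoring(Frames& frames, const SendFn& send_frame)
{
    while (!frames.empty()) {
        std::string frame = std::move(frames.front());
        frames.pop_front();
        const bool more = !frames.empty();
        if (!send_frame(frame, more)) {
            // Restore the popped frame instead of dropping it.
            frames.push_front(std::move(frame));
            return false;
        }
    }
    return true;
}
```

With something like this, the retry loop above would see the same frames on every attempt. If a later frame could fail after earlier ones were already accepted, restoring would only recover the unsent tail, so this sketch really only covers a failure on the first frame.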
