Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Try flushing if zmq has nothing for us. This helps prevent slow emitt…

…ers (one every minute, for example) from holding messages until a larger internal buffer is full enough to flush
  • Loading branch information...
commit 141cdda3edc619cca5bf0aa0b352de1827d1807f 1 parent afda3f7
Jordan Sissel jordansissel authored
Showing with 17 additions and 2 deletions.
  1. +17 −2 emitter.c
19 emitter.c
View
@@ -53,9 +53,24 @@ void *emitter(void *arg) {
rc = zmq_msg_init(&message);
insist(rc == 0, "zmq_msg_init failed");
//printf("waiting for zmq\n");
- rc = zmq_recv(socket, &message, 0);
- insist(rc == 0, "zmq_recv(%s) failed (returned %d): %s",
+ rc = zmq_recv(socket, &message, ZMQ_NOBLOCK);
+ insist(rc == 0 || errno == EAGAIN, "zmq_recv(%s) failed (returned %d): %s",
config->zmq_endpoint, rc, zmq_strerror(errno));
+ if (rc != 0 && errno == EAGAIN) {
+ /* Nothing ready to read, flush and sleep. */
+ //printf("flush+sleep\n");
+
+ /* We flush here to keep slow feeders closer to real-time */
+ rc = lumberjack_flush(lumberjack);
+ if (rc != 0) {
+ /* write failure, reconnect (which will resend) and such */
+ lumberjack_disconnect(lumberjack);
+ lumberjack_ensure_connected(lumberjack);
+ }
+ backoff(&sleeper);
+ continue;
+ }
+ backoff_clear(&sleeper);
/* Write the data over lumberjack. This will handle any
* connection/reconnection/ack issues */
Please sign in to comment.
Something went wrong with that request. Please try again.