Permalink
Browse files

Try flushing if zmq has nothing for us. This helps prevent slow emitt…

…ers (one every minute, for example) from holding messages until a larger internal buffer is full enough to flush
  • Loading branch information...
1 parent afda3f7 commit 141cdda3edc619cca5bf0aa0b352de1827d1807f @jordansissel jordansissel committed Aug 11, 2012
Showing with 17 additions and 2 deletions.
  1. +17 −2 emitter.c
View
19 emitter.c
@@ -53,9 +53,24 @@ void *emitter(void *arg) {
rc = zmq_msg_init(&message);
insist(rc == 0, "zmq_msg_init failed");
//printf("waiting for zmq\n");
- rc = zmq_recv(socket, &message, 0);
- insist(rc == 0, "zmq_recv(%s) failed (returned %d): %s",
+ rc = zmq_recv(socket, &message, ZMQ_NOBLOCK);
+ insist(rc == 0 || errno == EAGAIN, "zmq_recv(%s) failed (returned %d): %s",
config->zmq_endpoint, rc, zmq_strerror(errno));
+ if (rc != 0 && errno == EAGAIN) {
+ /* Nothing ready to read, flush and sleep. */
+ //printf("flush+sleep\n");
+
+ /* We flush here to keep slow feeders closer to real-time */
+ rc = lumberjack_flush(lumberjack);
+ if (rc != 0) {
+ /* write failure, reconnect (which will resend) and such */
+ lumberjack_disconnect(lumberjack);
+ lumberjack_ensure_connected(lumberjack);
+ }
+ backoff(&sleeper);
+ continue;
+ }
+ backoff_clear(&sleeper);
/* Write the data over lumberjack. This will handle any
* connection/reconnection/ack issues */

0 comments on commit 141cdda

Please sign in to comment.