Permalink
Browse files

- Finish implementing the lumberjack protocol prototype.

- Move clock_gettime mess to its own file
  • Loading branch information...
jordansissel committed Aug 5, 2012
1 parent 251cac3 commit 6aaa8721f9c4beda619dbd7af41830a64532dcc1
Showing with 136 additions and 51 deletions.
  1. +26 −0 clock_gettime.h
  2. +110 −51 emitter.c
View
@@ -0,0 +1,26 @@
+#ifndef _CLOCK_GETTIME_H_
+#define _CLOCK_GETTIME_H_
+#include <time.h> /* struct timespec, clock_gettime */
+
+// copied mostly from https://gist.github.com/1087739
+/* OS X doesn't have clock_gettime, sigh. */
+#ifdef __MACH__
+#include <mach/clock.h>
+#include <mach/mach.h>
+
+typedef int clockid_t;
+#define CLOCK_MONOTONIC 1
+static long clock_gettime(clockid_t __attribute__((unused)) which_clock, struct timespec *tp) {
+ clock_serv_t cclock;
+ mach_timespec_t mts;
+ host_get_clock_service(mach_host_self(), REALTIME_CLOCK, &cclock);
+ clock_get_time(cclock, &mts);
+ mach_port_deallocate(mach_task_self(), cclock);
+ tp->tv_sec = mts.tv_sec;
+ tp->tv_nsec = mts.tv_nsec;
+ return 0; /* success, according to clock_gettime(3) */
+}
+#endif
+// end gist copy
+
+#endif /* _CLOCK_GETTIME_H_ */
View
161 emitter.c
@@ -1,32 +1,17 @@
-#include "emitter.h"
-#include <zmq.h>
-#include "insist.h"
#include <errno.h>
#include <stdint.h> /* C99 for int64_t */
#include <string.h>
-#include <time.h> /* struct timespec, clock_gettime */
#include <unistd.h>
+#include <zmq.h>
+#include "ring.h"
+#include "emitter.h"
+#include "insist.h"
+#include "proto.h"
+#include "backoff.h"
+#include "clock_gettime.h"
-// copied mostly from https://gist.github.com/1087739
-/* OS X doesn't have clock_gettime, sigh. */
-#ifdef __MACH__
-#include <mach/clock.h>
-#include <mach/mach.h>
-
-typedef int clockid_t;
-#define CLOCK_MONOTONIC 1
-long clock_gettime(clockid_t __attribute__((unused)) which_clock, struct timespec *tp) {
- clock_serv_t cclock;
- mach_timespec_t mts;
- host_get_clock_service(mach_host_self(), REALTIME_CLOCK, &cclock);
- clock_get_time(cclock, &mts);
- mach_port_deallocate(mach_task_self(), cclock);
- tp->tv_sec = mts.tv_sec;
- tp->tv_nsec = mts.tv_nsec;
- return 0; /* success, according to clock_gettime(3) */
-}
-#endif
-// end gist copy
+static struct timespec MIN_SLEEP = { 0, 10000000 }; /* 10ms */
+static struct timespec MAX_SLEEP = { 15, 0 }; /* 15 */
void *emitter(void *arg) {
struct emitter_config *config = arg;
@@ -40,37 +25,112 @@ void *emitter(void *arg) {
insist(rc != -1, "zmq_bind(%s) failed: %s", config->zmq_endpoint,
zmq_strerror(errno));
- //srand(time(NULL));
+ /* Seed the RNG so we can pick a random starting sequence number */
+ srand(time(NULL));
+ uint32_t sequence = 0; //rand();
struct timespec start;
clock_gettime(CLOCK_MONOTONIC, &start);
- long count = 0;
+ //long count = 0;
+
+ struct ring *ring = ring_new_size(64); /* power of 2 is probably best */
+
+ struct backoff sleeper;
+ backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP);
+
+ struct lumberjack *lumberjack;
+ lumberjack = lumberjack_new("localhost", 1234);
+ insist(lumberjack != NULL, "lumberjack_new failed");
for (;;) {
- /* TODO(sissel): If buffer is full and this is not a fresh connection,
- * block for acks */
- /* TODO(sissel): If buffer is not empty, write one event. */
- /* TODO(sissel): write frame header (version + frame type 'D') */
- /* TODO(sissel): write sequence number */
- /* TODO(sissel): write event payload */
- /* TODO(sissel): On any write/connect error, block until reconnected.
- * When reconnected, restart this loop to flush buffer. */
-
- /* Receive an event from a harvester and put it in the queue */
- zmq_msg_t message;
- rc = zmq_msg_init(&message);
- insist(rc == 0, "zmq_msg_init failed");
- rc = zmq_recv(socket, &message, 0);
- insist(rc == 0, "zmq_recv(%s) failed (returned %d): %s",
- config->zmq_endpoint, rc, zmq_strerror(errno));
-
- //write(1, zmq_msg_data(&message), zmq_msg_size(&message));
- //write(1, "\n", 1);
-
- /* TODO(sissel): pick sequence number */
- /* TODO(sissel): put the event into the ring buffer */
- zmq_msg_close(&message);
+ if (!lumberjack_connected(lumberjack)) {
+ backoff(&sleeper);
+
+ rc = lumberjack_connect(lumberjack);
+ if (rc != 0) {
+ printf("Connection attempt to %s:%hd failed: %s\n",
+ lumberjack->host, lumberjack->port, strerror(errno));
+ continue;
+ }
+ backoff_clear(&sleeper);
+
+ /* New connection, anything in the ring buffer is assumed to be
+ * un-acknowledged. Send it. */
+ for (int i = 0, count = ring_count(ring); i < count; i++) {
+ struct str *frame;
+ rc = ring_peek(ring, i, (void **)&frame);
+ insist(rc == RING_OK, "ring_peek(%d) failed unexpectedly: %d\n", i, rc);
+ rc = lumberjack_write(lumberjack, frame);
+ if (rc != 0) {
+ /* write failed, break and reconnect */
+ break;
+ }
+ }
+ }
+ if (ring_is_full(ring)) {
+ /* Too many data frames waiting on acknowledgement, read acks until it
+ * would block ? */
+ uint32_t ack;
+
+ rc = lumberjack_read_ack(lumberjack, &ack);
+ if (rc < 0) {
+ /* error */
+ printf("lumberjack_read_ack failed: %s\n", strerror(errno));
+ lumberjack_disconnect(lumberjack);
+ backoff(&sleeper);
+ continue;
+ }
+
+ printf("Got ack for %d\n", ack);
+ /* Verify this is even a sane ack */
+ struct str *frame;
+ uint32_t cur_seq;
+ /* Clear anything in the ring with a sequence less than the one just acked */
+ for (int i = 0, count = ring_count(ring); i < count; i++) {
+ ring_peek(ring, 0, (void **)&frame);
+ /* this is a silly way, but since the ring only stores strings right now */
+ memcpy(&cur_seq, str_data(frame) + 2, sizeof(uint32_t));
+ cur_seq = ntohl(cur_seq);
+
+ if (cur_seq <= ack) {
+ printf("bulk ack: %d\n", cur_seq);
+ ring_pop(ring, NULL); /* don't care to retrieve it */
+ } else {
+
+ }
+ }
+ } else {
+ /* Receive an event from a harvester and put it in the queue */
+ zmq_msg_t message;
+ struct str *frame;
+
+ rc = zmq_msg_init(&message);
+ insist(rc == 0, "zmq_msg_init failed");
+ printf("waiting for zmq\n");
+ rc = zmq_recv(socket, &message, 0);
+ insist(rc == 0, "zmq_recv(%s) failed (returned %d): %s",
+ config->zmq_endpoint, rc, zmq_strerror(errno));
+
+ /* Build a lumberjack 'data' frame payload, put it in the ring buffer */
+ sequence++;
+ frame = lumberjack_encode_data(sequence, zmq_msg_data(&message),
+ zmq_msg_size(&message));
+ rc = ring_push(ring, frame);
+ insist(rc == RING_OK, "ring_push failed (returned %d, expected RING_OK(%d)",
+ rc, RING_OK);
+
+ printf("seq: %d\n", sequence);
+ /* Write a lumberjack frame, this will block until the full write
+ * completes or errors. On error, it will disconnect. */
+ rc = lumberjack_write(lumberjack, frame);
+
+ zmq_msg_close(&message);
+ }
+ } /* forever */
+} /* emitter */
+
+/*
count++;
if (count == 1000000) {
struct timespec now;
@@ -81,5 +141,4 @@ void *emitter(void *arg) {
clock_gettime(CLOCK_MONOTONIC, &start);
count = 0;
}
- }
-} /* emitter */
+*/

0 comments on commit 6aaa872

Please sign in to comment.