Permalink
Browse files

- add exponential backoff on EOF

- each file is handled by a separate thread
  • Loading branch information...
1 parent ac335e3 commit 47233b9c4c4ee03750a12e3509ed4fc265b873a6 @jordansissel jordansissel committed Aug 1, 2012
Showing with 174 additions and 0 deletions.
  1. +18 −0 Makefile
  2. +26 −0 backoff.c
  3. +22 −0 backoff.h
  4. +79 −0 lumberjack.c
  5. +29 −0 unixsock.c
View
@@ -0,0 +1,18 @@
+CFLAGS+=-Ibuild/include
+#LDFLAGS+=-pthread
+
+default: lumberjack
+unixsock.c: build/include/insist.h
+lumberjack.c: build/include/insist.h backoff.h
+
+lumberjack: lumberjack.o backoff.o
+
+build/include/insist.h: | build/include
+ curl -s -o $@ https://raw.github.com/jordansissel/experiments/master/c/better-assert/insist.h
+
+build:
+ mkdir $@
+
+build/include: | build
+ mkdir $@
+
View
@@ -0,0 +1,26 @@
+#include "backoff.h"
+#include <stdio.h>
+#include <unistd.h>
+
+inline void backoff_init(struct backoff *b, useconds_t min, useconds_t max) {
+ b->max = max;
+ b->min = min;
+ backoff_clear(b);
+} /* backoff_init */
+
+inline void backoff(struct backoff *b) {
+ printf("Sleeping %f seconds\n", b->time / 1000000.0);
+ usleep(b->time);
+
+ /* Exponential backoff */
+ b->time <<= 1;
+
+ /* Cap at 'max' time sleep */
+ if (b->time > b->max) {
+ b->time = b->max;
+ }
+} /* backoff_sleep */
+
+inline void backoff_clear(struct backoff *b) {
+ b->time = b->min; /* 1000 microseconds == 1ms */
+} /* backoff_clear */
View
@@ -0,0 +1,22 @@
+#ifndef _BACKOFF_H_
+#define _BACKOFF_H_
+
+#include <sys/types.h>
+
+struct backoff {
+ useconds_t max;
+ useconds_t min;
+ useconds_t time;
+};
+
+/* Initialize a backoff struct with a max value */
+void backoff_init(struct backoff *b, useconds_t min, useconds_t max);
+
+/* Execute a backoff. This will sleep for a time.
+ * The next backoff() call will sleep twice as long (or the max value,
+ * whichever is smaller) */
+void backoff(struct backoff *b);
+
+/* Reset the next backoff() call to sleep the minimum (1ms) */
+void backoff_clear(struct backoff *b);
+#endif /* _BACKOFF_H_ */
View
@@ -0,0 +1,79 @@
+#include <stdio.h>
+#include <getopt.h>
+#include <insist.h>
+
+#include <pthread.h>
+
+#include <string.h> /* for strerror(3) */
+#include <errno.h> /* for errno */
+#include <fcntl.h> /* for open(2) */
+
+#include "backoff.h"
+
+typedef enum {
+ opt_help = 'h',
+ opt_version = 'v',
+} optlist_t;
+
+static struct option options[] = {
+ { "help", no_argument, NULL, opt_help },
+ { "version", no_argument, NULL, opt_version },
+ { NULL, 0, NULL, 0 }
+};
+
+void *harvest(void *arg) {
+ const char *path = (const char *)arg;
+ int fd;
+ fd = open(path, O_RDONLY);
+ insist(fd >= 0, "open(%s) failed: %s", path, strerror(errno));
+
+ char *buf;
+ ssize_t bytes;
+ buf = calloc(65536, sizeof(char));
+
+ struct backoff sleeper;
+ backoff_init(&sleeper, 10000 /* 10ms */, 15000000 /* 15 seconds */);
+
+ for (;;) {
+ bytes = read(fd, buf, 65536);
+ if (bytes < 0) {
+ /* error */
+ break;
+ } else if (bytes == 0) {
+ backoff(&sleeper);
+ } else {
+ backoff_clear(&sleeper);
+ printf("got: %.*s\n", (int)bytes, buf);
+ }
+ }
+ close(fd);
+
+ return NULL;
+} /* harvest */
+
+
+int main(int argc, char **argv) {
+ int c, i;
+ while (c = getopt_long_only(argc, argv, "+hv", options, &i), c != -1) {
+ /* handle args */
+ }
+
+ argc -= optind;
+ argv += optind;
+
+ insist(argc > 0, "No arguments given. What log files do you want shipped?");
+
+ pthread_t *harvesters = calloc(argc, sizeof(pthread_t));
+
+ /* Start harvesters for each path given */
+ for (int i = 0; i < argc; i++) {
+ pthread_create(&harvesters[i], NULL, harvest, argv[i]);
+ }
+
+ /* Wait for the harvesters to die */
+ for (int i = 0; i < argc; i++) {
+ pthread_join(harvesters[i], NULL);
+ }
+
+ return 0;
+} /* main */
View
@@ -0,0 +1,29 @@
+#include <sys/socket.h> /* for socket(2) */
+#include <sys/un.h> /* for struct sockaddr_un */
+#include <errno.h> /* for errno */
+#include <string.h> /* for strerror(3) */
+#include <insist.h> /* github/jordansissel/insist */
+#include <unistd.h> /* for unlink(2) */
+
+int main() {
+ int r;
+ int sock;
+ sock = socket(PF_LOCAL, SOCK_DGRAM, 0);
+ insist(sock != -1, "socket() failed: %s", strerror(errno));
+
+ struct sockaddr_un addr;
+ addr.sun_family = PF_LOCAL;
+ strcpy(addr.sun_path, "/tmp/fancylog");
+ //unlink(addr.sun_path);
+ r = bind(sock, (struct sockaddr *)&addr,
+ sizeof(addr.sun_family) + strlen(addr.sun_path) + 1);
+ insist(r == 0, "bind(%s) failed: %s", addr.sun_path, strerror(errno));
+
+ char buffer[65536];
+ for (;;) {
+ ssize_t bytes;
+ bytes = recvfrom(sock, buffer, 65536, 0, NULL, NULL);
+ insist(bytes > 0, "recvfrom() returned %d: %s", (int)bytes, strerror(errno));
+ printf("Received: %.*s\n", (int)bytes, buffer);
+ }
+} /* main */

0 comments on commit 47233b9

Please sign in to comment.