Permalink
Browse files

Faster more scalable prefix matching.

  • Loading branch information...
1 parent 20b0d1c commit 40933191cb519f8d0aa5e67af5f0c5c8a4d215b9 @postwait postwait committed Apr 20, 2012
Showing with 301 additions and 86 deletions.
  1. +3 −0 .gitignore
  2. +11 −6 Makefile
  3. +4 −0 fq.h
  4. +13 −12 fq_client.c
  5. +3 −3 fq_msg.c
  6. +4 −0 fq_utils.c
  7. +11 −2 fqc.c
  8. +2 −0 fqd.c
  9. +5 −5 fqd_ccs.c
  10. +2 −1 fqd_config.c
  11. +3 −3 fqd_listener.c
  12. +3 −3 fqd_queue.c
  13. +237 −51 fqd_routemgr.c
View
@@ -6,3 +6,6 @@ Makefile.build
Makefile.depend
*.dSYM/
.*.swp
+ck-*/Makefile
+ck-*/build/ck.spec
+ck-*/doc/Makefile
View
@@ -1,8 +1,8 @@
CC=gcc
CKDIR=ck-0.2
-EXTRA_CFLAGS=-g -D_REENTRANT
-#EXTRA_CFLAGS+=-DDEBUG
+EXTRA_CFLAGS=-g -D_REENTRANT -D_DARWIN_C_SOURCE
+EXTRA_CFLAGS+=-DDEBUG
CLIENT_OBJ=fq_client.o fq_msg.o fq_utils.o
FQD_OBJ=fqd.o fqd_listener.o fqd_ccs.o fqd_dss.o fqd_config.o \
@@ -27,19 +27,24 @@ include Makefile.depend
CFLAGS+=$(EXTRA_CFLAGS)
fqd: $(FQD_OBJ)
- $(CC) $(CFLAGS) $(LDFLAGS) -o $@ $(FQD_OBJ)
+ @echo " - linking $@"
+ @$(CC) $(CFLAGS) $(LDFLAGS) -o $@ $(FQD_OBJ)
fqc: $(FQC_OBJ)
- $(CC) $(CFLAGS) $(LDFLAGS) -o $@ $(FQC_OBJ)
+ @echo " - linking $@"
+ @$(CC) $(CFLAGS) $(LDFLAGS) -o $@ $(FQC_OBJ)
libfq.a: $(CLIENT_OBJ)
- ar cr $@ $(CLIENT_OBJ)
+ @echo " - creating $@"
+ @ar cr $@ $(CLIENT_OBJ)
.c.o: $<
+ @echo " - compiling $<"
$(CC) $(CPPFLAGS) $(CFLAGS) -o $@ -c $<
Makefile.depend:
- $(CC) $(CPPFLAGS) $(CFLAGS) -MM *.c > Makefile.depend
+ @echo " - make depend"
+ @$(CC) $(CPPFLAGS) $(CFLAGS) -MM *.c > Makefile.depend
clean:
rm -f *.o *.a fqc fqd
View
@@ -37,6 +37,8 @@ fq_rk_cmp(const fq_rk * const a, const fq_rk * const b) {
return memcmp(a->name, b->name, a->len);
}
+#define FQ_BIND_ILLEGAL (uint32_t)0xffffffff
+
typedef struct {
fq_rk exchange;
int peermode;
@@ -188,6 +190,8 @@ typedef enum {
extern uint32_t fq_debug_bits;
+void fq_debug_set_bits(uint32_t bits);
+
extern int
fq_debug_fl(const char *file, int line, fq_debug_bits_t, const char *fmt, ...)
__printflike(4, 5);
View
@@ -115,13 +115,13 @@ static void
fq_client_disconnect_internal(fq_conn_s *conn_s) {
if(conn_s->cmd_fd >= 0) {
#ifdef DEBUG
- fq_debug("close(cmd_fd)\n");
+ fq_debug(FQ_DEBUG_CONN, "close(cmd_fd)\n");
#endif
close(conn_s->data_fd);
}
if(conn_s->data_fd >= 0) {
#ifdef DEBUG
- fq_debug("close(data_fd)\n");
+ fq_debug(FQ_DEBUG_CONN, "close(data_fd)\n");
#endif
close(conn_s->data_fd);
}
@@ -167,7 +167,7 @@ fq_client_do_auth(fq_conn_s *conn_s) {
{
char hex[260];
if(fq_rk_to_hex(hex, sizeof(hex), &conn_s->key) >= 0)
- fq_debug("client keyed:\n%s\n", hex);
+ fq_debug(FQ_DEBUG_CONN, "client keyed:\n%s\n", hex);
}
#endif
conn_s->data_ready = 1;
@@ -203,7 +203,7 @@ fq_client_data_connect_internal(fq_conn_s *conn_s) {
{
char hex[260];
if(fq_rk_to_hex(hex, sizeof(hex), &conn_s->key) >= 0)
- fq_debug("client keying:\n%s\n", hex);
+ fq_debug(FQ_DEBUG_CONN, "client keying:\n%s\n", hex);
}
#endif
if(fq_write_short_cmd(conn_s->data_fd,
@@ -236,7 +236,7 @@ fq_client_connect_internal(fq_conn_s *conn_s) {
goto shutdown;
if((rv = fq_client_do_auth(conn_s)) < 0) {
#ifdef DEBUG
- fq_debug("fq_client_do_auth -> %d\n", rv);
+ fq_debug(FQ_DEBUG_CONN, "fq_client_do_auth -> %d\n", rv);
#endif
goto shutdown;
}
@@ -273,7 +273,7 @@ fq_data_worker_loop(fq_conn_s *conn_s) {
free(garbage);
the_thick_of_it:
#ifdef DEBUG
- fq_debug("dequeue message to submit to server\n");
+ fq_debug(FQ_DEBUG_MSG, "dequeue message to submit to server\n");
#endif
write_rv = fq_client_write_msg(conn_s->data_fd, conn_s->peermode,
conn_s->tosend, conn_s->tosend_offset);
@@ -317,7 +317,7 @@ fq_data_worker_loop(fq_conn_s *conn_s) {
finish:
if(ctx) fq_buffered_msg_reader_free(ctx);
#ifdef DEBUG
- fq_debug("cmd_fd -> %d, stop -> %d\n", conn_s->cmd_fd, conn_s->stop);
+ fq_debug(FQ_DEBUG_CONN, "cmd_fd -> %d, stop -> %d\n", conn_s->cmd_fd, conn_s->stop);
#endif
}
static void *
@@ -332,7 +332,7 @@ fq_data_worker(void *u) {
fq_data_worker_loop(conn_s);
#ifdef DEBUG
- fq_debug("[data] connection failed: %s\n", conn_s->error);
+ fq_debug(FQ_DEBUG_IO, "[data] connection failed: %s\n", conn_s->error);
#endif
}
if(backoff < 1000000) backoff += 10000;
@@ -368,7 +368,7 @@ fq_conn_worker(void *u) {
while(ck_fifo_mpmc_dequeue(&conn_s->cmdq, &entry, &garbage) == true) {
free(garbage);
#ifdef DEBUG
- fq_debug("client acting on user req 0x%04x\n", entry->cmd);
+ fq_debug(FQ_DEBUG_CONN, "client acting on user req 0x%04x\n", entry->cmd);
#endif
switch(entry->cmd) {
case FQ_PROTO_HBREQ:
@@ -437,7 +437,7 @@ fq_conn_worker(void *u) {
if(conn_s->cmd_hb_needed) {
#ifdef DEBUG
- fq_debug("-> heartbeat\n");
+ fq_debug(FQ_DEBUG_CONN, "-> heartbeat\n");
#endif
if(fq_write_uint16(conn_s->cmd_fd, FQ_PROTO_HB)) break;
conn_s->cmd_hb_needed = 0;
@@ -465,7 +465,7 @@ fq_conn_worker(void *u) {
switch(hb) {
case FQ_PROTO_HB:
#ifdef DEBUG
- fq_debug("<- heartbeat\n");
+ fq_debug(FQ_DEBUG_CONN, "<- heartbeat\n");
#endif
conn_s->cmd_hb_last = fq_gethrtime();
conn_s->cmd_hb_needed = 1;
@@ -499,9 +499,10 @@ fq_conn_worker(void *u) {
}
#ifdef DEBUG
- fq_debug("[cmd] connection failed: %s\n", conn_s->error);
+ fq_debug(FQ_DEBUG_CONN, "[cmd] connection failed: %s\n", conn_s->error);
#endif
usleep(backoff);
+ backoff += 10000;
}
fq_client_disconnect_internal(conn_s);
return (void *)NULL;
View
@@ -55,7 +55,7 @@ fq_msg_alloc(const void *data, size_t s) {
memset(m, 0, offsetof(fq_msg, payload));
if(s) memcpy(m->payload, data, s);
#ifdef DEBUG
- fq_debug("msg(%p) -> alloc\n", (void *)m);
+ fq_debug(FQ_DEBUG_MSG, "msg(%p) -> alloc\n", (void *)m);
#endif
m->refcnt = 1;
return m;
@@ -67,7 +67,7 @@ fq_msg_alloc_BLANK(size_t s) {
if(!m) return NULL;
m->payload_len = s;
#ifdef DEBUG
- fq_debug("msg(%p) -> alloc\n", (void *)m);
+ fq_debug(FQ_DEBUG_MSG, "msg(%p) -> alloc\n", (void *)m);
#endif
m->refcnt = 1;
return m;
@@ -83,7 +83,7 @@ fq_msg_deref(fq_msg *msg) {
ck_pr_dec_uint_zero(&msg->refcnt, &zero);
if(zero) {
#ifdef DEBUG
- fq_debug("msg(%p) -> free\n", (void *)msg);
+ fq_debug(FQ_DEBUG_MSG, "msg(%p) -> free\n", (void *)msg);
#endif
free(msg);
}
View
@@ -16,6 +16,10 @@
uint32_t fq_debug_bits = 0;
+void fq_debug_set_bits(uint32_t bits) {
+ fq_debug_bits = bits;
+}
+
#define IN_READ_BUFFER_SIZE 1024*128
struct buffered_msg_reader {
unsigned char scratch[IN_READ_BUFFER_SIZE];
View
@@ -37,6 +37,11 @@ int main(int argc, char **argv) {
fq_msg *m;
signal(SIGPIPE, SIG_IGN);
fq_client_init(&c, 0, logger);
+ if(argc < 5) {
+ fprintf(stderr, "%s <host> <port> <user> <pass> [size [count]]\n",
+ argv[0]);
+ exit(-1);
+ }
fq_client_creds(c, argv[1], atoi(argv[2]), argv[3], argv[4]);
fq_client_heartbeat(c, 1000);
fq_client_set_backlog(c, 10000, 100);
@@ -46,11 +51,15 @@ int main(int argc, char **argv) {
memcpy(breq.exchange.name, "maryland", 8);
breq.exchange.len = 8;
breq.peermode = 0;
- breq.program = (char *)"";
+ breq.program = (char *)"prefix:\"test.prefix.\"";
fq_client_bind(c, &breq);
while(breq.out__route_id == 0) usleep(100);
printf("route set -> %u\n", breq.out__route_id);
+ if(breq.out__route_id == FQ_BIND_ILLEGAL) {
+ fprintf(stderr, "Failure to bind...\n");
+ exit(-1);
+ }
if(argc > 5) {
psize = atoi(argv[5]);
@@ -67,7 +76,7 @@ int main(int argc, char **argv) {
m = fq_msg_alloc_BLANK(psize);
memset(m->payload, 0, psize);
fq_msg_exchange(m, "maryland", 8);
- fq_msg_route(m, "check.9", 7);
+ fq_msg_route(m, "test.prefix.foo", 15);
fq_msg_id(m, NULL);
fq_client_publish(c, m);
cnt++;
View
@@ -21,6 +21,8 @@ static void usage(const char *prog) {
}
static void parse_cli(int argc, char **argv) {
int c;
+ const char *debug = getenv("DEBUG");
+ if(debug) fq_debug_set_bits(atoi(debug));
while((c = getopt(argc, argv, "hn:")) != EOF) {
switch(c) {
case 'h':
View
@@ -66,7 +66,7 @@ fqd_ccs_key_client(remote_client *client) {
{
char hex[260];
if(fq_rk_to_hex(hex, sizeof(hex), &client->key) >= 0)
- fq_debug("client keyed:\n%s\n", hex);
+ fq_debug(FQ_DEBUG_CONN, "client keyed:\n%s\n", hex);
}
#endif
@@ -76,7 +76,7 @@ fqd_ccs_key_client(remote_client *client) {
static int
fqd_css_heartbeat(remote_client *client) {
#ifdef DEBUG
- fq_debug("heartbeat -> %s\n", client->pretty);
+ fq_debug(FQ_DEBUG_CONN, "heartbeat -> %s\n", client->pretty);
#endif
return fq_write_uint16(client->fd, FQ_PROTO_HB);
}
@@ -103,7 +103,7 @@ fqd_ccs_loop(remote_client *client) {
if(hb_us && client->last_activity < (t - hb_us * 3)) {
ERRTOFD(client->fd, "heartbeat failed");
#ifdef DEBUG
- fq_debug("heartbeat failed from %s\n", client->pretty);
+ fq_debug(FQ_DEBUG_CONN, "heartbeat failed from %s\n", client->pretty);
#endif
break;
}
@@ -113,15 +113,15 @@ fqd_ccs_loop(remote_client *client) {
switch(cmd) {
case FQ_PROTO_HB:
#ifdef DEBUG
- fq_debug("heartbeat <- %s\n", client->pretty);
+ fq_debug(FQ_DEBUG_CONN, "heartbeat <- %s\n", client->pretty);
#endif
break;
case FQ_PROTO_HBREQ:
{
uint16_t ms;
fq_read_uint16(client->fd, &ms);
#ifdef DEBUG
- fq_debug("setting client(%p) heartbeat to %d\n",
+ fq_debug(FQ_DEBUG_CONN, "setting client(%p) heartbeat to %d\n",
(void *)client, ms);
#endif
client->heartbeat_ms = ms;
View
@@ -165,7 +165,7 @@ fqd_config_bind(fq_rk *exchange, int peermode, const char *program,
fqd_exchange *x;
fqd_route_rule *rule;
rule = fqd_routemgr_compile(program, peermode, q);
- if(!rule) return 0;
+ if(!rule) return FQ_BIND_ILLEGAL;
BEGIN_CONFIG_MODIFY(config);
x = fqd_config_get_exchange(config, exchange);
if(!x) x = fqd_config_add_exchange(config, exchange);
@@ -350,6 +350,7 @@ fqd_internal_copy_config(fqd_config_ref *src, fqd_config_ref *tgt) {
for(i=0;i<tgt->config.n_exchanges;i++) {
if(tgt->config.exchanges[i] && tgt->config.exchanges[i]->set) {
fqd_routemgr_ruleset_free(tgt->config.exchanges[i]->set);
+ free(tgt->config.exchanges[i]);
}
}
free(tgt->config.exchanges);
View
@@ -25,7 +25,7 @@ fqd_remote_client_deref(remote_client *r) {
ck_pr_dec_uint_zero(&r->refcnt, &zero);
if(zero) {
#ifdef DEBUG
- fq_debug("dropping client\n");
+ fq_debug(FQ_DEBUG_CONN, "dropping client\n");
#endif
close(r->fd);
free(r);
@@ -44,7 +44,7 @@ conn_handler(void *vc) {
"(pre-auth)@%s:%d", buf, ntohs(client->remote.sin_port));
gettimeofday(&client->connect_time, NULL);
#ifdef DEBUG
- fq_debug("client connected\n");
+ fq_debug(FQ_DEBUG_CONN, "client connected\n");
#endif
while((rv = read(client->fd, &cmd, sizeof(cmd))) == -1 && errno == EINTR);
@@ -72,7 +72,7 @@ conn_handler(void *vc) {
default:
#ifdef DEBUG
- fq_debug("client protocol violation in initial cmd\n");
+ fq_debug(FQ_DEBUG_CONN, "client protocol violation in initial cmd\n");
#endif
break;
}
View
@@ -41,7 +41,7 @@ fqd_queue_register_client(fqd_queue *q, remote_client *c) {
if(ck_pr_cas_ptr(&q->downstream[i], NULL, c) == true) {
fqd_queue_ref(q);
#ifdef DEBUG
- fq_debug("%.*s adding %s\n",
+ fq_debug(FQ_DEBUG_CONFIG, "%.*s adding %s\n",
q->name.len, q->name.name, c->pretty);
#endif
return 0;
@@ -58,7 +58,7 @@ fqd_queue_deregister_client(fqd_queue *q, remote_client *c) {
if(q->downstream[i] == c) {
q->downstream[i] = NULL;
#ifdef DEBUG
- fq_debug("%.*s dropping %s\n",
+ fq_debug(FQ_DEBUG_CONFIG, "%.*s dropping %s\n",
q->name.len, q->name.name, c->pretty);
#endif
fqd_remote_client_deref(c);
@@ -83,7 +83,7 @@ fqd_queue_deref(fqd_queue *q) {
ck_pr_dec_uint_zero(&q->refcnt, &zero);
if(zero) {
#ifdef DEBUG
- fq_debug("dropping queue(%p) %.*s\n",
+ fq_debug(FQ_DEBUG_CONFIG, "dropping queue(%p) %.*s\n",
(void *)q, q->name.len, q->name.name);
#endif
free(q);
Oops, something went wrong.

0 comments on commit 4093319

Please sign in to comment.