Permalink
Browse files

fix some race issues. still doesn't work completely

  • Loading branch information...
postwait committed Apr 23, 2012
1 parent 499a4e9 commit 14a93680a6024637ea85ffec2628f2383a338d10
Showing with 200 additions and 17 deletions.
  1. +2 −0 .gitignore
  2. +10 −2 Makefile
  3. +1 −1 fq_client.c
  4. +4 −2 fq_msg.c
  5. +82 −0 fq_rcvr.c
  6. +88 −0 fq_sndr.c
  7. +3 −2 fqd.c
  8. +4 −2 fqd_dss.c
  9. +4 −5 fqd_listener.c
  10. +0 −2 fqd_queue.c
  11. +2 −1 fqd_routemgr.c
View
@@ -2,6 +2,8 @@
*.o
fqd
fqc
+fq_rcvr
+fq_sndr
Makefile.build
Makefile.depend
*.dSYM/
View
@@ -11,7 +11,7 @@ FQD_OBJ=fqd.o fqd_listener.o fqd_ccs.o fqd_dss.o fqd_config.o \
FQC_OBJ=fqc.o $(CLIENT_OBJ)
CPPFLAGS=-I./$(CKDIR)/include
-all: libfq.a fqd fqc
+all: libfq.a fqd fqc fq_sndr fq_rcvr
Makefile.build:
(cd $(CKDIR) && ./configure)
@@ -34,13 +34,21 @@ fqc: $(FQC_OBJ)
@echo " - linking $@"
@$(CC) $(CFLAGS) $(LDFLAGS) -o $@ $(FQC_OBJ)
+fq_sndr: fq_sndr.o $(CLIENT_OBJ)
+ @echo " - linking $@"
+ @$(CC) $(CFLAGS) $(LDFLAGS) -o $@ $^
+
+fq_rcvr: fq_rcvr.o $(CLIENT_OBJ)
+ @echo " - linking $@"
+ @$(CC) $(CFLAGS) $(LDFLAGS) -o $@ $^
+
libfq.a: $(CLIENT_OBJ)
@echo " - creating $@"
@ar cr $@ $(CLIENT_OBJ)
.c.o: $<
@echo " - compiling $<"
- $(CC) $(CPPFLAGS) $(CFLAGS) -o $@ -c $<
+ @$(CC) $(CPPFLAGS) $(CFLAGS) -o $@ -c $<
Makefile.depend:
@echo " - make depend"
View
@@ -144,7 +144,7 @@ fq_client_do_auth(fq_conn_s *conn_s) {
if(fq_write_uint16(conn_s->cmd_fd, FQ_PROTO_AUTH_PLAIN)) return -2;
if(fq_write_short_cmd(conn_s->cmd_fd, strlen(conn_s->user), conn_s->user) < 0)
return -3;
- if(fq_write_short_cmd(conn_s->cmd_fd, strlen(conn_s->user), conn_s->queue) < 0)
+ if(fq_write_short_cmd(conn_s->cmd_fd, strlen(conn_s->queue), conn_s->queue) < 0)
return -4;
if(fq_write_short_cmd(conn_s->cmd_fd, strlen(conn_s->pass), conn_s->pass) < 0)
return -5;
View
@@ -5,6 +5,8 @@
#include "fq.h"
#include "ck_pr.h"
+#define MSG_ALIGN sizeof(void *)
+
static fq_msgid local_msgid = {
.id = {
.u32 = {
@@ -49,7 +51,7 @@ pull_next_local_msgid(fq_msgid *msgid) {
fq_msg *
fq_msg_alloc(const void *data, size_t s) {
fq_msg *m;
- m = malloc(offsetof(fq_msg, payload) + (s));
+ m = malloc(offsetof(fq_msg, payload) + ((s | (MSG_ALIGN-1))+1));
if(!m) return NULL;
m->payload_len = s;
memset(m, 0, offsetof(fq_msg, payload));
@@ -63,7 +65,7 @@ fq_msg_alloc(const void *data, size_t s) {
fq_msg *
fq_msg_alloc_BLANK(size_t s) {
fq_msg *m;
- m = calloc(offsetof(fq_msg, payload) + (s), 1);
+ m = calloc(offsetof(fq_msg, payload) + ((s | (MSG_ALIGN-1))+1), 1);
if(!m) return NULL;
m->payload_len = s;
#ifdef DEBUG
View
@@ -0,0 +1,82 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+#include <string.h>
+#include "fq.h"
+
+void logger(const char *);
+
+void logger(const char *s) {
+ fprintf(stderr, "fq_logger: %s\n", s);
+}
+static void
+print_rate(fq_client c, hrtime_t s, hrtime_t f, uint64_t cnt, uint64_t icnt) {
+ double d;
+ if(cnt) {
+ d = (double)cnt * 1000000000;
+ d /= (double)(f-s);
+ printf("[%d backlog] output %0.2f msg/sec\n",
+ fq_client_data_backlog(c), d);
+ }
+ if(icnt) {
+ d = (double)icnt * 1000000000;
+ d /= (double)(f-s);
+ printf("[%d backlog] input %0.2f msg/sec\n",
+ fq_client_data_backlog(c), d);
+ }
+}
+int main(int argc, char **argv) {
+ hrtime_t s0, s, f;
+ uint64_t cnt = 0, icnt = 0, icnt_total = 0;
+ int rcvd = 0;
+ fq_client c;
+ fq_bind_req breq;
+ fq_msg *m;
+ signal(SIGPIPE, SIG_IGN);
+ fq_client_init(&c, 0, logger);
+ if(argc < 5) {
+ fprintf(stderr, "%s <host> <port> <user> <pass> [size [count]]\n",
+ argv[0]);
+ exit(-1);
+ }
+ fq_client_creds(c, argv[1], atoi(argv[2]), argv[3], argv[4]);
+ fq_client_heartbeat(c, 1000);
+ fq_client_set_backlog(c, 10000, 100);
+ fq_client_connect(c);
+
+ memset(&breq, 0, sizeof(breq));
+ memcpy(breq.exchange.name, "maryland", 8);
+ breq.exchange.len = 8;
+ breq.peermode = 0;
+ breq.program = (char *)"prefix:\"test.prefix.\"";
+
+ fq_client_bind(c, &breq);
+ while(breq.out__route_id == 0) usleep(100);
+ printf("route set -> %u\n", breq.out__route_id);
+ if(breq.out__route_id == FQ_BIND_ILLEGAL) {
+ fprintf(stderr, "Failure to bind...\n");
+ exit(-1);
+ }
+
+ s0 = s = fq_gethrtime();
+ while(1) {
+ f = fq_gethrtime();
+ while(m = fq_client_receive(c)) {
+ icnt++;
+ icnt_total++;
+ rcvd++;
+ fq_msg_deref(m);
+ }
+ usleep(1000);
+ if(f-s > 1000000000) {
+ print_rate(c, s, f, cnt, icnt);
+ printf("total: %llu\n", (unsigned long long)icnt_total);
+ icnt = 0;
+ cnt = 0;
+ s = f;
+ }
+ }
+ (void) argc;
+ return 0;
+}
View
@@ -0,0 +1,88 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+#include <string.h>
+#include "fq.h"
+
+#define SEND_COUNT 1000
+int send_count = SEND_COUNT;
+void logger(const char *);
+
+void logger(const char *s) {
+ fprintf(stderr, "fq_logger: %s\n", s);
+}
+static void
+print_rate(fq_client c, hrtime_t s, hrtime_t f, uint64_t cnt, uint64_t icnt) {
+ double d;
+ if(cnt) {
+ d = (double)cnt * 1000000000;
+ d /= (double)(f-s);
+ printf("[%d backlog] output %0.2f msg/sec\n",
+ fq_client_data_backlog(c), d);
+ }
+ if(icnt) {
+ d = (double)icnt * 1000000000;
+ d /= (double)(f-s);
+ printf("[%d backlog] input %0.2f msg/sec\n",
+ fq_client_data_backlog(c), d);
+ }
+}
+int main(int argc, char **argv) {
+ hrtime_t s0, s, f, f0;
+ uint64_t cnt = 0, icnt = 0;
+ int psize = 0, i = 0;
+ fq_client c;
+ fq_msg *m;
+ char *fq_debug = getenv("FQ_DEBUG");
+ if(fq_debug) fq_debug_set_bits(atoi(fq_debug));
+ signal(SIGPIPE, SIG_IGN);
+ fq_client_init(&c, 0, logger);
+ if(argc < 5) {
+ fprintf(stderr, "%s <host> <port> <user> <pass> [size [count]]\n",
+ argv[0]);
+ exit(-1);
+ }
+ fq_client_creds(c, argv[1], atoi(argv[2]), argv[3], argv[4]);
+ fq_client_heartbeat(c, 1000);
+ fq_client_set_backlog(c, 10000, 100);
+ fq_client_connect(c);
+
+ if(argc > 5) {
+ psize = atoi(argv[5]);
+ }
+ printf("payload size -> %d\n", psize);
+ if(argc > 6) {
+ send_count = atoi(argv[6]);
+ }
+ printf("message count -> %d\n", send_count);
+
+ s0 = s = fq_gethrtime();
+ while(i < send_count || fq_client_data_backlog(c) > 0) {
+ if(i < send_count) {
+ m = fq_msg_alloc_BLANK(psize);
+ memset(m->payload, 0, psize);
+ fq_msg_exchange(m, "maryland", 8);
+ fq_msg_route(m, "test.prefix.foo", 15);
+ fq_msg_id(m, NULL);
+ fq_client_publish(c, m);
+ cnt++;
+ i++;
+ fq_msg_free(m);
+ }
+ else usleep(100);
+
+
+ f = fq_gethrtime();
+ if(f-s > 1000000000) {
+ print_rate(c, s, f, cnt, icnt);
+ icnt = 0;
+ cnt = 0;
+ s = f;
+ }
+ }
+ f0 = fq_gethrtime();
+ print_rate(c, s0, f0, i, 0);
+ (void) argc;
+ return 0;
+}
View
5 fqd.c
@@ -21,7 +21,7 @@ static void usage(const char *prog) {
}
static void parse_cli(int argc, char **argv) {
int c;
- const char *debug = getenv("DEBUG");
+ const char *debug = getenv("FQ_DEBUG");
if(debug) fq_debug_set_bits(atoi(debug));
while((c = getopt(argc, argv, "hn:")) != EOF) {
switch(c) {
@@ -69,6 +69,7 @@ int main(int argc, char **argv) {
fqd_config_init(nodeid);
signal(SIGPIPE, SIG_IGN);
pthread_create(&tid, NULL, listener_thread, NULL);
- fgets(buff, sizeof(buff), stdin);
+ pause();
+ //fgets(buff, sizeof(buff), stdin);
return 0;
}
View
@@ -75,7 +75,9 @@ fqd_data_driver(remote_client *parent) {
if(!needs_write || (rv > 0 && (pfd.revents & POLLOUT))) {
fq_msg *m;
needs_write = 0;
- m = inflight ? inflight : fqd_queue_dequeue(parent->queue);
+ m = inflight ? inflight
+ : parent->queue ? fqd_queue_dequeue(parent->queue)
+ : NULL;
inflight = NULL;
while(m) {
int written;
@@ -95,7 +97,7 @@ fqd_data_driver(remote_client *parent) {
fq_msg_deref(m);
me->msgs_out++;
inflight_sofar = 0;
- m = fqd_queue_dequeue(parent->queue);
+ m = parent->queue ? fqd_queue_dequeue(parent->queue) : NULL;
}
}
View
@@ -23,10 +23,9 @@ void
fqd_remote_client_deref(remote_client *r) {
bool zero;
ck_pr_dec_uint_zero(&r->refcnt, &zero);
+ fq_debug(FQ_DEBUG_CONN, "deref client -> %u%s\n",
+ r->refcnt, zero ? " dropping" : "");
if(zero) {
-#ifdef DEBUG
- fq_debug(FQ_DEBUG_CONN, "dropping client\n");
-#endif
close(r->fd);
free(r);
}
@@ -52,7 +51,7 @@ conn_handler(void *vc) {
switch(ntohl(cmd)) {
case FQ_PROTO_CMD_MODE:
{
- remote_client *newc = calloc(sizeof(*newc), 1);
+ remote_client *newc = calloc(1, sizeof(*newc));
memcpy(newc, client, sizeof(*client));
newc->refcnt = 1;
fqd_command_and_control_server(newc);
@@ -63,7 +62,7 @@ conn_handler(void *vc) {
case FQ_PROTO_DATA_MODE:
case FQ_PROTO_PEER_MODE:
{
- remote_data_client *newc = calloc(sizeof(*newc), 1);
+ remote_data_client *newc = calloc(1, sizeof(*newc));
memcpy(newc, client, sizeof(*client));
newc->mode = ntohl(cmd);
newc->refcnt=1;
View
@@ -82,10 +82,8 @@ fqd_queue_deref(fqd_queue *q) {
bool zero;
ck_pr_dec_uint_zero(&q->refcnt, &zero);
if(zero) {
-#ifdef DEBUG
fq_debug(FQ_DEBUG_CONFIG, "dropping queue(%p) %.*s\n",
(void *)q, q->name.len, q->name.name);
-#endif
free(q);
}
}
View
@@ -37,7 +37,8 @@ walk_jump_table(struct prefix_jumptable *jt, fq_msg *m, int offset) {
struct fqd_route_rule *r;
for(r=jt->rules;r;r=r->next) {
if(m->route.len >= r->prefix.len && m->route.len <= r->match_maxlen) {
- fq_debug(FQ_DEBUG_ROUTE, "M[%p] -> Q[%p]\n", (void *)m, (void *)r->queue);
+ fq_rk *rk = (fq_rk *)r->queue;
+ fq_debug(FQ_DEBUG_ROUTE, "M[%p] -> Q[%.*s]\n", (void *)m, rk->len, rk->name);
fqd_queue_enqueue(r->queue, m);
}
}

0 comments on commit 14a9368

Please sign in to comment.