Skip to content

Commit

Permalink
Implement basic status command.
Browse files Browse the repository at this point in the history
  • Loading branch information
postwait committed May 15, 2012
1 parent c5b320a commit 0ef5ebc
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 4 deletions.
9 changes: 9 additions & 0 deletions fq.h
Expand Up @@ -24,6 +24,8 @@
#define FQ_PROTO_BIND 0xb171 #define FQ_PROTO_BIND 0xb171
#define FQ_PROTO_UNBINDREQ 0x071b #define FQ_PROTO_UNBINDREQ 0x071b
#define FQ_PROTO_UNBIND 0x171b #define FQ_PROTO_UNBIND 0x171b
#define FQ_PROTO_STATUS 0x57a7
#define FQ_PROTO_STATUSREQ 0xc7a7


#define MAX_RK_LEN 127 #define MAX_RK_LEN 127
typedef struct fq_rk { typedef struct fq_rk {
Expand Down Expand Up @@ -120,6 +122,10 @@ extern int
const char *host, unsigned short port, const char *host, unsigned short port,
const char *source, const char *pass); const char *source, const char *pass);


extern void
fq_client_status(fq_client conn,
void (*f)(char *, uint32_t, void *), void *c);

extern void extern void
fq_client_heartbeat(fq_client conn, unsigned short ms); fq_client_heartbeat(fq_client conn, unsigned short ms);


Expand Down Expand Up @@ -147,6 +153,9 @@ extern int
extern int extern int
fq_rk_to_hex(char *buf, int len, fq_rk *k); fq_rk_to_hex(char *buf, int len, fq_rk *k);


extern int
fq_read_status(int fd, void (*f)(char *, uint32_t, void *), void *);

extern int extern int
fq_read_uint16(int fd, unsigned short *v); fq_read_uint16(int fd, unsigned short *v);


Expand Down
49 changes: 49 additions & 0 deletions fq_client.c
Expand Up @@ -14,6 +14,7 @@
#include <poll.h> #include <poll.h>
#include <ck_fifo.h> #include <ck_fifo.h>
#include <assert.h> #include <assert.h>
#include <uuid/uuid.h>


#include "fq.h" #include "fq.h"


Expand Down Expand Up @@ -80,6 +81,10 @@ typedef struct {
struct { struct {
uint16_t ms; uint16_t ms;
} heartbeat; } heartbeat;
struct {
void (*callback)(char *, uint32_t, void *);
void *closure;
} status;
fq_bind_req *bind; fq_bind_req *bind;
fq_unbind_req *unbind; fq_unbind_req *unbind;
} data; } data;
Expand Down Expand Up @@ -371,7 +376,21 @@ fq_conn_worker(void *u) {
fq_debug(FQ_DEBUG_CONN, "client acting on user req 0x%04x\n", entry->cmd); fq_debug(FQ_DEBUG_CONN, "client acting on user req 0x%04x\n", entry->cmd);
#endif #endif
switch(entry->cmd) { switch(entry->cmd) {
case FQ_PROTO_STATUSREQ:
if(expect != 0) {
if(conn_s->errorlog) conn_s->errorlog("protocol violation");
goto restart;
}
fq_debug(FQ_DEBUG_CONN, "sending status request\n");
if(fq_write_uint16(conn_s->cmd_fd, entry->cmd)) {
free(entry);
goto restart;
}
expect = FQ_PROTO_STATUS;
last_entry = entry;
break;
case FQ_PROTO_HBREQ: case FQ_PROTO_HBREQ:
fq_debug(FQ_DEBUG_CONN, "sending heartbeat request\n");
if(fq_write_uint16(conn_s->cmd_fd, entry->cmd) || if(fq_write_uint16(conn_s->cmd_fd, entry->cmd) ||
fq_write_uint16(conn_s->cmd_fd, entry->data.heartbeat.ms)) { fq_write_uint16(conn_s->cmd_fd, entry->data.heartbeat.ms)) {
free(entry); free(entry);
Expand Down Expand Up @@ -470,6 +489,17 @@ fq_conn_worker(void *u) {
conn_s->cmd_hb_last = fq_gethrtime(); conn_s->cmd_hb_last = fq_gethrtime();
conn_s->cmd_hb_needed = 1; conn_s->cmd_hb_needed = 1;
break; break;
case FQ_PROTO_STATUS:
if(expect != FQ_PROTO_STATUS) {
if(conn_s->errorlog) conn_s->errorlog("protocol violation");
goto restart;
}
if(fq_read_status(conn_s->cmd_fd,
last_entry->data.status.callback,
last_entry->data.status.closure))
goto restart;
expect = 0;
break;
case FQ_PROTO_BIND: case FQ_PROTO_BIND:
if(expect != FQ_PROTO_BIND) { if(expect != FQ_PROTO_BIND) {
if(conn_s->errorlog) conn_s->errorlog("protocol violation"); if(conn_s->errorlog) conn_s->errorlog("protocol violation");
Expand Down Expand Up @@ -534,6 +564,15 @@ fq_client_creds(fq_client conn, const char *host, unsigned short port,
conn_s->user = strdup(sender); conn_s->user = strdup(sender);
conn_s->queue = strchr(conn_s->user, '/'); conn_s->queue = strchr(conn_s->user, '/');
if(conn_s->queue) *conn_s->queue++ = '\0'; if(conn_s->queue) *conn_s->queue++ = '\0';
if(!conn_s->queue) {
uuid_t out;
char qname[39];
uuid_generate(out);
qname[0] = 'q'; qname[1] = '-';
uuid_unparse_lower(out, qname+2);
conn_s->queue = qname;
}
conn_s->queue = strdup(conn_s->queue);
conn_s->pass = strdup(pass); conn_s->pass = strdup(pass);


/* determine our endpoint */ /* determine our endpoint */
Expand Down Expand Up @@ -596,6 +635,16 @@ fq_client_creds(fq_client conn, const char *host, unsigned short port,
return 0; return 0;
} }


void
fq_client_status(fq_client conn,
void (*f)(char *, uint32_t, void *), void *c) {
cmd_instr *e;
e = malloc(sizeof(*e));
e->cmd = FQ_PROTO_STATUSREQ;
e->data.status.callback = f;
e->data.status.closure = c;
fq_client_signal(conn, e);
}
void void
fq_client_heartbeat(fq_client conn, unsigned short heartbeat_ms) { fq_client_heartbeat(fq_client conn, unsigned short heartbeat_ms) {
cmd_instr *e; cmd_instr *e;
Expand Down
6 changes: 6 additions & 0 deletions fq_sndr.c
Expand Up @@ -13,8 +13,14 @@ void logger(const char *s) {
fprintf(stderr, "fq_logger: %s\n", s); fprintf(stderr, "fq_logger: %s\n", s);
} }
static void static void
debug_status(char *key, uint32_t value, void *unused) {
(void)unused;
fq_debug(FQ_DEBUG_CONN, " ---> %s : %u\n", key, value);
}
static void
print_rate(fq_client c, hrtime_t s, hrtime_t f, uint64_t cnt, uint64_t icnt) { print_rate(fq_client c, hrtime_t s, hrtime_t f, uint64_t cnt, uint64_t icnt) {
double d; double d;
fq_client_status(c, debug_status, NULL);
if(cnt) { if(cnt) {
d = (double)cnt * 1000000000; d = (double)cnt * 1000000000;
d /= (double)(f-s); d /= (double)(f-s);
Expand Down
16 changes: 16 additions & 0 deletions fq_utils.c
Expand Up @@ -279,6 +279,22 @@ fq_read_short_cmd(int fd, unsigned short buflen, void *buf) {
return rv; return rv;
} }
int int
fq_read_status(int fd, void (*f)(char *, uint32_t, void *), void *closure) {
while(1) {
char key[0x10000];
int len;
uint32_t value;

len = fq_read_short_cmd(fd, 0xffff, key);
if(len < 0) return -1;
if(len == 0) break;
key[len] = '\0';
if(fq_read_uint32(fd, &value) < 0) return -1;
f(key, value, closure);
}
return 0;
}
int
fq_write_short_cmd(int fd, unsigned short buflen, const void *buf) { fq_write_short_cmd(int fd, unsigned short buflen, const void *buf) {
unsigned short nlen; unsigned short nlen;
int rv; int rv;
Expand Down
3 changes: 3 additions & 0 deletions fqd.h
Expand Up @@ -54,6 +54,9 @@ typedef struct {
typedef struct { typedef struct {
CLIENT_SHARED CLIENT_SHARED
uint32_t mode; uint32_t mode;
uint32_t no_exchange;
uint32_t no_route;
uint32_t routed;
uint32_t msgs_in; uint32_t msgs_in;
uint32_t msgs_out; uint32_t msgs_out;
} remote_data_client; } remote_data_client;
Expand Down
26 changes: 25 additions & 1 deletion fqd_ccs.c
Expand Up @@ -81,6 +81,27 @@ fqd_css_heartbeat(remote_client *client) {
return fq_write_uint16(client->fd, FQ_PROTO_HB); return fq_write_uint16(client->fd, FQ_PROTO_HB);
} }


static int
fqd_css_status(remote_client *client) {
remote_data_client *data = client->data;
if(!data) return 0;
#ifdef DEBUG
fq_debug(FQ_DEBUG_CONN, "status -> %s\n", client->pretty);
#endif
if(fq_write_uint16(client->fd, FQ_PROTO_STATUS) < 0) return -1;
#define write_uintkey(name, v) do { \
if(fq_write_short_cmd(client->fd, strlen(name), name) < 0) return -1; \
if(fq_write_uint32(client->fd, v) < 0) return -1; \
} while(0)
write_uintkey("no_exchange", data->no_exchange);
write_uintkey("no_route", data->no_route);
write_uintkey("routed", data->routed);
write_uintkey("msgs_in", data->msgs_in);
write_uintkey("msgs_out", data->msgs_out);
if(fq_write_uint16(client->fd, 0) < 0) return -1;
return 0;
}

static int static int
fqd_ccs_loop(remote_client *client) { fqd_ccs_loop(remote_client *client) {
while(1) { while(1) {
Expand Down Expand Up @@ -119,14 +140,17 @@ fqd_ccs_loop(remote_client *client) {
case FQ_PROTO_HBREQ: case FQ_PROTO_HBREQ:
{ {
uint16_t ms; uint16_t ms;
fq_read_uint16(client->fd, &ms); if(fq_read_uint16(client->fd, &ms) < 0) return -1;
#ifdef DEBUG #ifdef DEBUG
fq_debug(FQ_DEBUG_CONN, "setting client(%p) heartbeat to %d\n", fq_debug(FQ_DEBUG_CONN, "setting client(%p) heartbeat to %d\n",
(void *)client, ms); (void *)client, ms);
#endif #endif
client->heartbeat_ms = ms; client->heartbeat_ms = ms;
break; break;
} }
case FQ_PROTO_STATUSREQ:
if(fqd_css_status(client)) return -1;
break;
case FQ_PROTO_BINDREQ: case FQ_PROTO_BINDREQ:
{ {
int len; int len;
Expand Down
11 changes: 8 additions & 3 deletions fqd_routemgr.c
Expand Up @@ -33,14 +33,15 @@ struct fqd_route_rules {
struct prefix_jumptable master; struct prefix_jumptable master;
}; };
static void static void
walk_jump_table(struct prefix_jumptable *jt, fq_msg *m, int offset) { walk_jump_table(struct prefix_jumptable *jt, fq_msg *m, int offset, int *mcnt) {
if(jt->tabletype == RULETABLE) { if(jt->tabletype == RULETABLE) {
struct fqd_route_rule *r; struct fqd_route_rule *r;
for(r=jt->rules;r;r=r->next) { for(r=jt->rules;r;r=r->next) {
if(m->route.len >= r->prefix.len && m->route.len <= r->match_maxlen) { if(m->route.len >= r->prefix.len && m->route.len <= r->match_maxlen) {
fq_rk *rk = (fq_rk *)r->queue; fq_rk *rk = (fq_rk *)r->queue;
fq_debug(FQ_DEBUG_ROUTE, "M[%p] -> Q[%.*s]\n", (void *)m, rk->len, rk->name); fq_debug(FQ_DEBUG_ROUTE, "M[%p] -> Q[%.*s]\n", (void *)m, rk->len, rk->name);
fqd_queue_enqueue(r->queue, m); fqd_queue_enqueue(r->queue, m);
if(mcnt) (*mcnt)++;
} }
} }
} }
Expand All @@ -51,7 +52,7 @@ walk_jump_table(struct prefix_jumptable *jt, fq_msg *m, int offset) {
memcpy(&inbits, in, sizeof(inbits)); memcpy(&inbits, in, sizeof(inbits));
for(i=0;i<jt->pat_len;i++) { for(i=0;i<jt->pat_len;i++) {
if(jt->pats[i].pattern == (jt->pats[i].checkbits & inbits)) { if(jt->pats[i].pattern == (jt->pats[i].checkbits & inbits)) {
walk_jump_table(jt->pats[i].jt, m, offset + sizeof(inbits)); walk_jump_table(jt->pats[i].jt, m, offset + sizeof(inbits), mcnt);
} }
} }
} }
Expand All @@ -64,10 +65,14 @@ fqd_inject_message(remote_client *c, fq_msg *m) {
config = fqd_config_get(); config = fqd_config_get();
e = fqd_config_get_exchange(config, &m->exchange); e = fqd_config_get_exchange(config, &m->exchange);
if(e) { if(e) {
walk_jump_table(&e->set->master, m, 0); int cnt = 0;
walk_jump_table(&e->set->master, m, 0, &cnt);
if(cnt == 0) c->data->no_route++;
else c->data->routed += cnt;
} }
else { else {
fq_debug(FQ_DEBUG_ROUTE, "No exchange \"%.*s\"\n", m->exchange.len, m->exchange.name); fq_debug(FQ_DEBUG_ROUTE, "No exchange \"%.*s\"\n", m->exchange.len, m->exchange.name);
c->data->no_exchange++;
} }
fqd_config_release(config); fqd_config_release(config);
fq_msg_deref(m); fq_msg_deref(m);
Expand Down

0 comments on commit 0ef5ebc

Please sign in to comment.