Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

treat a forgotten exchange

  • Loading branch information...
commit c0c355d0f69979d42d3ce799eb346ddb729af54b 1 parent 1a39073
@postwait postwait authored
View
2  Makefile
@@ -6,7 +6,7 @@ EXTRA_CFLAGS=-g -D_REENTRANT
CLIENT_OBJ=fq_client.o fq_msg.o fq_utils.o
FQD_OBJ=fqd.o fqd_listener.o fqd_ccs.o fqd_dss.o fqd_config.o \
- fqd_queue.o fq_routemgr.o \
+ fqd_queue.o fqd_routemgr.o fqd_queue_mem.o \
$(CLIENT_OBJ)
FQC_OBJ=fqc.o $(CLIENT_OBJ)
CPPFLAGS=-I./$(CKDIR)/include
View
20 README
@@ -1,4 +1,24 @@
++------------+
+|- exchange -|
++------------+
+|- routemap -|
++------------+
+ | | +---------+
+ | +--------------|- queue -|
+ | +---------+
++---------+ |
+|- queue -| | +-----------+
++---------+ +---|- client1 -|
+ | +-----------+
+ | +-----------+
+ +--|- client2 -|
+ | +-----------+
+ |
+ +-----------+
+ |- client3 -|
+ +-----------+
+
client -> server
mode/cmd(user/pw) -> sender+key/error
heartbeat(ms) -> ok/no
View
2  fq.h
@@ -72,7 +72,7 @@ extern buffered_msg_reader *fq_buffered_msg_reader_alloc(int fd, int peermode);
extern void fq_buffered_msg_reader_free(buffered_msg_reader *f);
extern int
fq_buffered_msg_read(buffered_msg_reader *f,
- void (*f_msg_handler)(void *, fq_rk *, fq_msg *),
+ void (*f_msg_handler)(void *, fq_msg *),
void *);
/* frame */
View
6 fq_routemgr.c
@@ -1,6 +0,0 @@
-#include "fqd.h"
-
-void
-fqd_inject_message(remote_client *c, fq_rk *exchange, fq_msg *m) {
- fq_msg_deref(m);
-}
View
15 fq_utils.c
@@ -22,7 +22,6 @@ struct buffered_msg_reader {
int peermode;
ssize_t nread;
ssize_t into_body;
- fq_rk exchange;
fq_msg msg;
fq_msg *copy;
};
@@ -40,7 +39,7 @@ void fq_buffered_msg_reader_free(buffered_msg_reader *f) {
}
static int
parse_message_headers(int peermode, unsigned char *d, int dlen,
- fq_rk *exchange, fq_msg *msg) {
+ fq_msg *msg) {
int ioff = 0;
unsigned char exchange_len, route_len, sender_len, nhops;
#define BAIL_UNLESS_LEFT(d) do { \
@@ -50,11 +49,11 @@ parse_message_headers(int peermode, unsigned char *d, int dlen,
BAIL_UNLESS_LEFT(sizeof(exchange_len));
memcpy(&exchange_len, d+ioff, sizeof(exchange_len));
ioff += sizeof(exchange_len);
- if(exchange_len > sizeof(exchange->name)) return -1;
- exchange->len = exchange_len;
+ if(exchange_len > sizeof(msg->exchange.name)) return -1;
+ msg->exchange.len = exchange_len;
BAIL_UNLESS_LEFT(exchange_len);
- memcpy(exchange->name, d+ioff, exchange_len);
+ memcpy(msg->exchange.name, d+ioff, exchange_len);
ioff += exchange_len;
BAIL_UNLESS_LEFT(sizeof(route_len));
@@ -108,7 +107,7 @@ parse_message_headers(int peermode, unsigned char *d, int dlen,
*/
int
fq_buffered_msg_read(buffered_msg_reader *f,
- void (*f_msg_handler)(void *, fq_rk *, fq_msg *),
+ void (*f_msg_handler)(void *, fq_msg *),
void *closure) {
int rv;
if(f->into_body < f->msg.payload_len) {
@@ -142,7 +141,7 @@ fq_buffered_msg_read(buffered_msg_reader *f,
int body_start;
body_start = parse_message_headers(f->peermode,
f->scratch+f->off, f->nread-f->off,
- &f->exchange, &f->msg);
+ &f->msg);
f->into_body = 0;
#ifdef DEBUG
fq_debug("%d = parse(+%d, %d) -> %d\n",
@@ -175,7 +174,7 @@ fq_buffered_msg_read(buffered_msg_reader *f,
#ifdef DEBUG
fq_debug("message read... injecting\n");
#endif
- f_msg_handler(closure, &f->exchange, f->copy);
+ f_msg_handler(closure, f->copy);
f->copy = NULL;
memset(&f->msg, 0, sizeof(f->msg));
}
View
10 fqd.h
@@ -14,11 +14,15 @@
typedef void * fqd_queue_impl_data;
typedef struct fqd_queue_impl {
- int (*enqueue)(fqd_queue_impl_data, fq_msg *); /* cannot block */
- fq_msg *(*dequeue)(fqd_queue_impl_data); /* can block */
+ fqd_queue_impl_data (*setup)(fq_rk *);
+ void (*enqueue)(fqd_queue_impl_data, fq_msg *);
+ fq_msg *(*dequeue)(fqd_queue_impl_data);
void (*dispose)(fqd_queue_impl_data);
} fqd_queue_impl;
+/* implememted in fqd_queue_mem.c */
+extern fqd_queue_impl fqd_queue_mem_impl;
+
typedef struct fqd_queue fqd_queue;
extern void fqd_queue_ref(fqd_queue *);
@@ -80,7 +84,7 @@ extern fqd_queue *fqd_queue_get(fq_rk *);
extern int fqd_queue_register_client(fqd_queue *q, remote_client *c);
extern int fqd_queue_deregister_client(fqd_queue *q, remote_client *c);
-extern void fqd_inject_message(remote_client *c, fq_rk *exchange, fq_msg *m);
+extern void fqd_inject_message(remote_client *c, fq_msg *m);
#define ERRTOFD(fd, error) do { \
(void)fq_write_uint16(fd, htons(FQ_PROTO_ERROR)); \
View
5 fqd_config.c
@@ -18,12 +18,17 @@
* [cycleout] [currentread] [currentwrite]
*
*/
+struct fqd_exchange {
+ fq_rk exchange;
+};
struct fqd_config {
u_int64_t gen;
int n_clients;
remote_client **clients;
int n_queues;
fqd_queue **queues;
+ int n_exchanges;
+ struct fqd_exchange **exchanges;
};
static u_int64_t global_gen = 0;
View
9 fqd_dss.c
@@ -15,7 +15,7 @@
#define IN_READ_BUFFER_SIZE 1024*256
static void
-fqd_dss_read_complete(void *closure, fq_rk *exchange, fq_msg *msg) {
+fqd_dss_read_complete(void *closure, fq_msg *msg) {
int i;
remote_client *parent = closure;
remote_data_client *me = parent->data;
@@ -29,14 +29,14 @@ fqd_dss_read_complete(void *closure, fq_rk *exchange, fq_msg *msg) {
break;
}
}
- fqd_inject_message(parent, exchange, msg);
+ fqd_inject_message(parent, msg);
}
static void
fqd_data_driver(remote_client *parent) {
remote_data_client *me = parent->data;
buffered_msg_reader *ctx = NULL;
- int flags;
+ int flags, needs_write = 1;
if(((flags = fcntl(me->fd, F_GETFL, 0)) == -1) ||
(fcntl(me->fd, F_SETFL, flags | O_NONBLOCK) == -1))
@@ -48,6 +48,7 @@ fqd_data_driver(remote_client *parent) {
struct pollfd pfd;
pfd.fd = me->fd;
pfd.events = POLLIN;
+ if(needs_write) pfd.events |= POLLOUT;
pfd.revents = 0;
rv = poll(&pfd, 1, parent->heartbeat_ms ? parent->heartbeat_ms : 1000);
if(rv < 0) break;
@@ -59,6 +60,8 @@ fqd_data_driver(remote_client *parent) {
}
if(rv > 0 && (pfd.revents & POLLOUT)) {
+ needs_write = 0;
+
}
}
View
3  fqd_queue.c
@@ -91,9 +91,12 @@ fqd_queue_get(fq_rk *qname) {
nq = calloc(1, sizeof(*nq));
nq->refcnt = 1;
memcpy(&nq->name, qname, sizeof(*qname));
+ nq->impl = &fqd_queue_mem_impl;
+ nq->impl_data = nq->impl->setup(qname);
q = fqd_config_register_queue(nq, NULL);
if(nq != q) {
/* race */
+ nq->impl->dispose(nq->impl_data);
free(nq);
}
}
View
53 fqd_queue_mem.c
@@ -0,0 +1,53 @@
+#include "fqd.h"
+#include <stdlib.h>
+#include <ck_fifo.h>
+
+struct queue_mem {
+ uint32_t qlen;
+ ck_fifo_mpmc_t q;
+ ck_fifo_mpmc_entry_t *qhead;
+};
+
+static void queue_mem_enqueue(fqd_queue_impl_data f, fq_msg *m) {
+ struct queue_mem *d = (struct queue_mem *)f;
+ ck_fifo_mpmc_entry_t *fifo_entry;
+ fifo_entry = malloc(sizeof(ck_fifo_mpmc_entry_t));
+ fq_msg_ref(m);
+ ck_fifo_mpmc_enqueue(&d->q, fifo_entry, m);
+ ck_pr_inc_uint(&d->qlen);
+}
+static fq_msg *queue_mem_dequeue(fqd_queue_impl_data f) {
+ struct queue_mem *d = (struct queue_mem *)f;
+ ck_fifo_mpmc_entry_t *garbage;
+ fq_msg *m;
+ if(ck_fifo_mpmc_dequeue(&d->q, &m, &garbage) == true) {
+ ck_pr_dec_uint(&d->qlen);
+ free(garbage);
+ return m;
+ }
+ return NULL;
+}
+static fqd_queue_impl_data queue_mem_setup(fq_rk *qname) {
+ struct queue_mem *d;
+ d = calloc(1, sizeof(*d));
+ d->qhead = malloc(sizeof(ck_fifo_mpmc_entry_t));
+ ck_fifo_mpmc_init(&d->q, d->qhead);
+ (void)qname;
+ return d;
+}
+static void queue_mem_dispose(fqd_queue_impl_data f) {
+ struct queue_mem *d = (struct queue_mem *)f;
+ fq_msg *m;
+ while(NULL != (m = queue_mem_dequeue(d))) {
+ fq_msg_deref(m);
+ }
+ free(d->qhead);
+ free(d);
+}
+
+fqd_queue_impl fqd_queue_mem_impl = {
+ .setup = queue_mem_setup,
+ .enqueue = queue_mem_enqueue,
+ .dequeue = queue_mem_dequeue,
+ .dispose = queue_mem_dispose
+};
View
8 fqd_routemgr.c
@@ -0,0 +1,8 @@
+#include "fqd.h"
+
+void
+fqd_inject_message(remote_client *c, fq_msg *m) {
+ fq_msg_deref(m);
+}
+
+
Please sign in to comment.
Something went wrong with that request. Please try again.