Skip to content

Commit

Permalink
generalize the buffered message reading (for use in the client)
Browse files Browse the repository at this point in the history
  • Loading branch information
postwait committed Apr 3, 2012
1 parent 7e91d79 commit 1a39073
Show file tree
Hide file tree
Showing 10 changed files with 402 additions and 210 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ fqd
fqc
Makefile.build
Makefile.depend
fqd.dSYM/
*.dSYM/
.*.swp
24 changes: 23 additions & 1 deletion fq.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#define FQ_PROTO_CMD_MODE 0xcc50cafe
#define FQ_PROTO_DATA_MODE 0xcc50face
#define FQ_PROTO_PEER_MODE 0xcc50fade

#define FQ_PROTO_ERROR 0xeeee
#define FQ_PROTO_AUTH_CMD 0xaaaa
Expand Down Expand Up @@ -65,6 +66,15 @@ extern void fq_msg_deref(fq_msg *);
extern void fq_msg_route(fq_msg *, const void *key, int klen);
extern void fq_msg_id(fq_msg *, fq_msgid *id);

typedef struct buffered_msg_reader buffered_msg_reader;

extern buffered_msg_reader *fq_buffered_msg_reader_alloc(int fd, int peermode);
extern void fq_buffered_msg_reader_free(buffered_msg_reader *f);
extern int
fq_buffered_msg_read(buffered_msg_reader *f,
void (*f_msg_handler)(void *, fq_rk *, fq_msg *),
void *);

/* frame */
/*
* 1 x uint8_t<net> hops
Expand All @@ -80,7 +90,7 @@ extern void fq_msg_id(fq_msg *, fq_msgid *id);
typedef struct fq_conn_s *fq_client;

extern int
fq_client_init(fq_client *, void (*)(const char *));
fq_client_init(fq_client *, int peermode, void (*)(const char *));

extern int
fq_client_creds(fq_client,
Expand Down Expand Up @@ -120,6 +130,18 @@ extern int
extern int
fq_read_long_cmd(int fd, int *len, void **buf);

/* This function returns 0 on success, -1 on failure or a positive
* integer indicating that a partial write as happened.
* The initial call should be made with off = 0, if a positive
* value is returned, a subsequent call should be made with
* off = (off + return value).
* The caller must be able to keep track of an accumulated offset
* in the event that several invocations are required to send the
* message.
*/
extern int
fq_client_write_msg(int fd, int peermode, fq_msg *m, size_t off);

extern int
fq_debug_fl(const char *file, int line, const char *fmt, ...)
__printflike(3, 4);
Expand Down
73 changes: 8 additions & 65 deletions fq_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct fq_conn_s {
int cmd_hb_needed;
unsigned short cmd_hb_ms;
hrtime_t cmd_hb_last;
int peermode;
int data_fd;
pthread_t worker;
pthread_t data_worker;
Expand Down Expand Up @@ -181,7 +182,8 @@ fq_client_do_auth(fq_conn_s *conn_s) {
static int
fq_client_data_connect_internal(fq_conn_s *conn_s) {
int flags;
uint32_t cmd = htonl(FQ_PROTO_DATA_MODE);
uint32_t cmd = htonl(conn_s->peermode ? FQ_PROTO_PEER_MODE
: FQ_PROTO_DATA_MODE);
/* We don't support data connections when the cmd connection is down */
if(conn_s->cmd_fd < 0) return -1;

Expand Down Expand Up @@ -243,67 +245,6 @@ fq_client_connect_internal(fq_conn_s *conn_s) {
return -1;
}

/* frame */
/*
* 1 x <nstring> exchange
* 1 x fq_rk<nstring> route
* 1 x uint32_t<net> payload_len
* 1 x data
*/
static int
fq_client_write_msg(int fd, fq_msg *m, size_t off) {
struct iovec pv[7];
int rv, i, writev_start = 0;
size_t expect;
unsigned char exchange_len = m->exchange.len;
unsigned char route_len = m->route.len;
uint32_t data_len = htonl(m->payload_len);

expect = 1 + m->exchange.len + 1 + m->route.len +
sizeof(m->sender_msgid) +
sizeof(data_len) + m->payload_len;
assert(off < expect);
expect -= off;
pv[0].iov_len = 1;
pv[0].iov_base = &exchange_len;
pv[1].iov_len = m->exchange.len;
pv[1].iov_base = m->exchange.name;
pv[2].iov_len = 1;
pv[2].iov_base = &route_len;
pv[3].iov_len = m->route.len;
pv[3].iov_base = m->route.name;
pv[4].iov_len = sizeof(m->sender_msgid);
pv[4].iov_base = &m->sender_msgid;
pv[5].iov_len = sizeof(data_len);
pv[5].iov_base = &data_len;
pv[6].iov_len = m->payload_len;
pv[6].iov_base = m->payload;
if(off > 0) {
for(i=0;i<7;i++) {
if(off >= pv[i].iov_len) {
off -= pv[i].iov_len;
writev_start++;
}
else {
pv[i].iov_len -= off;
pv[i].iov_base = ((unsigned char *)pv[i].iov_base) + off;
off = 0;
break;
}
}
}
rv = writev(fd, pv+writev_start, 7-writev_start);
#ifdef DEBUG
fq_debug("writev(%d bytes [%d data]) -> %d\n",
(int)expect, (int)m->payload_len, rv);
#endif
if(rv != (int)expect) {
return rv;
}
if(rv == 0) return -1;
return 0;
}

static void
fq_data_worker_loop(fq_conn_s *conn_s) {
while(conn_s->cmd_fd >= 0 && conn_s->stop == 0) {
Expand All @@ -319,8 +260,8 @@ fq_data_worker_loop(fq_conn_s *conn_s) {
#ifdef DEBUG
fq_debug("dequeue message to submit to server\n");
#endif
write_rv = fq_client_write_msg(conn_s->data_fd, conn_s->tosend,
conn_s->tosend_offset);
write_rv = fq_client_write_msg(conn_s->data_fd, conn_s->peermode,
conn_s->tosend, conn_s->tosend_offset);
if(write_rv > 0) {
conn_s->tosend_offset += write_rv;
break;
Expand Down Expand Up @@ -486,12 +427,14 @@ fq_conn_worker(void *u) {
}

int
fq_client_init(fq_client *conn_ptr, void (*logger)(const char *)) {
fq_client_init(fq_client *conn_ptr, int peermode,
void (*logger)(const char *)) {
fq_conn_s *conn_s;
conn_s = *conn_ptr = calloc(1, sizeof(*conn_s));
if(!conn_s) return -1;
/* make the sockets as disconnected */
conn_s->cmd_fd = conn_s->data_fd = -1;
conn_s->peermode = peermode;
conn_s->errorlog = logger;
return 0;
}
Expand Down
Loading

0 comments on commit 1a39073

Please sign in to comment.