Skip to content

Commit

Permalink
- expose a few more methods publicly
Browse files Browse the repository at this point in the history
- flush before waiting for acks for the common case that readers will
  wait until the window size has filled completely before doing a bulk
  ack.
  • Loading branch information
jordansissel committed Aug 11, 2012
1 parent 052c033 commit afda3f7
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
16 changes: 13 additions & 3 deletions proto.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ static int lumberjack_tcp_connect(struct lumberjack *lumberjack);
static int lumberjack_ssl_handshake(struct lumberjack *lumberjack);
static int lumberjack_connected(struct lumberjack *lumberjack);
static int lumberjack_wait_for_ack(struct lumberjack *lumberjack);
static int lumberjack_ensure_connected(struct lumberjack *lumberjack);
static int lumberjack_retransmit_all(struct lumberjack *lumberjack);
static int lumberjack_write_window_size(struct lumberjack *lumberjack);
static int lumberjack_flush(struct lumberjack *lumberjack);

static int lumberjack_init_done = 0;

Expand Down Expand Up @@ -161,7 +159,7 @@ int lumberjack_connect(struct lumberjack *lumberjack) {
return 0;
} /* lumberjack_connect */

static int lumberjack_ensure_connected(struct lumberjack *lumberjack) {
int lumberjack_ensure_connected(struct lumberjack *lumberjack) {
int rc;
struct backoff sleeper;
backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP);
Expand Down Expand Up @@ -301,6 +299,15 @@ int lumberjack_flush(struct lumberjack *lumberjack) {
size_t length = str_length(lumberjack->io_buffer);
/* Zlib */
int rc;

if (length == 0) {
return 0; /* nothing to do */
}

if (!lumberjack_connected(lumberjack)) {
return -1;
}

size_t compressed_length = lumberjack->compression_buffer->data_size;
rc = compress2((Bytef *)str_data(lumberjack->compression_buffer), &compressed_length,
(Bytef *)str_data(lumberjack->io_buffer), length, 1);
Expand Down Expand Up @@ -436,6 +443,9 @@ int lumberjack_send_data(struct lumberjack *lumberjack, const char *payload,
lumberjack_ensure_connected(lumberjack);
/* if the ring is currently full, we need to wait for acks. */
while (ring_is_full(lumberjack->ring)) {
/* flush any writes waiting for buffering/compression */
lumberjack_flush(lumberjack);

/* read at least one ACK */
lumberjack_wait_for_ack(lumberjack);
}
Expand Down
2 changes: 2 additions & 0 deletions proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ int lumberjack_send_data(struct lumberjack *lumberjack, const char *payload,
size_t payload_len);
//void (*free_func)(void *payload, void *hint()));

int lumberjack_flush(struct lumberjack *lumberjack);
/* TODO(sissel): permit inspection of currently-unacknowledged events? */

//int lumberjack_send_kv(struct *kv map);

/* blocks until all messages in the ring have been acknowledged */
void lumberjack_disconnect(struct lumberjack *lumberjack);
int lumberjack_ensure_connected(struct lumberjack *lumberjack);

/* Pack a key-value list according to the lumberjack protocol */
struct str *lumberjack_kv_pack(struct kv *kv_list, size_t kv_count);
Expand Down

0 comments on commit afda3f7

Please sign in to comment.