Skip to content

Commit

Permalink
zstored_acquire/release_downstream_conn() pass make tests
Browse files Browse the repository at this point in the history
They now successfully use the thread->conn_hash to cache downstream
connections.  The conn_hash is key'ed by host_ident string, and the
value is a linked list of available (and fully connected) downstream
connections.

The logic for having downstreams waiting for downstream connections,
however, is still not integrated.

Change-Id: I3fe9f0528c150cefc75e5d4c6d39050efa77294b
Reviewed-on: http://review.northscale.com/2256
Reviewed-by: Steve Yen <steve.yen@gmail.com>
Tested-by: Steve Yen <steve.yen@gmail.com>
  • Loading branch information
steveyen committed Sep 8, 2010
1 parent 33c6f1d commit 64890cf
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 71 deletions.
149 changes: 79 additions & 70 deletions cproxy.c
Expand Up @@ -57,9 +57,6 @@ void format_host_ident(char *buf, int buf_len,
mcs_server_st *msst, mcs_server_st *msst,
enum protocol host_protocol); enum protocol host_protocol);


conn *cproxy_downstream_conn_for_host_ident(char *host_ident, LIBEVENT_THREAD *thread,
enum protocol host_protocol);

static bool set_hostinfo(char *host, bool is_tcp, struct addrinfo **ai_out); static bool set_hostinfo(char *host, bool is_tcp, struct addrinfo **ai_out);


// Function tables. // Function tables.
Expand Down Expand Up @@ -530,7 +527,7 @@ void cproxy_on_close_downstream_conn(conn *c) {
if (d == NULL) { if (d == NULL) {
// TODO: See if we need to remove the downstream conn from the // TODO: See if we need to remove the downstream conn from the
// thread-based free pool. This shouldn't happen, but we // thread-based free pool. This shouldn't happen, but we
// should then figure out an assert() here. // should then figure out how to put an assert() here.
// //
return; return;
} }
Expand Down Expand Up @@ -1137,9 +1134,12 @@ bool cproxy_check_downstream_config(downstream *d) {
} }


// Returns -1 if the connections aren't fully assigned and ready. // Returns -1 if the connections aren't fully assigned and ready.
// In that case, the downstream was enqueued by this function to wait // In that case, the downstream has to wait for a downstream connection
// for relevant downstream connections to be assigned (and to not be in // to get out of the conn_connecting state.
// the conn_connecting state). //
// TODO: The downstream connection might leave the conn_connecting state
// with an error (unable to connect). Need to handle that case.
// Perhaps consider a sentinel conn object that's sfd of -1.
// //
// Also, in the -1 result case, the d->upstream_conn should remain in // Also, in the -1 result case, the d->upstream_conn should remain in
// conn_pause state. // conn_pause state.
Expand Down Expand Up @@ -1171,10 +1171,6 @@ int cproxy_connect_downstream(downstream *d, LIBEVENT_THREAD *thread) {


// Connect to downstream servers, if not already. // Connect to downstream servers, if not already.
// //
// TODO: Should call zstored_acquire_connection() here,
// and return -1 if a required downstream conn isn't ready,
// and the downstream struct d is enqueued.
//
// TODO: Need to hash by key on single-key requests to // TODO: Need to hash by key on single-key requests to
// figure out what server we want, and handle broadcast // figure out what server we want, and handle broadcast
// downstream conn assignment correctly. // downstream conn assignment correctly.
Expand Down Expand Up @@ -1217,8 +1213,8 @@ conn *cproxy_connect_downstream_conn(downstream *d,


if (settings.verbose > 2) { if (settings.verbose > 2) {
moxi_log_write("cproxy_connect_downstream_conn %s:%d\n", moxi_log_write("cproxy_connect_downstream_conn %s:%d\n",
mcs_server_st_hostname(msst), mcs_server_st_hostname(msst),
mcs_server_st_port(msst)); mcs_server_st_port(msst));
} }


uint64_t start = 0; uint64_t start = 0;
Expand Down Expand Up @@ -2547,6 +2543,8 @@ void cproxy_upstream_state_change(conn *c, enum conn_states next_state) {
// ------------------------------------------------- // -------------------------------------------------


void cproxy_on_connect_downstream_conn(conn *c) { void cproxy_on_connect_downstream_conn(conn *c) {
// TODO: Need to revisit this callback.
//
assert(c != NULL); assert(c != NULL);
assert(c->host_ident); assert(c->host_ident);


Expand Down Expand Up @@ -2701,7 +2699,33 @@ conn *zstored_acquire_downstream_conn(downstream *d,
assert(mcs_server_st_port(msst) > 0); assert(mcs_server_st_port(msst) > 0);
assert(mcs_server_st_fd(msst) == -1); assert(mcs_server_st_fd(msst) == -1);


return cproxy_connect_downstream_conn(d, thread, msst, behavior); genhash_t *conn_hash = thread->conn_hash;
assert(conn_hash != NULL);

char host_ident_buf[300];
format_host_ident(host_ident_buf, sizeof(host_ident_buf), msst,
behavior->downstream_protocol);

conn *dc = (conn *) genhash_find(conn_hash, host_ident_buf);
if (dc != NULL) {
assert(strcmp(host_ident_buf, dc->host_ident) == 0);

if (dc->next != NULL) {
genhash_update(conn_hash, host_ident_buf, dc->next);
dc->next = NULL;
} else {
genhash_delete(conn_hash, host_ident_buf);
}

return dc;
}

dc = cproxy_connect_downstream_conn(d, thread, msst, behavior);
if (dc != NULL) {
dc->host_ident = strdup(host_ident_buf);
}

return dc;


#ifdef TODO_FIGURE_OUT_WHERE_THESE_FIELDS_WILL_LIVE #ifdef TODO_FIGURE_OUT_WHERE_THESE_FIELDS_WILL_LIVE
int status = -1; int status = -1;
Expand Down Expand Up @@ -2745,22 +2769,58 @@ conn *zstored_acquire_downstream_conn(downstream *d,
} }
#endif #endif


#ifdef TODO_NEED_TO_ADD_DC_TO_POOL_MAYBE
if (dc == NULL) {
dc = cproxy_create_downstream_conn(host_ident, thread, host_protocol);
if (dc != NULL) {
genhash_store(conn_hash, dc->host_ident, dc);

// Add this downstream conn to the thread local
// downstream list.
// take STATS_LOCK here since we traverse this from cproxy_stats
STATS_LOCK();
proxy_td *ptd = &thread->ptd;
d->next = NULL;
if (ptd->downstream_tail != NULL) {
ptd->downstream_tail->next = dc;
}
ptd->downstream_tail = dc;
if (ptd->downstream_head == NULL) {
ptd->downstream_head = dc;
}
STATS_UNLOCK();
}
}

return NULL; return NULL;
#endif
} }


// new fn by jsh // new fn by jsh
void zstored_release_downstream_conn(conn *dc, bool closing) { void zstored_release_downstream_conn(conn *dc, bool closing) {
cproxy_close_conn(dc);

return;

assert(dc != NULL); assert(dc != NULL);
assert(dc->next == NULL);
assert(dc->thread != NULL);
assert(dc->host_ident != NULL); assert(dc->host_ident != NULL);


if (settings.verbose > 2) { if (settings.verbose > 2) {
moxi_log_write("%d: release_downstream_conn (%d)", dc->sfd, closing); moxi_log_write("%d: release_downstream_conn (%d)", dc->sfd, closing);
} }


genhash_t *conn_hash = dc->thread->conn_hash;
assert(conn_hash != NULL);

conn *dc_head = (conn *) genhash_find(conn_hash, dc->host_ident);
if (dc_head != NULL) {
assert(strcmp(dc->host_ident, dc_head->host_ident) == 0);

dc->next = dc_head;
}

genhash_update(conn_hash, dc->host_ident, dc);

return;

#ifdef TOOD_FIGURE_OUT_IF_WE_STILL_HAVE_DC_PEER #ifdef TOOD_FIGURE_OUT_IF_WE_STILL_HAVE_DC_PEER
// Delink upstream conn. // Delink upstream conn.
// //
Expand Down Expand Up @@ -2788,14 +2848,14 @@ void zstored_release_downstream_conn(conn *dc, bool closing) {


dc->peer = NULL; dc->peer = NULL;
dc->upstream_suffix = NULL; // No free(), expecting a static string. dc->upstream_suffix = NULL; // No free(), expecting a static string.
#endif


downstream *d = dc->extra; downstream *d = dc->extra;
if (d == NULL) { if (d == NULL) {
return; return;
} }


dc->extra = NULL; dc->extra = NULL;
#endif


#ifdef TODO_FIGURE_OUT_HOW_TO_WAKE_UP_WAITING_DOWNSTREAM #ifdef TODO_FIGURE_OUT_HOW_TO_WAKE_UP_WAITING_DOWNSTREAM
if (!closing) { if (!closing) {
Expand Down Expand Up @@ -2852,57 +2912,6 @@ void zstored_release_downstream_conn(conn *dc, bool closing) {
#endif #endif
} }


conn *cproxy_downstream_conn_for_msst(proxy_td *ptd,
LIBEVENT_THREAD *thread,
mcs_server_st *msst,
enum protocol host_protocol) {
genhash_t *conn_hash = thread->conn_hash;
assert(conn_hash != NULL);

char host_ident_buf[300];
format_host_ident(host_ident_buf, sizeof(host_ident_buf), msst, host_protocol);

conn *dc = (conn *) genhash_find(conn_hash, host_ident_buf);
if (dc != NULL) {
assert(strcmp(host_ident_buf, dc->host_ident) == 0);

if (dc->next != NULL) {
genhash_update(conn_hash, host_ident_buf, dc->next);
dc->next = NULL;
} else {
genhash_delete(conn_hash, host_ident_buf);
}

return dc;
}

if (dc == NULL) {
#ifdef TODO_NEED_TO_ADD_DC_TO_POOL_MAYBE
dc = cproxy_create_downstream_conn(host_ident, thread, host_protocol);
if (dc != NULL) {
genhash_store(conn_hash, dc->host_ident, dc);

// Add this downstream conn to the thread local
// downstream list.
// take STATS_LOCK here since we traverse this from cproxy_stats
STATS_LOCK();
proxy_td *ptd = &thread->ptd;
d->next = NULL;
if (ptd->downstream_tail != NULL) {
ptd->downstream_tail->next = dc;
}
ptd->downstream_tail = dc;
if (ptd->downstream_head == NULL) {
ptd->downstream_head = dc;
}
STATS_UNLOCK();
}
#endif
}

return dc;
}

void format_host_ident(char *buf, int buf_len, void format_host_ident(char *buf, int buf_len,
mcs_server_st *msst, mcs_server_st *msst,
enum protocol host_protocol) { enum protocol host_protocol) {
Expand Down
3 changes: 2 additions & 1 deletion thread.c
Expand Up @@ -14,6 +14,7 @@


#define ITEMS_PER_ALLOC 64 #define ITEMS_PER_ALLOC 64


extern struct hash_ops strhash_ops;
extern struct hash_ops skeyhash_ops; extern struct hash_ops skeyhash_ops;


/* An item in the connection queue. */ /* An item in the connection queue. */
Expand Down Expand Up @@ -240,7 +241,7 @@ static void setup_thread(LIBEVENT_THREAD *me) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }


me->conn_hash = genhash_init(512, skeyhash_ops); me->conn_hash = genhash_init(512, strhash_ops);
if (me->conn_hash == NULL) { if (me->conn_hash == NULL) {
moxi_log_write("Failed to create connection hash\n"); moxi_log_write("Failed to create connection hash\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
Expand Down

0 comments on commit 64890cf

Please sign in to comment.