diff --git a/cproxy.c b/cproxy.c index 03597c79..7fec09cd 100644 --- a/cproxy.c +++ b/cproxy.c @@ -57,9 +57,6 @@ void format_host_ident(char *buf, int buf_len, mcs_server_st *msst, enum protocol host_protocol); -conn *cproxy_downstream_conn_for_host_ident(char *host_ident, LIBEVENT_THREAD *thread, - enum protocol host_protocol); - static bool set_hostinfo(char *host, bool is_tcp, struct addrinfo **ai_out); // Function tables. @@ -530,7 +527,7 @@ void cproxy_on_close_downstream_conn(conn *c) { if (d == NULL) { // TODO: See if we need to remove the downstream conn from the // thread-based free pool. This shouldn't happen, but we - // should then figure out an assert() here. + // should then figure out how to put an assert() here. // return; } @@ -1137,9 +1134,12 @@ bool cproxy_check_downstream_config(downstream *d) { } // Returns -1 if the connections aren't fully assigned and ready. -// In that case, the downstream was enqueued by this function to wait -// for relevant downstream connections to be assigned (and to not be in -// the conn_connecting state). +// In that case, the downstream has to wait for a downstream connection +// to get out of the conn_connecting state. +// +// TODO: The downstream connection might leave the conn_connecting state +// with an error (unable to connect). Need to handle that case. +// Perhaps consider a sentinel conn object that's sfd of -1. // // Also, in the -1 result case, the d->upstream_conn should remain in // conn_pause state. @@ -1171,10 +1171,6 @@ int cproxy_connect_downstream(downstream *d, LIBEVENT_THREAD *thread) { // Connect to downstream servers, if not already. // - // TODO: Should call zstored_acquire_connection() here, - // and return -1 if a required downstream conn isn't ready, - // and the downstream struct d is enqueued. - // // TODO: Need to hash by key on single-key requests to // figure out what server we want, and handle broadcast // downstream conn assignment correctly. @@ -1217,8 +1213,8 @@ conn *cproxy_connect_downstream_conn(downstream *d, if (settings.verbose > 2) { moxi_log_write("cproxy_connect_downstream_conn %s:%d\n", - mcs_server_st_hostname(msst), - mcs_server_st_port(msst)); + mcs_server_st_hostname(msst), + mcs_server_st_port(msst)); } uint64_t start = 0; @@ -2547,6 +2543,8 @@ void cproxy_upstream_state_change(conn *c, enum conn_states next_state) { // ------------------------------------------------- void cproxy_on_connect_downstream_conn(conn *c) { + // TODO: Need to revisit this callback. + // assert(c != NULL); assert(c->host_ident); @@ -2701,7 +2699,33 @@ conn *zstored_acquire_downstream_conn(downstream *d, assert(mcs_server_st_port(msst) > 0); assert(mcs_server_st_fd(msst) == -1); - return cproxy_connect_downstream_conn(d, thread, msst, behavior); + genhash_t *conn_hash = thread->conn_hash; + assert(conn_hash != NULL); + + char host_ident_buf[300]; + format_host_ident(host_ident_buf, sizeof(host_ident_buf), msst, + behavior->downstream_protocol); + + conn *dc = (conn *) genhash_find(conn_hash, host_ident_buf); + if (dc != NULL) { + assert(strcmp(host_ident_buf, dc->host_ident) == 0); + + if (dc->next != NULL) { + genhash_update(conn_hash, host_ident_buf, dc->next); + dc->next = NULL; + } else { + genhash_delete(conn_hash, host_ident_buf); + } + + return dc; + } + + dc = cproxy_connect_downstream_conn(d, thread, msst, behavior); + if (dc != NULL) { + dc->host_ident = strdup(host_ident_buf); + } + + return dc; #ifdef TODO_FIGURE_OUT_WHERE_THESE_FIELDS_WILL_LIVE int status = -1; @@ -2745,22 +2769,58 @@ conn *zstored_acquire_downstream_conn(downstream *d, } #endif +#ifdef TODO_NEED_TO_ADD_DC_TO_POOL_MAYBE + if (dc == NULL) { + dc = cproxy_create_downstream_conn(host_ident, thread, host_protocol); + if (dc != NULL) { + genhash_store(conn_hash, dc->host_ident, dc); + + // Add this downstream conn to the thread local + // downstream list. + // take STATS_LOCK here since we traverse this from cproxy_stats + STATS_LOCK(); + proxy_td *ptd = &thread->ptd; + d->next = NULL; + if (ptd->downstream_tail != NULL) { + ptd->downstream_tail->next = dc; + } + ptd->downstream_tail = dc; + if (ptd->downstream_head == NULL) { + ptd->downstream_head = dc; + } + STATS_UNLOCK(); + } + } + return NULL; +#endif } // new fn by jsh void zstored_release_downstream_conn(conn *dc, bool closing) { - cproxy_close_conn(dc); - - return; - assert(dc != NULL); + assert(dc->next == NULL); + assert(dc->thread != NULL); assert(dc->host_ident != NULL); if (settings.verbose > 2) { moxi_log_write("%d: release_downstream_conn (%d)", dc->sfd, closing); } + genhash_t *conn_hash = dc->thread->conn_hash; + assert(conn_hash != NULL); + + conn *dc_head = (conn *) genhash_find(conn_hash, dc->host_ident); + if (dc_head != NULL) { + assert(strcmp(dc->host_ident, dc_head->host_ident) == 0); + + dc->next = dc_head; + } + + genhash_update(conn_hash, dc->host_ident, dc); + + return; + #ifdef TOOD_FIGURE_OUT_IF_WE_STILL_HAVE_DC_PEER // Delink upstream conn. // @@ -2788,7 +2848,6 @@ void zstored_release_downstream_conn(conn *dc, bool closing) { dc->peer = NULL; dc->upstream_suffix = NULL; // No free(), expecting a static string. -#endif downstream *d = dc->extra; if (d == NULL) { @@ -2796,6 +2855,7 @@ void zstored_release_downstream_conn(conn *dc, bool closing) { } dc->extra = NULL; +#endif #ifdef TODO_FIGURE_OUT_HOW_TO_WAKE_UP_WAITING_DOWNSTREAM if (!closing) { @@ -2852,57 +2912,6 @@ void zstored_release_downstream_conn(conn *dc, bool closing) { #endif } -conn *cproxy_downstream_conn_for_msst(proxy_td *ptd, - LIBEVENT_THREAD *thread, - mcs_server_st *msst, - enum protocol host_protocol) { - genhash_t *conn_hash = thread->conn_hash; - assert(conn_hash != NULL); - - char host_ident_buf[300]; - format_host_ident(host_ident_buf, sizeof(host_ident_buf), msst, host_protocol); - - conn *dc = (conn *) genhash_find(conn_hash, host_ident_buf); - if (dc != NULL) { - assert(strcmp(host_ident_buf, dc->host_ident) == 0); - - if (dc->next != NULL) { - genhash_update(conn_hash, host_ident_buf, dc->next); - dc->next = NULL; - } else { - genhash_delete(conn_hash, host_ident_buf); - } - - return dc; - } - - if (dc == NULL) { -#ifdef TODO_NEED_TO_ADD_DC_TO_POOL_MAYBE - dc = cproxy_create_downstream_conn(host_ident, thread, host_protocol); - if (dc != NULL) { - genhash_store(conn_hash, dc->host_ident, dc); - - // Add this downstream conn to the thread local - // downstream list. - // take STATS_LOCK here since we traverse this from cproxy_stats - STATS_LOCK(); - proxy_td *ptd = &thread->ptd; - d->next = NULL; - if (ptd->downstream_tail != NULL) { - ptd->downstream_tail->next = dc; - } - ptd->downstream_tail = dc; - if (ptd->downstream_head == NULL) { - ptd->downstream_head = dc; - } - STATS_UNLOCK(); - } -#endif - } - - return dc; -} - void format_host_ident(char *buf, int buf_len, mcs_server_st *msst, enum protocol host_protocol) { diff --git a/thread.c b/thread.c index c65a9a92..d3a1bbdf 100644 --- a/thread.c +++ b/thread.c @@ -14,6 +14,7 @@ #define ITEMS_PER_ALLOC 64 +extern struct hash_ops strhash_ops; extern struct hash_ops skeyhash_ops; /* An item in the connection queue. */ @@ -240,7 +241,7 @@ static void setup_thread(LIBEVENT_THREAD *me) { exit(EXIT_FAILURE); } - me->conn_hash = genhash_init(512, skeyhash_ops); + me->conn_hash = genhash_init(512, strhash_ops); if (me->conn_hash == NULL) { moxi_log_write("Failed to create connection hash\n"); exit(EXIT_FAILURE);