Skip to content

Commit

Permalink
revert drain code to attempt more general expand/shrink code
Browse files Browse the repository at this point in the history
  • Loading branch information
steveyen committed Jun 18, 2009
1 parent 36cc25b commit 568c436
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 73 deletions.
27 changes: 6 additions & 21 deletions agent_config.c
Expand Up @@ -286,7 +286,6 @@ void cproxy_on_new_config(void *data0, void *data1) {
// bucket=buck1
// usr=test1
// pwd=password
// drain=svrnameX
// svr-svrnameX
// host=mc2.foo.net
// port=11211
Expand All @@ -296,6 +295,9 @@ void cproxy_on_new_config(void *data0, void *data1) {
// behavior-customer1-b
// wait_queue_timeout=1000
// downstream_max=10
// pool_prev-customer1-b
// svrname1
// svrname3
// pools
// customer1-a
// customer1-b
Expand Down Expand Up @@ -385,11 +387,8 @@ void cproxy_on_new_config(void *data0, void *data1) {
// Parse server-level behaviors, so we'll have an
// array of behaviors, one entry for each server.
//
// The 2nd half of the array is for drain server
// behaviors, if any.
//
proxy_behavior *behaviors =
calloc(s * 2, sizeof(proxy_behavior));
calloc(s, sizeof(proxy_behavior));

if (config_str != NULL &&
behaviors != NULL) {
Expand All @@ -400,22 +399,8 @@ void cproxy_on_new_config(void *data0, void *data1) {
//
behaviors[j] = proxyb;

char **props =
parse_kvs_behavior(kvs, "svr", servers[j],
&behaviors[j]);

// Parse drain server-level behavior that's
// associated with the current main server.
//
for (int d = 0; props && props[d]; d++) {
#ifdef NO_DRAIN_FILL_FOR_NOW
if (strcmp(props[d], "drain=") == 0) {
parse_kvs_behavior(kvs, "svr", props[d],
&behaviors[j + s]);
break;
}
#endif
}
parse_kvs_behavior(kvs, "svr", servers[j],
&behaviors[j]);

// Grow config string for libmemcached.
//
Expand Down
34 changes: 0 additions & 34 deletions cproxy.c
Expand Up @@ -934,40 +934,6 @@ int cproxy_connect_downstream(downstream *d, LIBEVENT_THREAD *thread) {

if (d->downstream_conns[i] != NULL) {
s++;

// Connect to a drain downstream server, if needed.
//
if (d->downstream_conns[i]->next == NULL) {
proxy_behavior *drain_behavior = &d->behaviors[n + i];
if (drain_behavior != NULL &&
drain_behavior->host[0] &&
drain_behavior->port > 0) {
char drain_config[400];

snprintf(drain_config, sizeof(drain_config),
"%s:%u",
drain_behavior->host,
drain_behavior->port);

memcached_st drain_mst = {0};

if (init_memcached_st(&drain_mst, drain_config) == 1) {
conn *drain_conn =
cproxy_connect_downstream_conn(d, thread,
&drain_mst.hosts[0],
drain_behavior);
if (drain_conn != NULL) {
d->downstream_conns[i]->next = drain_conn;

// The drain conn holds onto the fd.
//
drain_mst.hosts[0].fd = -1;
}

memcached_free(&drain_mst);
}
}
}
} else {
memcached_quit_server(&d->mst.hosts[i], 1);
}
Expand Down
4 changes: 1 addition & 3 deletions cproxy.h
Expand Up @@ -156,9 +156,7 @@ struct proxy {
//
proxy_behavior behavior_head; // Proxy-level behavior.
int behaviors_num; // # main server-level (SL) behaviors.
proxy_behavior *behaviors; // Array, size is 2*behaviors_num,
// where the second half is for
// drain server-level behaviors.
proxy_behavior *behaviors; // Array, size is behaviors_num.

// Any thread that accesses the mutable fields should
// first acquire the proxy_lock.
Expand Down
20 changes: 5 additions & 15 deletions cproxy_config.c
Expand Up @@ -286,7 +286,7 @@ int cproxy_init_string(char *cfg_str,
behaviors_num++;

proxy_behavior *behaviors =
calloc(behaviors_num * 2, sizeof(proxy_behavior));
calloc(behaviors_num, sizeof(proxy_behavior));

if (behaviors != NULL) {
for (int i = 0; i < behaviors_num; i++) {
Expand Down Expand Up @@ -476,17 +476,17 @@ void cproxy_parse_behavior_key_val(char *key,
}

/**
* Size of array should be 2*arr_size.
* Size of array should be arr_size.
*/
proxy_behavior *cproxy_copy_behaviors(int arr_size, proxy_behavior *arr) {
proxy_behavior *rv = calloc(arr_size * 2, sizeof(proxy_behavior));
proxy_behavior *rv = calloc(arr_size, sizeof(proxy_behavior));
if (rv != NULL)
memcpy(rv, arr, arr_size * 2 * sizeof(proxy_behavior));
memcpy(rv, arr, arr_size * sizeof(proxy_behavior));
return rv;
}

/**
* Size of array should be 2*size.
* Size of x/y array should be x/y_size.
*/
bool cproxy_equal_behaviors(int x_size, proxy_behavior *x,
int y_size, proxy_behavior *y) {
Expand All @@ -509,16 +509,6 @@ bool cproxy_equal_behaviors(int x_size, proxy_behavior *x,

return false;
}

if (cproxy_equal_behavior(&x[i + x_size], &y[i + x_size]) == false) {
if (settings.verbose > 1) {
fprintf(stderr, "drain behaviors not equal (%d)\n", i);
cproxy_dump_behavior(&x[i + x_size], "dx", 0);
cproxy_dump_behavior(&y[i + x_size], "dy", 0);
}

return false;
}
}

return true;
Expand Down
1 change: 1 addition & 0 deletions cproxy_protocol_a2a.c
Expand Up @@ -27,6 +27,7 @@ void cproxy_init_a2a() {

void cproxy_process_a2a_downstream(conn *c, char *line) {
assert(c != NULL);
assert(c->next == NULL);
assert(c->extra != NULL);
assert(c->cmd == -1);
assert(c->item == NULL);
Expand Down
3 changes: 3 additions & 0 deletions cproxy_protocol_a2b.c
Expand Up @@ -340,6 +340,7 @@ bool a2b_fill_request_token(struct A2BSpec *spec,
void cproxy_process_a2b_downstream(conn *c) {
assert(c != NULL);
assert(c->cmd >= 0);
assert(c->next == NULL);
assert(c->item == NULL);
assert(IS_BINARY(c->protocol));
assert(IS_PROXY(c->protocol));
Expand Down Expand Up @@ -442,6 +443,7 @@ void cproxy_process_a2b_downstream(conn *c) {
void cproxy_process_a2b_downstream_nread(conn *c) {
assert(c != NULL);
assert(c->cmd >= 0);
assert(c->next == NULL);
assert(c->cmd_start != NULL);
assert(IS_BINARY(c->protocol));
assert(IS_PROXY(c->protocol));
Expand Down Expand Up @@ -526,6 +528,7 @@ void cproxy_process_a2b_downstream_nread(conn *c) {
void a2b_process_downstream_response(conn *c) {
assert(c != NULL);
assert(c->cmd >= 0);
assert(c->next == NULL);
assert(c->cmd_start != NULL);
assert(IS_BINARY(c->protocol));
assert(IS_PROXY(c->protocol));
Expand Down

0 comments on commit 568c436

Please sign in to comment.