From d327e91bb5a9911a7ace206467b3250e0eaa65a9 Mon Sep 17 00:00:00 2001 From: xufeiming Date: Wed, 31 Oct 2018 13:56:57 +0800 Subject: [PATCH 1/3] MOD: Fix ZOOKEEPER-706 in c client when a lot watchers reconnect the server --- .../zookeeper-client-c/src/zookeeper.c | 98 ++++++++++++++----- 1 file changed, 71 insertions(+), 27 deletions(-) diff --git a/zookeeper-client/zookeeper-client-c/src/zookeeper.c b/zookeeper-client/zookeeper-client-c/src/zookeeper.c index ab40ae99586..635bbbf9109 100644 --- a/zookeeper-client/zookeeper-client-c/src/zookeeper.c +++ b/zookeeper-client/zookeeper-client-c/src/zookeeper.c @@ -169,6 +169,16 @@ struct ACL_vector ZOO_OPEN_ACL_UNSAFE = { 1, _OPEN_ACL_UNSAFE_ACL}; struct ACL_vector ZOO_READ_ACL_UNSAFE = { 1, _READ_ACL_UNSAFE_ACL}; struct ACL_vector ZOO_CREATOR_ALL_ACL = { 1, _CREATOR_ALL_ACL_ACL}; +/* ZOOKEEPER-706: If a session has a large number of watches set then + * attempting to re-establish those watches after a connection loss may + * fail due to the SetWatches request exceeding the server's configured + * jute.maxBuffer value. To avoid this we instead split the watch + * re-establishement across multiple SetWatches calls. This constant + * controls the size of each call. It is set to 128kB to be conservative + * with respect to the server's 1MB default for jute.maxBuffer. + */ +const int SET_WATCHES_MAX_LENGTH = 128 * 1024; + #define COMPLETION_WATCH -1 #define COMPLETION_VOID 0 #define COMPLETION_STAT 1 @@ -1909,39 +1919,73 @@ static void free_key_list(char **list, int count) static int send_set_watches(zhandle_t *zh) { - struct oarchive *oa; - struct RequestHeader h = {SET_WATCHES_XID, ZOO_SETWATCHES_OP}; - struct SetWatches req; int rc; - - req.relativeZxid = zh->last_zxid; - req.dataWatches.data = collect_keys(zh->active_node_watchers, (int*)&req.dataWatches.count); - req.existWatches.data = collect_keys(zh->active_exist_watchers, (int*)&req.existWatches.count); - req.childWatches.data = collect_keys(zh->active_child_watchers, (int*)&req.childWatches.count); - + int data_watch_count, exist_watch_count, child_watch_count; + char** data_watch_data = collect_keys(zh->active_node_watchers, &data_watch_count); + char** exist_watch_data = collect_keys(zh->active_exist_watchers, &exist_watch_count); + char** child_watch_data = collect_keys(zh->active_child_watchers, &child_watch_count); // return if there are no pending watches - if (!req.dataWatches.count && !req.existWatches.count && - !req.childWatches.count) { - free_key_list(req.dataWatches.data, req.dataWatches.count); - free_key_list(req.existWatches.data, req.existWatches.count); - free_key_list(req.childWatches.data, req.childWatches.count); + if (!data_watch_count && !exist_watch_count && + !child_watch_count) { + free_key_list(data_watch_data, data_watch_count); + free_key_list(exist_watch_data, exist_watch_count); + free_key_list(child_watch_data, child_watch_count); return ZOK; } - - oa = create_buffer_oarchive(); - rc = serialize_RequestHeader(oa, "header", &h); - rc = rc < 0 ? rc : serialize_SetWatches(oa, "req", &req); - /* add this buffer to the head of the send queue */ - rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa), + int data_watch_index = 0, exist_watch_index = 0, child_watch_index = 0; + while (data_watch_index < data_watch_count + || exist_watch_index < exist_watch_count + || child_watch_index < child_watch_count) { + struct RequestHeader h = { STRUCT_INITIALIZER(xid , SET_WATCHES_XID), STRUCT_INITIALIZER(type , ZOO_SETWATCHES_OP)}; + struct SetWatches req; + req.relativeZxid = zh->last_zxid; + req.dataWatches.count = 0; + req.existWatches.count = 0; + req.childWatches.count = 0; + req.dataWatches.data = data_watch_data + data_watch_index; + req.existWatches.data = exist_watch_data + exist_watch_index; + req.childWatches.data = child_watch_data + child_watch_index; + int batch_length = 0; + while (batch_length < SET_WATCHES_MAX_LENGTH) { + int length = 0; + if (data_watch_index < data_watch_count) { + req.dataWatches.count++; + length = strlen(data_watch_data[data_watch_index]); + data_watch_index++; + } else if (exist_watch_index < exist_watch_count) { + req.existWatches.count++; + length = strlen(exist_watch_data[exist_watch_index]); + exist_watch_index++; + } else if (child_watch_index < child_watch_count) { + req.childWatches.count++; + length = strlen(child_watch_data[child_watch_index]); + child_watch_index++; + } else { + break; + } + batch_length += length; + } + struct oarchive *oa; + oa = create_buffer_oarchive(); + rc = serialize_RequestHeader(oa, "header", &h); + rc = rc < 0 ? rc : serialize_SetWatches(oa, "req", &req); + /* add this buffer to the head of the send queue */ + rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa)); - /* We queued the buffer, so don't free it */ - close_buffer_oarchive(&oa, 0); - free_key_list(req.dataWatches.data, req.dataWatches.count); - free_key_list(req.existWatches.data, req.existWatches.count); - free_key_list(req.childWatches.data, req.childWatches.count); - LOG_DEBUG(LOGCALLBACK(zh), "Sending set watches request to %s",zoo_get_current_server(zh)); - return (rc < 0)?ZMARSHALLINGERROR:ZOK; + close_buffer_oarchive(&oa, 0); + LOG_DEBUG(("Sending set watches request to %s",format_current_endpoint_info(zh))); + if (rc < 0) { + free_key_list(data_watch_data, data_watch_count); + free_key_list(exist_watch_data, exist_watch_count); + free_key_list(child_watch_data, child_watch_count); + return ZMARSHALLINGERROR; + } + } + free_key_list(data_watch_data, data_watch_count); + free_key_list(exist_watch_data, exist_watch_count); + free_key_list(child_watch_data, child_watch_count); + return ZOK; } static int serialize_prime_connect(struct connect_req *req, char* buffer){ From 95ff88c6ab7a97bb267433ead524b6cdbfc8ef16 Mon Sep 17 00:00:00 2001 From: xufeiming Date: Thu, 1 Nov 2018 11:38:30 +0800 Subject: [PATCH 2/3] MOD: Fix ZOOKEEPER-706 and some compile error --- .../zookeeper-client-c/src/zookeeper.c | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/zookeeper-client/zookeeper-client-c/src/zookeeper.c b/zookeeper-client/zookeeper-client-c/src/zookeeper.c index 635bbbf9109..7781404b9d5 100644 --- a/zookeeper-client/zookeeper-client-c/src/zookeeper.c +++ b/zookeeper-client/zookeeper-client-c/src/zookeeper.c @@ -1920,6 +1920,12 @@ static void free_key_list(char **list, int count) static int send_set_watches(zhandle_t *zh) { int rc; + int data_watch_index; + int exist_watch_index; + int child_watch_index; + int batch_length; + struct oarchive *oa; + struct RequestHeader h = {SET_WATCHES_XID, ZOO_SETWATCHES_OP}; int data_watch_count, exist_watch_count, child_watch_count; char** data_watch_data = collect_keys(zh->active_node_watchers, &data_watch_count); char** exist_watch_data = collect_keys(zh->active_exist_watchers, &exist_watch_count); @@ -1933,11 +1939,12 @@ static int send_set_watches(zhandle_t *zh) return ZOK; } - int data_watch_index = 0, exist_watch_index = 0, child_watch_index = 0; + data_watch_index = 0; + exist_watch_index = 0; + child_watch_index = 0; while (data_watch_index < data_watch_count || exist_watch_index < exist_watch_count || child_watch_index < child_watch_count) { - struct RequestHeader h = { STRUCT_INITIALIZER(xid , SET_WATCHES_XID), STRUCT_INITIALIZER(type , ZOO_SETWATCHES_OP)}; struct SetWatches req; req.relativeZxid = zh->last_zxid; req.dataWatches.count = 0; @@ -1946,7 +1953,7 @@ static int send_set_watches(zhandle_t *zh) req.dataWatches.data = data_watch_data + data_watch_index; req.existWatches.data = exist_watch_data + exist_watch_index; req.childWatches.data = child_watch_data + child_watch_index; - int batch_length = 0; + batch_length = 0; while (batch_length < SET_WATCHES_MAX_LENGTH) { int length = 0; if (data_watch_index < data_watch_count) { @@ -1966,7 +1973,6 @@ static int send_set_watches(zhandle_t *zh) } batch_length += length; } - struct oarchive *oa; oa = create_buffer_oarchive(); rc = serialize_RequestHeader(oa, "header", &h); rc = rc < 0 ? rc : serialize_SetWatches(oa, "req", &req); @@ -1974,7 +1980,7 @@ static int send_set_watches(zhandle_t *zh) rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa)); close_buffer_oarchive(&oa, 0); - LOG_DEBUG(("Sending set watches request to %s",format_current_endpoint_info(zh))); + LOG_DEBUG(LOGCALLBACK(zh), "Sending set watches request to %s",format_current_endpoint_info(zh)); if (rc < 0) { free_key_list(data_watch_data, data_watch_count); free_key_list(exist_watch_data, exist_watch_count); From 49b23e461ba4dd5c17bec36d4ee2d285ceb78cb4 Mon Sep 17 00:00:00 2001 From: xufeiming Date: Thu, 1 Nov 2018 14:18:53 +0800 Subject: [PATCH 3/3] MOD: Fix ZOOKEEPER-706 and fix error --- zookeeper-client/zookeeper-client-c/src/zookeeper.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zookeeper-client/zookeeper-client-c/src/zookeeper.c b/zookeeper-client/zookeeper-client-c/src/zookeeper.c index 7781404b9d5..13b350e33a8 100644 --- a/zookeeper-client/zookeeper-client-c/src/zookeeper.c +++ b/zookeeper-client/zookeeper-client-c/src/zookeeper.c @@ -1980,7 +1980,7 @@ static int send_set_watches(zhandle_t *zh) rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa)); close_buffer_oarchive(&oa, 0); - LOG_DEBUG(LOGCALLBACK(zh), "Sending set watches request to %s",format_current_endpoint_info(zh)); + LOG_DEBUG(LOGCALLBACK(zh), "Sending set watches request to %s",zoo_get_current_server(zh)); if (rc < 0) { free_key_list(data_watch_data, data_watch_count); free_key_list(exist_watch_data, exist_watch_count);