From 726066465e5effa6d3b182844b3037d4ded30644 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Thu, 14 Jan 2021 15:18:05 +0100 Subject: [PATCH 1/2] bugfix: fix infinite loop on KafkaAdminClient (#2194) An infinite loop may happen with the following pattern: self._send_request_to_node(self._client.least_loaded_node(), request) The problem happens when `self._client`'s cluster metadata is out-of-date, and the result of `least_loaded_node()` is a node that has been removed from the cluster but the client is unware of it. When this happens `_send_request_to_node` will enter an infinite loop waiting for the chosen node to become available, which won't happen, resulting in an infinite loop. This commit introduces a new method named `_send_request_to_least_loaded_node` which handles the case above. This is done by regularly checking if the target node is available in the cluster metadata, and if not, a new node is chosen. Notes: - This does not yet cover every call site to `_send_request_to_node`, there are some other places were similar race conditions may happen. - The code above does not guarantee that the request itself will be sucessful, since it is still possible for the target node to exit, however, it does remove the infinite loop which can render client code unusable. --- kafka/admin/client.py | 50 +++++++++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 63a0f3bb7..b51b524ee 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -272,7 +272,7 @@ def _refresh_controller_id(self): version = self._matching_api_version(MetadataRequest) if 1 <= version <= 6: request = MetadataRequest[version]() - future = self._send_request_to_node(self._client.least_loaded_node(), request) + future = self._send_request_to_least_loaded_node(request) self._wait_for_futures([future]) @@ -310,7 +310,7 @@ def _find_coordinator_id_send_request(self, group_id): raise NotImplementedError( "Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient." .format(version)) - return self._send_request_to_node(self._client.least_loaded_node(), request) + return self._send_request_to_least_loaded_node(request) def _find_coordinator_id_process_response(self, response): """Process a FindCoordinatorResponse. @@ -355,9 +355,36 @@ def _find_coordinator_ids(self, group_ids): } return groups_coordinators + def _send_request_to_least_loaded_node(self, request): + """Send a Kafka protocol message to the least loaded broker. + + Returns a future that may be polled for status and results. + + :param request: The message to send. + :return: A future object that may be polled for status and results. + :exception: The exception if the message could not be sent. + """ + node_id = self._client.least_loaded_node() + while not self._client.ready(node_id): + # poll until the connection to broker is ready, otherwise send() + # will fail with NodeNotReadyError + self._client.poll() + + # node_id is not part of the cluster anymore, choose a new broker + # to connect to + if self._client.cluster.broker_metadata(node_id) is None: + node_id = self._client.least_loaded_node() + + return self._client.send(node_id, request) + def _send_request_to_node(self, node_id, request): """Send a Kafka protocol message to a specific broker. + .. note:: + + This function will enter in an infinite loop if `node_id` is + removed from the cluster. + Returns a future that may be polled for status and results. :param node_id: The broker id to which to send the message. @@ -506,10 +533,7 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): allow_auto_topic_creation=auto_topic_creation ) - future = self._send_request_to_node( - self._client.least_loaded_node(), - request - ) + future = self._send_request_to_least_loaded_node(request) self._wait_for_futures([future]) return future.value @@ -601,7 +625,7 @@ def describe_acls(self, acl_filter): .format(version) ) - future = self._send_request_to_node(self._client.least_loaded_node(), request) + future = self._send_request_to_least_loaded_node(request) self._wait_for_futures([future]) response = future.value @@ -692,7 +716,7 @@ def create_acls(self, acls): .format(version) ) - future = self._send_request_to_node(self._client.least_loaded_node(), request) + future = self._send_request_to_least_loaded_node(request) self._wait_for_futures([future]) response = future.value @@ -786,7 +810,7 @@ def delete_acls(self, acl_filters): .format(version) ) - future = self._send_request_to_node(self._client.least_loaded_node(), request) + future = self._send_request_to_least_loaded_node(request) self._wait_for_futures([future]) response = future.value @@ -846,8 +870,7 @@ def describe_configs(self, config_resources, include_synonyms=False): )) if len(topic_resources) > 0: - futures.append(self._send_request_to_node( - self._client.least_loaded_node(), + futures.append(self._send_request_to_least_loaded_node( DescribeConfigsRequest[version](resources=topic_resources) )) @@ -867,8 +890,7 @@ def describe_configs(self, config_resources, include_synonyms=False): )) if len(topic_resources) > 0: - futures.append(self._send_request_to_node( - self._client.least_loaded_node(), + futures.append(self._send_request_to_least_loaded_node( DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms) )) else: @@ -915,7 +937,7 @@ def alter_configs(self, config_resources): # // a single request that may be sent to any broker. # # So this is currently broken as it always sends to the least_loaded_node() - future = self._send_request_to_node(self._client.least_loaded_node(), request) + future = self._send_request_to_least_loaded_node(request) self._wait_for_futures([future]) response = future.value From 4609d1df92893cd01a3eedf7e74a2089e174795f Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Thu, 14 Jan 2021 17:50:28 +0100 Subject: [PATCH 2/2] bugfix: infinite loop when send msgs to controller (#2194) If the value `_controller_id` is out-of-date and the node was removed from the cluster, `_send_request_to_node` would enter an infinite loop. --- kafka/admin/client.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index b51b524ee..c8f9bfa99 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -409,10 +409,23 @@ def _send_request_to_controller(self, request): tries = 2 # in case our cached self._controller_id is outdated while tries: tries -= 1 - future = self._send_request_to_node(self._controller_id, request) + future = self._client.send(self._controller_id, request) self._wait_for_futures([future]) + if future.exception is not None: + log.error( + "Sending request to controller_id %s failed with %s", + self._controller_id, + future.exception, + ) + is_outdated_controler = ( + self._client.cluster.broker_metadata(self._controller_id) is None + ) + if is_outdated_controler: + self._refresh_controller_id() + continue + response = future.value # In Java, the error field name is inconsistent: # - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors