diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 63a0f3bb7..c8f9bfa99 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -272,7 +272,7 @@ def _refresh_controller_id(self): version = self._matching_api_version(MetadataRequest) if 1 <= version <= 6: request = MetadataRequest[version]() - future = self._send_request_to_node(self._client.least_loaded_node(), request) + future = self._send_request_to_least_loaded_node(request) self._wait_for_futures([future]) @@ -310,7 +310,7 @@ def _find_coordinator_id_send_request(self, group_id): raise NotImplementedError( "Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient." .format(version)) - return self._send_request_to_node(self._client.least_loaded_node(), request) + return self._send_request_to_least_loaded_node(request) def _find_coordinator_id_process_response(self, response): """Process a FindCoordinatorResponse. @@ -355,9 +355,36 @@ def _find_coordinator_ids(self, group_ids): } return groups_coordinators + def _send_request_to_least_loaded_node(self, request): + """Send a Kafka protocol message to the least loaded broker. + + Returns a future that may be polled for status and results. + + :param request: The message to send. + :return: A future object that may be polled for status and results. + :exception: The exception if the message could not be sent. + """ + node_id = self._client.least_loaded_node() + while not self._client.ready(node_id): + # poll until the connection to broker is ready, otherwise send() + # will fail with NodeNotReadyError + self._client.poll() + + # node_id is not part of the cluster anymore, choose a new broker + # to connect to + if self._client.cluster.broker_metadata(node_id) is None: + node_id = self._client.least_loaded_node() + + return self._client.send(node_id, request) + def _send_request_to_node(self, node_id, request): """Send a Kafka protocol message to a specific broker. + .. 
note:: + + This function will enter an infinite loop if `node_id` is + removed from the cluster. + Returns a future that may be polled for status and results. :param node_id: The broker id to which to send the message. @@ -382,10 +409,23 @@ def _send_request_to_controller(self, request): tries = 2 # in case our cached self._controller_id is outdated while tries: tries -= 1 - future = self._send_request_to_node(self._controller_id, request) + future = self._client.send(self._controller_id, request) self._wait_for_futures([future]) + if future.exception is not None: + log.error( + "Sending request to controller_id %s failed with %s", + self._controller_id, + future.exception, + ) + is_outdated_controler = ( + self._client.cluster.broker_metadata(self._controller_id) is None + ) + if is_outdated_controler: + self._refresh_controller_id() + continue + response = future.value # In Java, the error field name is inconsistent: # - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors @@ -506,10 +546,7 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): allow_auto_topic_creation=auto_topic_creation ) - future = self._send_request_to_node( - self._client.least_loaded_node(), - request - ) + future = self._send_request_to_least_loaded_node(request) self._wait_for_futures([future]) return future.value @@ -601,7 +638,7 @@ def describe_acls(self, acl_filter): .format(version) ) - future = self._send_request_to_node(self._client.least_loaded_node(), request) + future = self._send_request_to_least_loaded_node(request) self._wait_for_futures([future]) response = future.value @@ -692,7 +729,7 @@ def create_acls(self, acls): .format(version) ) - future = self._send_request_to_node(self._client.least_loaded_node(), request) + future = self._send_request_to_least_loaded_node(request) self._wait_for_futures([future]) response = future.value @@ -786,7 +823,7 @@ def delete_acls(self, acl_filters): .format(version) ) - future = 
self._send_request_to_node(self._client.least_loaded_node(), request) + future = self._send_request_to_least_loaded_node(request) self._wait_for_futures([future]) response = future.value @@ -846,8 +883,7 @@ def describe_configs(self, config_resources, include_synonyms=False): )) if len(topic_resources) > 0: - futures.append(self._send_request_to_node( - self._client.least_loaded_node(), + futures.append(self._send_request_to_least_loaded_node( DescribeConfigsRequest[version](resources=topic_resources) )) @@ -867,8 +903,7 @@ def describe_configs(self, config_resources, include_synonyms=False): )) if len(topic_resources) > 0: - futures.append(self._send_request_to_node( - self._client.least_loaded_node(), + futures.append(self._send_request_to_least_loaded_node( DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms) )) else: @@ -915,7 +950,7 @@ def alter_configs(self, config_resources): # // a single request that may be sent to any broker. # # So this is currently broken as it always sends to the least_loaded_node() - future = self._send_request_to_node(self._client.least_loaded_node(), request) + future = self._send_request_to_least_loaded_node(request) self._wait_for_futures([future]) response = future.value