From 718e5fb66da5dca449aa31d305b8867fba4f783c Mon Sep 17 00:00:00 2001 From: Oliver Jowett Date: Sun, 19 Jul 2015 10:57:37 +0000 Subject: [PATCH 1/3] Treat KafkaUnavailableError like other errors. --- kafka/client.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 817c62152..dbd986359 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -162,11 +162,16 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): payloads_by_broker = collections.defaultdict(list) for payload in payloads: - leader = self._get_leader_for_partition(payload.topic, - payload.partition) - - payloads_by_broker[leader].append(payload) - brokers_for_payloads.append(leader) + try: + leader = self._get_leader_for_partition(payload.topic, + payload.partition) + payloads_by_broker[leader].append(payload) + brokers_for_payloads.append(leader) + except KafkaUnavailableError as e: + log.warning('KafkaUnavailableError attempting to send request ' + 'on topic %s partition %d', payload.topic, payload.partition) + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsErrors(payload) # For each broker, send the list of request payloads # and collect the responses and errors From 3376ed1cef3e29877f773017117d90192ccf9a5e Mon Sep 17 00:00:00 2001 From: Oliver Jowett Date: Mon, 20 Jul 2015 20:25:02 +0000 Subject: [PATCH 2/3] Errors -> Error typo --- kafka/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/client.py b/kafka/client.py index dbd986359..87b51c3df 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -171,7 +171,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): log.warning('KafkaUnavailableError attempting to send request ' 'on topic %s partition %d', payload.topic, payload.partition) topic_partition = (payload.topic, payload.partition) - responses[topic_partition] = FailedPayloadsErrors(payload) + responses[topic_partition] = FailedPayloadsError(payload) # For each broker, send the list of request payloads # and collect the responses and errors From 77e5180a377197c8157a19d5603ad2653c238aa3 Mon Sep 17 00:00:00 2001 From: Oliver Jowett Date: Fri, 24 Jul 2015 10:53:01 +0000 Subject: [PATCH 3/3] Init responses before we use it. --- kafka/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/client.py b/kafka/client.py index 87b51c3df..9846acf3c 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -161,6 +161,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): brokers_for_payloads = [] payloads_by_broker = collections.defaultdict(list) + responses = {} for payload in payloads: try: leader = self._get_leader_for_partition(payload.topic, @@ -175,7 +176,6 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): # For each broker, send the list of request payloads # and collect the responses and errors - responses = {} broker_failures = [] for broker, payloads in payloads_by_broker.items(): requestId = self._next_id()