From 2a809b96338bbaf3d69ff259a4c9b77bdd0a6473 Mon Sep 17 00:00:00 2001 From: "lukas@doubledutch.me" Date: Thu, 28 May 2015 17:42:33 -0700 Subject: [PATCH 1/2] check if connection is available before sending messages using the async producer --- kafka/producer/base.py | 49 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 4bd3de49a..6af8a8557 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -10,7 +10,7 @@ from Queue import Empty, Queue from collections import defaultdict -from threading import Thread, Event +from threading import Thread, Event, Lock import six @@ -29,7 +29,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, - req_acks, ack_timeout, stop_event): + req_acks, ack_timeout, stop_event, connection_state_lock, connection_exc): """ Listen on the queue for a specified number of messages or till a specified timeout and send them upstream to the brokers in one @@ -71,12 +71,32 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, messages) reqs.append(req) - try: - client.send_produce_request(reqs, - acks=req_acks, - timeout=ack_timeout) - except Exception: - log.exception("Unable to send message") + delivered = False + attempt = 0 + while not delivered: + try: + client.send_produce_request(reqs, + acks=req_acks, + timeout=ack_timeout) + delivered = True + + # Set connection state as available + connection_state_lock.acquire() + connection_exc = None + connection_state_lock.release() + except Exception as e: + log.exception("Unable to send message") + + # Set connection state as unavailable + connection_state_lock.acquire() + connection_exc = e + connection_state_lock.release() + + # Exponential back-off with min 0.1s, max 12.8s + attempt = min(attempt + 1, 7) + sleep_time = 0.1*(2**attempt) + log.warning("sleeping for {0}s".format(sleep_time)) + time.sleep(sleep_time) class Producer(object): @@ -140,6 +160,8 @@ def __init__(self, client, async=False, log.warning("Use at your own risk! (or help improve with a PR!)") self.queue = Queue() # Messages are sent through this queue self.thread_stop_event = Event() + self.connection_state_lock = Lock() + self.connection_exc = None self.thread = Thread(target=_send_upstream, args=(self.queue, self.client.copy(), @@ -148,7 +170,9 @@ def __init__(self, client, async=False, batch_send_every_n, self.req_acks, self.ack_timeout, - self.thread_stop_event)) + self.thread_stop_event, + self.connection_state_lock, + self.connection_exc)) # Thread will die if main thread exits self.thread.daemon = True @@ -199,6 +223,13 @@ def _send_messages(self, topic, partition, *msg, **kwargs): raise TypeError("the key must be type bytes") if self.async: + # Check if connection is available, otherwise fail to add message + self.connection_state_lock.acquire() + exc = self.connection_exc + self.connection_state_lock.release() + if exc != None: + raise exc + for m in msg: self.queue.put((TopicAndPartition(topic, partition), m, key)) resp = [] From 3c632b6f0c71a2c29b112ef98eccf36286e997e7 Mon Sep 17 00:00:00 2001 From: "lukas@doubledutch.me" Date: Fri, 29 May 2015 11:26:58 -0700 Subject: [PATCH 2/2] wrap value into object so we can modify it --- kafka/producer/base.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 6af8a8557..ded5708bf 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -82,20 +82,20 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # Set connection state as available connection_state_lock.acquire() - connection_exc = None + connection_exc[0] = None connection_state_lock.release() except Exception as e: log.exception("Unable to send message") # Set connection state as unavailable connection_state_lock.acquire() - connection_exc = e + connection_exc[0] = e connection_state_lock.release() # Exponential back-off with min 0.1s, max 12.8s attempt = min(attempt + 1, 7) sleep_time = 0.1*(2**attempt) - log.warning("sleeping for {0}s".format(sleep_time)) + log.warning("Sleeping for {0}s before retrying".format(sleep_time)) time.sleep(sleep_time) @@ -161,7 +161,7 @@ def __init__(self, client, async=False, self.queue = Queue() # Messages are sent through this queue self.thread_stop_event = Event() self.connection_state_lock = Lock() - self.connection_exc = None + self.connection_exc = [None] self.thread = Thread(target=_send_upstream, args=(self.queue, self.client.copy(), @@ -225,7 +225,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs): if self.async: # Check if connection is available, otherwise fail to add message self.connection_state_lock.acquire() - exc = self.connection_exc + exc = self.connection_exc[0] self.connection_state_lock.release() if exc != None: raise exc