Merge pull request #331 from vshlapakov/feature-producer-retries
Async producer retries for failed messages
dpkp committed Jun 4, 2015
2 parents 67424a2 + 7d6f3f5 commit 474aeaa
Showing 6 changed files with 296 additions and 29 deletions.
26 changes: 26 additions & 0 deletions kafka/common.py
@@ -71,6 +71,11 @@
KafkaMessage = namedtuple("KafkaMessage",
["topic", "partition", "offset", "key", "value"])

# Define retry policy for async producer
# Limit value: int >= 0, 0 means no retries
RetryOptions = namedtuple("RetryOptions",
["limit", "backoff_ms", "retry_on_timeouts"])


#################
# Exceptions #
@@ -205,6 +210,12 @@ class KafkaConfigurationError(KafkaError):
pass


class AsyncProducerQueueFull(KafkaError):
def __init__(self, failed_msgs, *args):
super(AsyncProducerQueueFull, self).__init__(*args)
self.failed_msgs = failed_msgs


def _iter_broker_errors():
for name, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:
@@ -218,3 +229,18 @@ def check_error(response):
if response.error:
error_class = kafka_errors.get(response.error, UnknownError)
raise error_class(response)


RETRY_BACKOFF_ERROR_TYPES = (
KafkaUnavailableError, LeaderNotAvailableError,
ConnectionError, FailedPayloadsError
)


RETRY_REFRESH_ERROR_TYPES = (
NotLeaderForPartitionError, UnknownTopicOrPartitionError,
LeaderNotAvailableError, ConnectionError
)


RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES
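
The new RetryOptions tuple and the RETRY_* groupings above are what the async sender consults when a produce request fails. A minimal sketch of how they fit together (illustrative only, not part of this commit; the option values are arbitrary):

from kafka.common import (
    RetryOptions, FailedPayloadsError,
    RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES
)

# Retry a failed request up to 3 times, sleeping 100ms between attempts,
# and also retry requests that timed out waiting for acks.
options = RetryOptions(limit=3, backoff_ms=100, retry_on_timeouts=True)

# Anything in RETRY_ERROR_TYPES is retriable; backoff errors add a sleep,
# refresh errors force a metadata reload before the next attempt.
error_cls = FailedPayloadsError
retriable = error_cls in RETRY_ERROR_TYPES           # True
do_backoff = error_cls in RETRY_BACKOFF_ERROR_TYPES  # True
do_refresh = error_cls in RETRY_REFRESH_ERROR_TYPES  # False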
120 changes: 98 additions & 22 deletions kafka/producer/base.py
Original file line number Diff line number Diff line change
@@ -5,18 +5,23 @@
import time

try:
from queue import Empty, Queue
from queue import Empty, Full, Queue
except ImportError:
from Queue import Empty, Queue
from Queue import Empty, Full, Queue
from collections import defaultdict

from threading import Thread, Event

import six

from kafka.common import (
ProduceRequest, TopicAndPartition, UnsupportedCodecError
ProduceRequest, TopicAndPartition, RetryOptions,
kafka_errors, UnsupportedCodecError, FailedPayloadsError,
RequestTimedOutError, AsyncProducerQueueFull, UnknownError
)
from kafka.common import (
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)

from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
from kafka.util import kafka_bytestring

@@ -25,21 +30,33 @@
BATCH_SEND_DEFAULT_INTERVAL = 20
BATCH_SEND_MSG_COUNT = 20

# unlimited
ASYNC_QUEUE_MAXSIZE = 0
ASYNC_QUEUE_PUT_TIMEOUT = 0
# no retries by default
ASYNC_RETRY_LIMIT = 0
ASYNC_RETRY_BACKOFF_MS = 0
ASYNC_RETRY_ON_TIMEOUTS = False

STOP_ASYNC_PRODUCER = -1


def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout, stop_event):
req_acks, ack_timeout, retry_options, stop_event):
"""
Listen on the queue for a specified number of messages or till
a specified timeout and send them upstream to the brokers in one
request
"""
stop = False
reqs = {}
client.reinit()

while not stop_event.is_set():
timeout = batch_time
count = batch_size

# it's a simplification: we're comparing message sets and
# messages: each set can contain [1..batch_size] messages
count = batch_size - len(reqs)
send_at = time.time() + timeout
msgset = defaultdict(list)

@@ -48,7 +65,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
while count > 0 and timeout >= 0:
try:
topic_partition, msg, key = queue.get(timeout=timeout)

except Empty:
break

@@ -63,20 +79,60 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
msgset[topic_partition].append((msg, key))

# Send collected requests upstream
reqs = []
for topic_partition, msg in msgset.items():
messages = create_message_set(msg, codec, key)
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
messages)
reqs.append(req)
tuple(messages))
reqs[req] = 0

if not reqs:
continue

reqs_to_retry, error_cls = [], None
do_backoff, do_refresh = False, False

def _handle_error(error_cls, reqs, all_retries):
if ((error_cls == RequestTimedOutError and
retry_options.retry_on_timeouts) or
error_cls in RETRY_ERROR_TYPES):
all_retries += reqs
if error_cls in RETRY_BACKOFF_ERROR_TYPES:
do_backoff = True
if error_cls in RETRY_REFRESH_ERROR_TYPES:
do_refresh = True

try:
client.send_produce_request(reqs,
acks=req_acks,
timeout=ack_timeout)
except Exception:
log.exception("Unable to send message")
reply = client.send_produce_request(reqs.keys(),
acks=req_acks,
timeout=ack_timeout,
fail_on_error=False)
for i, response in enumerate(reply):
if isinstance(response, FailedPayloadsError):
_handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
elif isinstance(response, ProduceResponse) and response.error:
error_cls = kafka_errors.get(response.error, UnknownError)
_handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)

except Exception as ex:
error_cls = kafka_errors.get(type(ex), UnknownError)
_handle_error(error_cls, reqs.keys(), reqs_to_retry)

if not reqs_to_retry:
reqs = {}
continue

# doing backoff before next retry
if do_backoff and retry_options.backoff_ms:
log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
time.sleep(float(retry_options.backoff_ms) / 1000)

# refresh topic metadata before next retry
if do_refresh:
client.load_metadata_for_topics()

reqs = dict((key, count + 1) for (key, count) in reqs.items()
if key in reqs_to_retry and count < retry_options.limit)


class Producer(object):
@@ -111,12 +167,18 @@ def __init__(self, client, async=False,
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
async_retry_limit=ASYNC_RETRY_LIMIT,
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):

if batch_send:
async = True
assert batch_send_every_n > 0
assert batch_send_every_t > 0
assert async_queue_maxsize >= 0
else:
batch_send_every_n = 1
batch_send_every_t = 3600
@@ -135,10 +197,13 @@ def __init__(self, client, async=False,
self.codec = codec

if self.async:
log.warning("async producer does not guarantee message delivery!")
log.warning("Current implementation does not retry Failed messages")
log.warning("Use at your own risk! (or help improve with a PR!)")
self.queue = Queue() # Messages are sent through this queue
# Messages are sent through this queue
self.queue = Queue(async_queue_maxsize)
self.async_queue_put_timeout = async_queue_put_timeout
async_retry_options = RetryOptions(
limit=async_retry_limit,
backoff_ms=async_retry_backoff_ms,
retry_on_timeouts=async_retry_on_timeouts)
self.thread_stop_event = Event()
self.thread = Thread(target=_send_upstream,
args=(self.queue,
@@ -148,6 +213,7 @@ def __init__(self, client, async=False,
batch_send_every_n,
self.req_acks,
self.ack_timeout,
async_retry_options,
self.thread_stop_event))

# Thread will die if main thread exits
@@ -199,8 +265,18 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
raise TypeError("the key must be type bytes")

if self.async:
for m in msg:
self.queue.put((TopicAndPartition(topic, partition), m, key))
for idx, m in enumerate(msg):
try:
item = (TopicAndPartition(topic, partition), m, key)
if self.async_queue_put_timeout == 0:
self.queue.put_nowait(item)
else:
self.queue.put(item, True, self.async_queue_put_timeout)
except Full:
raise AsyncProducerQueueFull(
msg[idx:],
'Producer async queue overfilled. '
'Current queue size %d.' % self.queue.qsize())
resp = []
else:
messages = create_message_set([(m, key) for m in msg], self.codec, key)
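
Because the async queue is now bounded by async_queue_maxsize, _send_messages raises the new AsyncProducerQueueFull instead of growing the queue without limit. A rough usage sketch, not part of the diff: the broker address and message values are placeholders, and it assumes the KafkaClient and SimpleProducer constructors of this kafka-python version.

from kafka import KafkaClient, SimpleProducer
from kafka.common import AsyncProducerQueueFull

client = KafkaClient('localhost:9092')
producer = SimpleProducer(client, async=True,
                          async_queue_maxsize=10000,  # bound the in-memory queue
                          async_queue_put_timeout=0)  # 0 means put_nowait, fail fast

try:
    producer.send_messages(b'my-topic', b'msg-1', b'msg-2')
except AsyncProducerQueueFull as e:
    # e.failed_msgs holds the messages that never made it onto the queue,
    # so the caller can back off and resend them later.
    pending = e.failed_msgs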
17 changes: 14 additions & 3 deletions kafka/producer/keyed.py
@@ -7,7 +7,8 @@

from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
BATCH_SEND_MSG_COUNT
BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
)

log = logging.getLogger("kafka")
@@ -37,7 +38,12 @@ def __init__(self, client, partitioner=None, async=False,
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
async_retry_limit=ASYNC_RETRY_LIMIT,
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
if not partitioner:
partitioner = HashedPartitioner
self.partitioner_class = partitioner
@@ -46,7 +52,12 @@ def __init__(self, client, partitioner=None, async=False,
super(KeyedProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
batch_send_every_t)
batch_send_every_t,
async_retry_limit,
async_retry_backoff_ms,
async_retry_on_timeouts,
async_queue_maxsize,
async_queue_put_timeout)

def _next_partition(self, topic, key):
if topic not in self.partitioners:
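
KeyedProducer simply threads the new async_* keyword arguments through to the base Producer, so background retries apply to key-partitioned sends as well. Illustrative sketch only, assuming the KeyedProducer.send_messages(topic, key, *msg) API of this kafka-python version and placeholder values:

from kafka import KafkaClient, KeyedProducer

client = KafkaClient('localhost:9092')
producer = KeyedProducer(client, async=True,
                         async_retry_limit=5,         # retry each failed request up to 5 times
                         async_retry_backoff_ms=200)  # sleep 200ms before retrying
# Messages with the same key still hash to the same partition; failed
# requests are now retried by the background send thread.
producer.send_messages(b'events', b'user-42', b'signup')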
17 changes: 14 additions & 3 deletions kafka/producer/simple.py
@@ -10,7 +10,8 @@

from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
BATCH_SEND_MSG_COUNT
BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
)

log = logging.getLogger("kafka")
@@ -45,13 +46,23 @@ def __init__(self, client, async=False,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
random_start=True):
random_start=True,
async_retry_limit=ASYNC_RETRY_LIMIT,
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
self.partition_cycles = {}
self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
batch_send_every_t)
batch_send_every_t,
async_retry_limit,
async_retry_backoff_ms,
async_retry_on_timeouts,
async_queue_maxsize,
async_queue_put_timeout)

def _next_partition(self, topic):
if topic not in self.partition_cycles:
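
SimpleProducer gets the same pass-through, so retry behaviour can be configured entirely from the constructor. A hedged end-to-end sketch under the same assumptions as the earlier examples (values are illustrative, not recommendations):

from kafka import KafkaClient, SimpleProducer

client = KafkaClient('localhost:9092')
producer = SimpleProducer(client, async=True,
                          async_retry_limit=3,           # give up on a request after 3 retries
                          async_retry_backoff_ms=100,    # sleep before retrying connection-type errors
                          async_retry_on_timeouts=True,  # treat RequestTimedOutError as retriable
                          async_queue_maxsize=10000)
producer.send_messages(b'my-topic', b'payload')
producer.stop()  # shut down the background send thread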
