Skip to content

Commit

Permalink
Check response.error for async producer
Browse files Browse the repository at this point in the history
  • Loading branch information
vshlapakov committed Jun 3, 2015
1 parent 4474a50 commit 7d6f3f5
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
43 changes: 24 additions & 19 deletions kafka/producer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

from kafka.common import (
ProduceRequest, TopicAndPartition, RetryOptions,
UnsupportedCodecError, FailedPayloadsError,
RequestTimedOutError, AsyncProducerQueueFull
kafka_errors, UnsupportedCodecError, FailedPayloadsError,
RequestTimedOutError, AsyncProducerQueueFull, UnknownError
)
from kafka.common import (
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
Expand Down Expand Up @@ -89,41 +89,46 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
if not reqs:
continue

reqs_to_retry, error_type = [], None
reqs_to_retry, error_cls = [], None
do_backoff, do_refresh = False, False

def _handle_error(error_cls, reqs, all_retries):
if ((error_cls == RequestTimedOutError and
retry_options.retry_on_timeouts) or
error_cls in RETRY_ERROR_TYPES):
all_retries += reqs
if error_cls in RETRY_BACKOFF_ERROR_TYPES:
do_backoff = True
if error_cls in RETRY_REFRESH_ERROR_TYPES:
do_refresh = True

try:
reply = client.send_produce_request(reqs.keys(),
acks=req_acks,
timeout=ack_timeout,
fail_on_error=False)
reqs_to_retry = [req for broker_responses in reply
for response in broker_responses
for req in response.failed_payloads
if isinstance(response, FailedPayloadsError)]
if reqs_to_retry:
error_type = FailedPayloadsError

except RequestTimedOutError:
error_type = RequestTimedOutError
if retry_options.retry_on_timeouts:
reqs_to_retry = reqs.keys()
for i, response in enumerate(reply):
if isinstance(response, FailedPayloadsError):
_handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
elif isinstance(response, ProduceResponse) and response.error:
error_cls = kafka_errors.get(response.error, UnknownError)
_handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)

except Exception as ex:
error_type = type(ex)
if type(ex) in RETRY_ERROR_TYPES:
reqs_to_retry = reqs.keys()
error_cls = kafka_errors.get(type(ex), UnknownError)
_handle_error(error_cls, reqs.keys(), reqs_to_retry)

if not reqs_to_retry:
reqs = {}
continue

# doing backoff before next retry
if error_type in RETRY_BACKOFF_ERROR_TYPES and retry_options.backoff_ms:
if do_backoff and retry_options.backoff_ms:
log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
time.sleep(float(retry_options.backoff_ms) / 1000)

# refresh topic metadata before next retry
if error_type in RETRY_REFRESH_ERROR_TYPES:
if do_refresh:
client.load_metadata_for_topics()

reqs = dict((key, count + 1) for (key, count) in reqs.items()
Expand Down
4 changes: 2 additions & 2 deletions test/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def test_first_send_failed(self):
def send_side_effect(reqs, *args, **kwargs):
if self.client.is_first_time:
self.client.is_first_time = False
return [[FailedPayloadsError(reqs)]]
return [FailedPayloadsError(reqs)]
return []

self.client.send_produce_request.side_effect = send_side_effect
Expand All @@ -165,7 +165,7 @@ def test_with_limited_retries(self):
self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))

def send_side_effect(reqs, *args, **kwargs):
return [[FailedPayloadsError(reqs)]]
return [FailedPayloadsError(reqs)]

self.client.send_produce_request.side_effect = send_side_effect

Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ commands =
nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
setenv =
PROJECT_ROOT = {toxinidir}
passenv = KAFKA_VERSION

[testenv:py33]
deps =
Expand Down

0 comments on commit 7d6f3f5

Please sign in to comment.