Merge pull request #769 from Parsely/bugfix/sync_producer_hang
don't let sync producer hang when delivery report is not received
Emmett J. Butler committed Feb 26, 2018
2 parents 5b860fd + afffc30 commit a4b6793
Showing 2 changed files with 56 additions and 16 deletions.
57 changes: 42 additions & 15 deletions pykafka/producer.py
@@ -24,6 +24,7 @@
import struct
import sys
import threading
import time
import weakref

from six import reraise
@@ -35,6 +36,7 @@
InvalidMessageSize,
MessageSizeTooLarge,
NotLeaderForPartition,
ProduceFailureError,
ProducerQueueFullError,
ProducerStoppedException,
SocketDisconnectedError,
@@ -73,6 +75,7 @@ def __init__(self,
max_request_size=1000012,
sync=False,
delivery_reports=False,
pending_timeout_ms=5 * 1000,
auto_start=True,
serializer=None):
"""Instantiate a new AsyncProducer
@@ -102,7 +105,7 @@ def __init__(self,
before a request is considered complete
:type required_acks: int
:param ack_timeout_ms: The amount of time (in milliseconds) to wait for
acknowledgment of a produce request.
acknowledgment of a produce request on the server.
:type ack_timeout_ms: int
:param max_queued_messages: The maximum number of messages the producer
can have waiting to be sent to the broker. If messages are sent
@@ -151,6 +154,14 @@ def __init__(self,
`get_delivery_report()` is not called regularly with this setting enabled,
memory usage will grow unbounded. This setting is ignored when `sync=True`.
:type delivery_reports: bool
:param pending_timeout_ms: The amount of time (in milliseconds) to wait for
delivery reports to be returned from the broker during a `produce()` call.
Also, the time in ms to wait during a `stop()` call for all messages to be
marked as delivered. -1 indicates that these calls should block indefinitely.
Differs from `ack_timeout_ms` in that `ack_timeout_ms` is a value sent to the
broker to control the broker-side timeout, while `pending_timeout_ms` is used
internally by pykafka and not sent to the broker.
:type pending_timeout_ms: int
:param auto_start: Whether the producer should begin communicating
with kafka after __init__ is complete. If false, communication
can be started with `start()`.
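
For context, a minimal usage sketch of the new knob as it would be passed through the public API; the broker address and topic name below are placeholders, not part of this commit:

from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b"test.topic"]
# sync=True makes produce() block until a delivery report arrives;
# pending_timeout_ms bounds that wait instead of letting the call hang.
producer = topic.get_producer(sync=True, pending_timeout_ms=5000)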
@@ -190,6 +201,7 @@ def __init__(self,
self._delivery_reports = (_DeliveryReportQueue(self._cluster.handler)
if delivery_reports or self._synchronous
else _DeliveryReportNone())
self._pending_timeout_ms = pending_timeout_ms
self._auto_start = auto_start
self._serializer = serializer
self._running = False
@@ -311,6 +323,14 @@ def stop_owned_brokers():
for queue_reader in queue_readers:
queue_reader.join()

def _produce_has_timed_out(self, start_time):
"""Indicates whether enough time has passed since start_time for a `produce()`
call to time out
"""
if self._pending_timeout_ms == -1:
return False
return time.time() * 1000 - start_time > self._pending_timeout_ms

def produce(self, message, partition_key=None, timestamp=None):
"""Produce a message.
@@ -355,15 +375,20 @@ def produce(self, message, partition_key=None, timestamp=None):
self._produce(msg)

if self._synchronous:
while True:
req_time = time.time() * 1000
reported_msg = None
while not self._produce_has_timed_out(req_time):
self._raise_worker_exceptions()
self._cluster.handler.sleep()
try:
reported_msg, exc = self.get_delivery_report(timeout=1)
break
except Empty:
continue
assert reported_msg is msg
except ValueError:
raise ProduceFailureError("Error retrieving delivery report")
if reported_msg is not msg:
raise ProduceFailureError("Delivery report not received after timeout")
if exc is not None:
raise exc
self._raise_worker_exceptions()
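
With this change a synchronous produce() can raise ProduceFailureError instead of blocking forever, so callers may want to catch it. A minimal sketch, assuming `producer` was built with sync=True as in the earlier snippet and that retry-or-log is the caller's own policy:

from pykafka.exceptions import ProduceFailureError

try:
    producer.produce(b"payload")
except ProduceFailureError:
    # No delivery report arrived within pending_timeout_ms. The message may
    # or may not have reached the broker, so the caller decides whether to
    # retry, log, or propagate the failure.
    pass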
@@ -402,6 +427,12 @@ def _produce(self, message):
else:
success = False

def _mark_as_delivered(self, owned_broker, message_batch, req):
owned_broker.increment_messages_pending(-1 * len(message_batch))
req.delivered += len(message_batch)
for msg in message_batch:
self._delivery_reports.put(msg)

def _send_request(self, message_batch, owned_broker):
"""Send the produce request to the broker and handle the response.
@@ -431,16 +462,10 @@ def _get_partition_msgs(partition_id, req):
if p_id == partition_id
)

def mark_as_delivered(message_batch):
owned_broker.increment_messages_pending(-1 * len(message_batch))
req.delivered += len(message_batch)
for msg in message_batch:
self._delivery_reports.put(msg)

try:
response = owned_broker.broker.produce_messages(req)
if self._required_acks == 0: # and thus, `response` is None
mark_as_delivered(message_batch)
self._mark_as_delivered(owned_broker, message_batch, req)
return

# Kafka either atomically appends or rejects whole MessageSets, so
@@ -453,7 +478,7 @@ def mark_as_delivered(message_batch):
messages = req.msets[topic][partition].messages
for i, message in enumerate(messages):
message.offset = presponse.offset + i
mark_as_delivered(messages)
self._mark_as_delivered(owned_broker, messages, req)
continue # All's well
if presponse.err == NotLeaderForPartition.ERROR_CODE:
# Update cluster metadata to get new leader
@@ -501,13 +526,15 @@ def mark_as_delivered(message_batch):
self._produce(msg)

def _wait_all(self):
"""Block until all pending messages are sent
"""Block until all pending messages are sent or until pending_timeout_ms
"Pending" messages are those that have been used in calls to `produce`
and have not yet been dequeued and sent to the broker
and have not yet been acknowledged in a response from the broker
"""
log.info("Blocking until all messages are sent")
while any(q.message_is_pending() for q in itervalues(self._owned_brokers)):
log.info("Blocking until all messages are sent or until pending_timeout_ms")
start_time = time.time() * 1000
while any(q.message_is_pending() for q in itervalues(self._owned_brokers)) and \
not self._produce_has_timed_out(start_time):
self._cluster.handler.sleep(.3)
self._raise_worker_exceptions()
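
Since stop() blocks on _wait_all(), shutdown is now bounded by the same timeout. A sketch of the effect through the context-manager form, which calls stop() on exit (topic is assumed from the earlier sketch):

# On exiting the block, stop() runs _wait_all(); with pending_timeout_ms set,
# it returns once the timeout elapses even if some messages were never
# acknowledged, rather than blocking indefinitely.
with topic.get_producer(pending_timeout_ms=5000) as producer:
    producer.produce(b"payload")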

15 changes: 14 additions & 1 deletion tests/pykafka/test_producer.py
@@ -1,5 +1,6 @@
from __future__ import division

import mock
import os
import platform
import pytest
@@ -22,7 +23,8 @@

from pykafka import KafkaClient
from pykafka.common import OffsetType
from pykafka.exceptions import MessageSizeTooLarge, ProducerQueueFullError
from pykafka.exceptions import (MessageSizeTooLarge, ProducerQueueFullError,
ProduceFailureError)
from pykafka.partitioners import hashing_partitioner
from pykafka.protocol import Message
from pykafka.test.utils import get_cluster, stop_cluster, retry
@@ -102,6 +104,17 @@ def stub_send_request(self, message_batch, owned_broker):
with self.assertRaises(ZeroDivisionError):
p.produce(b"test")

def test_sync_produce_doesnt_hang(self):
producer = self._get_producer(sync=True)

def stub_mark(w, x, y, z):
return None
# simulate delivery report being lost
producer._mark_as_delivered = types.MethodType(stub_mark, producer)
producer._delivery_reports = mock.MagicMock()
with self.assertRaises(ProduceFailureError):
producer.produce(b"test")

def test_produce_hashing_partitioner(self):
# unique bytes, just to be absolutely sure we're not fetching data
# produced in a previous test
