Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #799 from Parsely/enhancement/emptyqueue_noflush
Browse files Browse the repository at this point in the history
stop/start the linger/flush cycle based on queue emptiness
  • Loading branch information
Emmett J. Butler committed May 8, 2018
2 parents 7cf21a6 + 54558bd commit 11a3ad3
Showing 1 changed file with 60 additions and 1 deletion.
61 changes: 60 additions & 1 deletion pykafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ def __init__(self,
max_queued_messages=100000,
min_queued_messages=70000,
linger_ms=5 * 1000,
# XXX 0 default here mirrors previous behavior - should default have a
# nonzero wait to save CPU cycles?
queue_empty_timeout_ms=0,
block_on_queue_full=True,
max_request_size=1000012,
sync=False,
Expand Down Expand Up @@ -126,8 +129,23 @@ def __init__(self,
this setting. However, if we have fewer than this many messages
accumulated for this partition we will 'linger' for the specified
time waiting for more records to show up. linger_ms=0 indicates no
lingering.
lingering - messages are sent as fast as possible after they are
`produce()`d.
:type linger_ms: int
:param queue_empty_timeout_ms: The amount of time in milliseconds for which
the producer's worker threads should block when no messages are available
to flush to brokers. After each `linger_ms` interval, the worker thread
checks for the presence of at least one message in its queue. If there is
not at least one, it enters an "empty wait" period for
`queue_empty_timeout_ms` before starting a new `linger_ms` wait loop. If
`queue_empty_timeout_ms` is 0, this "empty wait" period is a noop, and
flushes will continue to be attempted at intervals of `linger_ms`, even
when the queue is empty. If `queue_empty_timeout_ms` is a positive integer,
this "empty wait" period will last for at most that long, but it ends earlier
if a message is `produce()`d before that time. If `queue_empty_timeout_ms` is
-1, the "empty wait" period can only be stopped (and the worker thread killed)
by a call to either `produce()` or `stop()` - it will never time out.
:type queue_empty_timeout_ms: int
:param block_on_queue_full: When the producer's message queue for a
broker contains max_queued_messages, we must either stop accepting
new messages (block) or throw an error. If True, this setting
Expand Down Expand Up @@ -193,6 +211,8 @@ def __init__(self,
self._min_queued_messages = max(1, valid_int(min_queued_messages)
if not sync else 1)
self._linger_ms = valid_int(linger_ms, allow_zero=True)
self._queue_empty_timeout_ms = valid_int(queue_empty_timeout_ms,
allow_zero=True, allow_negative=True)
self._block_on_queue_full = block_on_queue_full
self._max_request_size = valid_int(max_request_size)
self._synchronous = sync
Expand Down Expand Up @@ -570,6 +590,7 @@ def __init__(self, producer, broker, auto_start=True):
self.broker = broker
self.lock = self.producer._cluster.handler.RLock()
self.flush_ready = self.producer._cluster.handler.Event()
self.has_message = self.producer._cluster.handler.Event()
self.slot_available = self.producer._cluster.handler.Event()
self.queue = deque()
self.messages_pending = 0
Expand Down Expand Up @@ -603,6 +624,11 @@ def queue_reader():
queue_reader, name=name)

def stop(self):
# explicitly set has_message to kill any infinite waits triggered by
# queue_empty_timeout_ms=-1
with self.lock:
if not self.has_message.is_set():
self.has_message.set()
self.running = False

def increment_messages_pending(self, amnt):
Expand Down Expand Up @@ -630,6 +656,8 @@ def enqueue(self, message):
if len(self.queue) >= self.producer._min_queued_messages:
if not self.flush_ready.is_set():
self.flush_ready.set()
if not self.has_message.is_set():
self.has_message.set()

def flush(self, linger_ms, max_request_size, release_pending=False, wait=True):
"""Pop messages from the end of the queue
Expand All @@ -649,6 +677,17 @@ def flush(self, linger_ms, max_request_size, release_pending=False, wait=True):
attempt a flush immediately without waiting
:type wait: bool
"""
# Q: why not simply wait for flush_ready here? do we need a separate Event for
# has_message?
# A: If we're blocking on flush_ready with an empty queue, a single event arriving
# does not mean we're ready to flush. We could flush whenever the
# current linger_ms interval ends, but the better way is to pause the linger
# loop when the queue is empty, restarting it when a message is added. Doing
# this without two Events would require _wait_for_flush_ready to be modal,
# returning a value indicating whether a flush should happen or whether it
# returned for the sole purpose of unblocking the linger loop. This design is
# cleaner.
self._wait_for_has_message(self.producer._queue_empty_timeout_ms)
if wait:
self._wait_for_flush_ready(linger_ms)
with self.lock:
Expand Down Expand Up @@ -715,6 +754,26 @@ def _wait_for_flush_ready(self, linger_ms):
if linger_ms > 0:
self.flush_ready.wait((linger_ms / 1000))

def _wait_for_has_message(self, timeout_ms):
"""Block until the queue has at least one slot containing a message
:param timeout_ms: The amount of time in milliseconds to wait for a message
to be enqueued. -1 indicates infinite waiting; in this case a thread waiting
on this call can only be killed by a call to `stop()`.
:type timeout_ms: int
"""
if len(self.queue) == 0 and self.running:
with self.lock:
if len(self.queue) == 0 and self.running:
self.has_message.clear()
if timeout_ms != -1:
self.has_message.wait(timeout_ms / 1000)
else:
# infinite wait that doesn't break under gevent
while not self.has_message.is_set():
self.producer._cluster.handler.sleep()
self.has_message.wait(5)

def _wait_for_slot_available(self):
"""Block until the queue has at least one slot not containing a message"""
if len(self.queue) >= self.producer._max_queued_messages:
Expand Down

0 comments on commit 11a3ad3

Please sign in to comment.