Skip to content

Commit

Permalink
Using threading.Event to stop async producer thread
Browse files Browse the repository at this point in the history
  • Loading branch information
vshlapakov committed Feb 26, 2015
1 parent 1cce287 commit aedbbb3
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions kafka/producer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from Queue import Empty, Queue
from collections import defaultdict

from threading import Thread
from threading import Thread, Event

import six

Expand All @@ -27,15 +27,15 @@


def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout):
req_acks, ack_timeout, stop_event):
"""
Listen on the queue for a specified number of messages or till
a specified timeout and send them upstream to the brokers in one
request
"""
stop = False

while not stop:
while not stop_event.is_set():
timeout = batch_time
count = batch_size
send_at = time.time() + timeout
Expand All @@ -52,7 +52,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,

# Check if the controller has requested us to stop
if topic_partition == STOP_ASYNC_PRODUCER:
stop = True
stop_event.set()
break

# Adjust the timeout to match the remaining period
Expand Down Expand Up @@ -136,14 +136,16 @@ def __init__(self, client, async=False,
log.warning("Current implementation does not retry Failed messages")
log.warning("Use at your own risk! (or help improve with a PR!)")
self.queue = Queue() # Messages are sent through this queue
self.thread_stop_event = Event()
self.thread = Thread(target=_send_upstream,
args=(self.queue,
self.client.copy(),
self.codec,
batch_send_every_t,
batch_send_every_n,
self.req_acks,
self.ack_timeout))
self.ack_timeout,
self.thread_stop_event))

# Thread will die if main thread exits
self.thread.daemon = True
Expand Down Expand Up @@ -207,3 +209,6 @@ def stop(self, timeout=1):
if self.async:
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
self.thread.join(timeout)

if self.thread.is_alive():
self.thread_stop_event.set()

0 comments on commit aedbbb3

Please sign in to comment.