From 250778453fcece7316615bd62795e9293c62e525 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Tue, 24 Feb 2015 21:31:47 +0300 Subject: [PATCH 1/5] Trying to use threading for async batching --- kafka/producer/base.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 5b41bc9d2..a9288d926 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -3,12 +3,14 @@ import logging import time +from Queue import Queue try: from queue import Empty except ImportError: from Queue import Empty from collections import defaultdict -from multiprocessing import Queue, Process + +from threading import Thread import six @@ -140,7 +142,7 @@ def __init__(self, client, async=False, log.warning("Current implementation does not retry Failed messages") log.warning("Use at your own risk! (or help improve with a PR!)") self.queue = Queue() # Messages are sent through this queue - self.proc = Process(target=_send_upstream, + self.proc = Thread(target=_send_upstream, args=(self.queue, self.client.copy(), self.codec, @@ -211,4 +213,4 @@ def stop(self, timeout=1): self.proc.join(timeout) if self.proc.is_alive(): - self.proc.terminate() + raise SystemError("Can't join Kafka async thread") From bc0d5c1e275805d2b207900923ef5c87c45127d9 Mon Sep 17 00:00:00 2001 From: Alexey Borzenkov Date: Thu, 3 Apr 2014 21:39:19 +0400 Subject: [PATCH 2/5] Make KafkaConnection copies usable across threads --- kafka/conn.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kafka/conn.py b/kafka/conn.py index 30debec06..8142c4595 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -62,6 +62,9 @@ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): self.reinit() + def __getnewargs__(self): + return (self.host, self.port, self.timeout) + def __repr__(self): return "" % (self.host, self.port) @@ -167,6 +170,7 @@ def copy(self): c.port = copy.copy(self.port) c.timeout = copy.copy(self.timeout) c._sock = None + c._dirty = True return c def close(self): From 1cce28715798cc5dbe84c793e306cd15769afd7c Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Wed, 25 Feb 2015 10:45:47 +0300 Subject: [PATCH 3/5] Returned original tests, rm dirty flag, name fixes --- kafka/conn.py | 1 - kafka/producer/base.py | 41 ++++++++++++++++----------------------- test/test_conn.py | 44 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 25 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 8142c4595..ea55481d9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -170,7 +170,6 @@ def copy(self): c.port = copy.copy(self.port) c.timeout = copy.copy(self.timeout) c._sock = None - c._dirty = True return c def close(self): diff --git a/kafka/producer/base.py b/kafka/producer/base.py index a9288d926..bb7fd4388 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -3,11 +3,10 @@ import logging import time -from Queue import Queue try: - from queue import Empty + from queue import Empty, Queue except ImportError: - from Queue import Empty + from Queue import Empty, Queue from collections import defaultdict from threading import Thread @@ -33,13 +32,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, Listen on the queue for a specified number of messages or till a specified timeout and send them upstream to the brokers in one request - - NOTE: Ideally, this should have been a method inside the Producer - class. However, multiprocessing module has issues in windows. The - functionality breaks unless this function is kept outside of a class """ stop = False - client.reinit() while not stop: timeout = batch_time @@ -142,18 +136,20 @@ def __init__(self, client, async=False, log.warning("Current implementation does not retry Failed messages") log.warning("Use at your own risk! (or help improve with a PR!)") self.queue = Queue() # Messages are sent through this queue - self.proc = Thread(target=_send_upstream, - args=(self.queue, - self.client.copy(), - self.codec, - batch_send_every_t, - batch_send_every_n, - self.req_acks, - self.ack_timeout)) - - # Process will die if main thread exits - self.proc.daemon = True - self.proc.start() + self.thread = Thread(target=_send_upstream, + args=(self.queue, + self.client.copy(), + self.codec, + batch_send_every_t, + batch_send_every_n, + self.req_acks, + self.ack_timeout)) + + # Thread will die if main thread exits + self.thread.daemon = True + self.thread.start() + + def send_messages(self, topic, partition, *msg): """ @@ -210,7 +206,4 @@ def stop(self, timeout=1): """ if self.async: self.queue.put((STOP_ASYNC_PRODUCER, None, None)) - self.proc.join(timeout) - - if self.proc.is_alive(): - raise SystemError("Can't join Kafka async thread") + self.thread.join(timeout) diff --git a/test/test_conn.py b/test/test_conn.py index 2c8f3b290..c4f219bab 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -1,5 +1,6 @@ import socket import struct +from threading import Thread import mock from . import unittest @@ -162,3 +163,46 @@ def test_close__object_is_reusable(self): self.conn.send(self.config['request_id'], self.config['payload']) self.assertEqual(self.MockCreateConn.call_count, 1) self.conn._sock.sendall.assert_called_with(self.config['payload']) + + +class TestKafkaConnection(unittest.TestCase): + + @mock.patch('socket.create_connection') + def test_copy(self, socket): + """KafkaConnection copies work as expected""" + + conn = KafkaConnection('kafka', 9092) + self.assertEqual(socket.call_count, 1) + + copy = conn.copy() + self.assertEqual(socket.call_count, 1) + self.assertEqual(copy.host, 'kafka') + self.assertEqual(copy.port, 9092) + self.assertEqual(copy._sock, None) + + copy.reinit() + self.assertEqual(socket.call_count, 2) + self.assertNotEqual(copy._sock, None) + + @mock.patch('socket.create_connection') + def test_copy_thread(self, socket): + """KafkaConnection copies work in other threads""" + + err = [] + copy = KafkaConnection('kafka', 9092).copy() + + def thread_func(err, copy): + try: + self.assertEqual(copy.host, 'kafka') + self.assertEqual(copy.port, 9092) + self.assertNotEqual(copy._sock, None) + except Exception as e: + err.append(e) + else: + err.append(None) + thread = Thread(target=thread_func, args=(err, copy)) + thread.start() + thread.join() + + self.assertEqual(err, [None]) + self.assertEqual(socket.call_count, 2) From aedbbb39be4d207ba9eaf99811980276d44f39a5 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Thu, 26 Feb 2015 13:02:48 +0300 Subject: [PATCH 4/5] Using threading.Event to stop async producer thread --- kafka/producer/base.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index bb7fd4388..cf5abac35 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -9,7 +9,7 @@ from Queue import Empty, Queue from collections import defaultdict -from threading import Thread +from threading import Thread, Event import six @@ -27,7 +27,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, - req_acks, ack_timeout): + req_acks, ack_timeout, stop_event): """ Listen on the queue for a specified number of messages or till a specified timeout and send them upstream to the brokers in one @@ -35,7 +35,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, """ stop = False - while not stop: + while not stop_event.is_set(): timeout = batch_time count = batch_size send_at = time.time() + timeout @@ -52,7 +52,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # Check if the controller has requested us to stop if topic_partition == STOP_ASYNC_PRODUCER: - stop = True + stop_event.set() break # Adjust the timeout to match the remaining period @@ -136,6 +136,7 @@ def __init__(self, client, async=False, log.warning("Current implementation does not retry Failed messages") log.warning("Use at your own risk! (or help improve with a PR!)") self.queue = Queue() # Messages are sent through this queue + self.thread_stop_event = Event() self.thread = Thread(target=_send_upstream, args=(self.queue, self.client.copy(), @@ -143,7 +144,8 @@ def __init__(self, client, async=False, batch_send_every_t, batch_send_every_n, self.req_acks, - self.ack_timeout)) + self.ack_timeout, + self.thread_stop_event)) # Thread will die if main thread exits self.thread.daemon = True @@ -207,3 +209,6 @@ def stop(self, timeout=1): if self.async: self.queue.put((STOP_ASYNC_PRODUCER, None, None)) self.thread.join(timeout) + + if self.thread.is_alive(): + self.thread_stop_event.set() From 5137163fa44b4a6a8a315c30f959e816f657e921 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Thu, 26 Feb 2015 13:33:40 +0300 Subject: [PATCH 5/5] Async producer stop() fix --- kafka/producer/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index cf5abac35..357bccd74 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -210,5 +210,5 @@ def stop(self, timeout=1): self.queue.put((STOP_ASYNC_PRODUCER, None, None)) self.thread.join(timeout) - if self.thread.is_alive(): - self.thread_stop_event.set() + if self.thread.is_alive(): + self.thread_stop_event.set()