From d818189f26593f29f2392bad21d296c2d3b6ac38 Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Sat, 19 Sep 2015 15:34:55 +0100 Subject: [PATCH] producer: avoid messages stuck in queues on stop Because stop() called _wait_all() after stopping the OwnedBroker threads (and because the OwnedBroker threads stop as soon as they see self.running=False, without trying to check for remaining messages), a situation could occur where the OwnedBroker threads had already exited but a message was still sitting in the queue - resulting in _wait_all() and thus stop() looping forever. We ended up with occasional hangs in test_producer.test_async_produce_context. This is a minimal fix, that just swaps those steps. Signed-off-by: Yung-Chin Oei --- pykafka/producer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pykafka/producer.py b/pykafka/producer.py index 5cbe2bd13..ed7a74bf7 100644 --- a/pykafka/producer.py +++ b/pykafka/producer.py @@ -216,10 +216,10 @@ def _setup_owned_brokers(self): def stop(self): """Mark the producer as stopped""" self._running = False + self._wait_all() if self._owned_brokers is not None: for owned_broker in self._owned_brokers.values(): owned_broker.stop() - self._wait_all() def produce(self, message, partition_key=None): """Produce a message.