From 9116d16c82203f6b585b711fc47a8c7b8d693b64 Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Mon, 7 Sep 2015 15:46:32 +0100 Subject: [PATCH 1/3] RequestHandler: replace atexit call by finaliser This fixes a problem where, because RequestHandler instances registered themselves with atexit, they would always live until program exit. Having replaced the atexit call with the more usual __del__ finaliser, I found that this __del__ would never run: the queue-handling thread instantiated by RequestHandler holds references to self. Having also replaced that ref by a weakref, we did get the finaliser to run. Finally, to make sure we don't block on Queue.get() forever (in which case the thread still doesn't die), this also adds a timeout to that. Signed-off-by: Yung-Chin Oei --- pykafka/handlers.py | 42 +++++++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/pykafka/handlers.py b/pykafka/handlers.py index 90db4ef35..3a60aa1d5 100644 --- a/pykafka/handlers.py +++ b/pykafka/handlers.py @@ -17,11 +17,13 @@ limitations under the License. """ __all__ = ["ResponseFuture", "Handler", "ThreadingHandler", "RequestHandler"] -import atexit + +from collections import namedtuple import functools import threading +import weakref + from .utils.compat import Queue, Empty -from collections import namedtuple class ResponseFuture(object): @@ -96,7 +98,9 @@ def __init__(self, handler, connection): self.connection = connection self._requests = handler.Queue() self.ending = handler.Event() - atexit.register(self.stop) + + def __del__(self): + self.stop() def request(self, request, has_response=True): """Construct a new request @@ -124,17 +128,25 @@ def stop(self): def _start_thread(self): """Run the request processor""" + self = weakref.proxy(self) def worker(): - while not self.ending.is_set(): - task = self._requests.get() - try: - self.connection.request(task.request) - if task.future: - res = self.connection.response() - task.future.set_response(res) - except Exception as e: - if task.future: - task.future.set_error(e) - finally: - self._requests.task_done() + try: + while not self.ending.is_set(): + try: + # set a timeout so we check self.ending every so often + task = self._requests.get(timeout=1) + except Empty: + continue + try: + self.connection.request(task.request) + if task.future: + res = self.connection.response() + task.future.set_response(res) + except Exception as e: + if task.future: + task.future.set_error(e) + finally: + self._requests.task_done() + except ReferenceError: # dead weakref + pass return self.handler.spawn(worker) From a5427770de96082bb0df3236e6c97777625dc5a7 Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Thu, 10 Sep 2015 05:59:02 +0100 Subject: [PATCH 2/3] handlers: add some logging Small addition to the parent commit. I figured we might also benefit from a log line in RequestHandler.stop, since it took a while to realise that might hang (see #218). Signed-off-by: Yung-Chin Oei --- pykafka/handlers.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pykafka/handlers.py b/pykafka/handlers.py index 3a60aa1d5..ceeec8170 100644 --- a/pykafka/handlers.py +++ b/pykafka/handlers.py @@ -20,11 +20,14 @@ from collections import namedtuple import functools +import logging import threading import weakref from .utils.compat import Queue, Empty +log = logging.getLogger(__name__) + class ResponseFuture(object): """A response which may have a value at some point.""" @@ -123,6 +126,7 @@ def start(self): def stop(self): """Stop the request processor.""" + log.info("RequestHandler.stop: about to flush requests queue") self._requests.join() self.ending.set() @@ -149,4 +153,5 @@ def worker(): self._requests.task_done() except ReferenceError: # dead weakref pass + log.info("RequestHandler worker: exiting cleanly") return self.handler.spawn(worker) From 5ffb74e93fd0d937e7cde31b5e8744efb0a32c85 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Sat, 19 Sep 2015 10:59:02 -0400 Subject: [PATCH 3/3] add logging to dead weakref case --- pykafka/handlers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pykafka/handlers.py b/pykafka/handlers.py index ceeec8170..eb62b4f1d 100644 --- a/pykafka/handlers.py +++ b/pykafka/handlers.py @@ -133,6 +133,7 @@ def stop(self): def _start_thread(self): """Run the request processor""" self = weakref.proxy(self) + def worker(): try: while not self.ending.is_set(): @@ -152,6 +153,6 @@ def worker(): finally: self._requests.task_done() except ReferenceError: # dead weakref - pass + log.warning("ReferenceError in handler - dead weakref") log.info("RequestHandler worker: exiting cleanly") return self.handler.spawn(worker)