Skip to content
This repository was archived by the owner on Mar 24, 2021. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 34 additions & 27 deletions pykafka/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import functools
import logging
import threading
import weakref

from .utils.compat import Queue, Empty

Expand Down Expand Up @@ -91,16 +90,20 @@ class RequestHandler(object):
"""Uses a Handler instance to dispatch requests."""

Task = namedtuple('Task', ['request', 'future'])
Shared = namedtuple('Shared', ['connection', 'requests', 'ending'])

def __init__(self, handler, connection):
"""
:type handler: :class:`pykafka.handlers.Handler`
:type connection: :class:`pykafka.connection.BrokerConnection`
"""
self.handler = handler
self.connection = connection
self._requests = handler.Queue()
self.ending = handler.Event()

# NB self.shared is referenced directly by _start_thread(), so be careful not to
# rebind it
self.shared = self.Shared(connection=connection,
requests=handler.Queue(),
ending=handler.Event())

def __del__(self):
self.stop()
Expand All @@ -117,7 +120,7 @@ def request(self, request, has_response=True):
future = ResponseFuture(self.handler)

task = self.Task(request, future)
self._requests.put(task)
self.shared.requests.put(task)
return future

def start(self):
Expand All @@ -126,33 +129,37 @@ def start(self):

def stop(self):
"""Stop the request processor."""
shared = self.shared
self.shared = None
log.info("RequestHandler.stop: about to flush requests queue")
self._requests.join()
self.ending.set()
shared.requests.join()
shared.ending.set()

def _start_thread(self):
"""Run the request processor"""
self = weakref.proxy(self)
# We pass a direct reference to `shared` into the worker, to avoid
# that thread holding a ref to `self`, which would prevent GC. A
# previous version of this used a weakref to `self`, but would
# potentially abort the thread before the requests queue was empty
shared = self.shared

def worker():
try:
while not self.ending.is_set():
try:
# set a timeout so we check self.ending every so often
task = self._requests.get(timeout=1)
except Empty:
continue
try:
self.connection.request(task.request)
if task.future:
res = self.connection.response()
task.future.set_response(res)
except Exception as e:
if task.future:
task.future.set_error(e)
finally:
self._requests.task_done()
except ReferenceError: # dead weakref
log.info("ReferenceError in handler - dead weakref")
while not shared.ending.is_set():
try:
# set a timeout so we check `ending` every so often
task = shared.requests.get(timeout=1)
except Empty:
continue
try:
shared.connection.request(task.request)
if task.future:
res = shared.connection.response()
task.future.set_response(res)
except Exception as e:
if task.future:
task.future.set_error(e)
finally:
shared.requests.task_done()
log.info("RequestHandler worker: exiting cleanly")

return self.handler.spawn(worker)