Skip to content

Commit

Permalink
worker acks and quits on sigquit
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Jul 25, 2013
1 parent 864f6e2 commit ea3f532
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 3 deletions.
12 changes: 12 additions & 0 deletions kuyruk/test/test_kuyruk.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,15 @@ def test_max_run_time(self):
tasks.sleeping_task(2)
with run_kuyruk() as master:
master.expect('raise Timeout')

def test_worker_sigquit(self):
"""Ack current message and exit"""
tasks.loop_forever()
with run_kuyruk(terminate=False) as worker:
worker.expect('looping forever')
pid = get_pid('kuyruk: worker')
os.kill(pid, signal.SIGQUIT)
worker.expect('Acking current task')
worker.expect('Exiting')
worker.expect_exit(1)
assert is_empty('kuyruk'), worker.get_output()
34 changes: 31 additions & 3 deletions kuyruk/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from time import time, sleep
from datetime import datetime
from functools import wraps
from contextlib import contextmanager

from setproctitle import setproctitle

Expand Down Expand Up @@ -65,6 +66,7 @@ def __init__(self, kuyruk, queue_name):
queue_name = queue_name.lstrip('@')
self.queue = Queue(queue_name, self.channel, local=is_local)
self.consumer = Consumer(self.queue)
self.current_message = None
self.current_task = None
self.current_args = None
self.current_kwargs = None
Expand Down Expand Up @@ -120,9 +122,20 @@ def consume_messages(self):
"""
with self.consumer.consume() as messages:
for message in messages:
self.process_message(message)
self.queue.tx_commit()
logger.debug("Committed transaction")
with self._set_current_message(message):
self.process_message(message)
self.queue.tx_commit()
logger.debug("Committed transaction")

@contextmanager
def _set_current_message(self, message):
"""Save current message processing so we can send ack before exiting
when SIGQUIT is received."""
self.current_message = message
try:
yield message
finally:
self.current_message = None

def start_daemon_threads(self):
"""Start the function as threads listed in self.daemon_thread."""
Expand Down Expand Up @@ -312,12 +325,27 @@ def shutdown_timer(self):
def register_signals(self):
super(Worker, self).register_signals()
signal.signal(signal.SIGTERM, self.handle_sigterm)
signal.signal(signal.SIGQUIT, self.handle_sigquit)

def handle_sigterm(self, signum, frame):
"""Initiates a warm shutdown."""
logger.warning("Catched SIGTERM")
self.warm_shutdown()

def handle_sigquit(self, signum, frame):
"""Send ACK for the current task and exit."""
logger.warning("Catched SIGQUIT")
if self.current_message:
try:
logger.warning("Acking current task...")
self.current_message.ack()
self.queue.tx_commit()
except Exception:
logger.critical("Cannot send ACK for the current task.")
traceback.print_exc()
logger.warning("Exiting...")
os._exit(1)

def warm_shutdown(self, sigint=False):
"""Shutdown gracefully."""
super(Worker, self).warm_shutdown(sigint)
Expand Down

0 comments on commit ea3f532

Please sign in to comment.