Permalink
Browse files

let workers exit gracefully

Conflicts:
	worker/worker.py
  • Loading branch information...
1 parent f342567 commit caa2732225f918781c55b761870e188505a2bc71 @bkw bkw committed Feb 6, 2014
Showing with 14 additions and 2 deletions.
  1. +1 −1 worker/start_workers.py
  2. +13 −1 worker/worker.py
View
@@ -25,13 +25,13 @@ def _get_parent_logger():
def kill_time(signal, frame):
- log_listener.end()
print "dying ..."
for process in processes:
process.terminate()
print "rose"
for process in processes:
process.join()
+ log_listener.end()
print "bud"
sys.exit(0)
View
@@ -20,6 +20,7 @@
import sys
import time
import anyjson
+import signal
import kombu
import kombu.mixins
@@ -42,6 +43,7 @@
from kombu.serialization import BytesIO, register
stacklog.set_default_logger_name('worker')
+shutdown_soon = False
def _get_child_logger():
@@ -62,6 +64,7 @@ def __init__(self, name, connection, deployment, durable, queue_arguments,
self.total_processed = 0
self.topics = topics
self.exchange = exchange
+ signal.signal(signal.SIGTERM, self._shutdown)
register('bufferjson', self.loads, anyjson.dumps,
content_type='application/json',
@@ -144,9 +147,14 @@ def on_nova(self, body, message):
(e, json.loads(str(message.body))))
raise
+ def _shutdown(self, signal, stackframe = False):
+ global shutdown_soon
+ self.should_stop = True
+ shutdown_soon = True
+
def continue_running():
- return True
+ return not shutdown_soon
def exit_or_sleep(exit=False):
@@ -206,6 +214,10 @@ def run(deployment_config, deployment_id, exchange):
"exception=%s. Retrying in 5s"
logger.exception(msg % (name, exchange, e))
exit_or_sleep(exit_on_exception)
+ logger.info("Worker exiting.")
+
+signal.signal(signal.SIGINT, signal.SIG_IGN)
+signal.signal(signal.SIGTERM, signal.SIG_IGN)
POST_PROCESS_METHODS = {
'RawData': views.post_process_rawdata,

0 comments on commit caa2732

Please sign in to comment.