Skip to content
Browse files

Implements Task suspend/resume using SIGTSTP/SIGCONT

  • Loading branch information...
1 parent 99d8674 commit 03b7a417e86df4a2334382705f17a706163d9704 @ask committed Mar 26, 2011
Showing with 70 additions and 6 deletions.
  1. +9 −1 celery/concurrency/base.py
  2. +11 −0 celery/concurrency/processes/__init__.py
  3. +35 −5 celery/worker/control/builtins.py
  4. +15 −0 celery/worker/job.py
View
10 celery/concurrency/base.py
@@ -49,7 +49,15 @@ def on_terminate(self):
def terminate_job(self, pid):
raise NotImplementedError(
- "%s does not implement kill_job" % (self.__class__, ))
+ "%s does not implement terminate_job" % (self.__class__, ))
+
+ def suspend_job(self, pid):
+ raise NotImplementedError(
+ "%s does not implement suspend_job" % (self.__class__, ))
+
+ def resume_job(self, pid):
+ raise NotImplementedError(
+ "%s does not implement resume_job" % (self.__class__, ))
def stop(self):
self._state = self.CLOSE
View
11 celery/concurrency/processes/__init__.py
@@ -53,6 +53,17 @@ def on_terminate(self):
def terminate_job(self, pid, signal=None):
os.kill(pid, signal or _signal.SIGTERM)
+ def suspend_job(self, pid):
+ os.kill(pid, _signal.SIGTSTP)
+ self.grow(1)
+
+ def resume_job(self, pid):
+ os.kill(pid, _signal.SIGCONT)
+ try:
+ self.shrink(1)
+ except ValueError:
+ pass
+
def grow(self, n=1):
return self._pool.grow(n)
View
40 celery/worker/control/builtins.py
@@ -13,24 +13,54 @@
TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
+def _with_request(task_id, fun):
+ for request in state.active_requests:
+ if request.task_id == task_id:
+ return fun(request)
+ raise KeyError(task_id)
+
+
@Panel.register
def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
"""Revoke task by task id."""
revoked.add(task_id)
action = "revoked"
if terminate:
signum = get_signal(signal)
- for request in state.active_requests:
- if request.task_id == task_id:
- action = "terminated (%s)" % (signum, )
- request.terminate(panel.consumer.pool, signal=signum)
- break
+
+ def do_terminate(request):
+ action = "terminated (%s)" % (signum, )
+ request.terminate(panel.consumer.pool, signal=signum)
+
+ _with_request(task_id, do_terminate)
panel.logger.info("Task %s %s." % (task_id, action))
return {"ok": "task %s %s" % (task_id, action)}
@Panel.register
+def suspend(panel, task_id):
+
+ def do_suspend(request):
+ request.suspend(panel.consumer.pool)
+
+ _with_request(task_id, do_suspend)
+ panel.logger.info("Task %s suspended." % (task_id, ))
+ return True
+
+
+@Panel.register
+def resume(panel, task_id):
+
+ def do_resume(request):
+ request.resume(panel.consumer.pool)
+
+ _with_request(task_id, do_resume)
+ panel.logger.info("Task %s resumed." % (task_id, ))
+ return True
+
+
+@Panel.register
def enable_events(panel):
dispatcher = panel.consumer.event_dispatcher
if not dispatcher.enabled:
View
15 celery/worker/job.py
@@ -241,6 +241,7 @@ class TaskRequest(object):
_already_revoked = False
_terminate_on_ack = None
+ _suspend_on_ack = None
def __init__(self, task_name, task_id, args, kwargs,
on_ack=noop, retries=0, delivery_info=None, hostname=None,
@@ -401,6 +402,17 @@ def terminate(self, pool, signal=None):
else:
self._terminate_on_ack = (True, pool, signal)
+ def suspend(self, pool):
+ if self._suspend_on_ack is not None:
+ return
+ elif self.time_start:
+ return pool.suspend_job(self.worker_pid)
+ else:
+ self._suspend_on_ack = (True, pool)
+
+ def resume(self, pool):
+ return pool.resume_job(self.worker_pid)
+
def revoked(self):
"""If revoked, skip task and mark state."""
if self._already_revoked:
@@ -433,6 +445,9 @@ def on_accepted(self, pid, time_accepted):
if self._terminate_on_ack is not None:
_, pool, signal = self._terminate_on_ack
self.terminate(pool, signal)
+ if self._suspend_on_ack is not None:
+ _, pool = self._terminate_on_ack
+ self.suspend(pool)
def on_timeout(self, soft):
"""Handler called if the task times out."""

0 comments on commit 03b7a41

Please sign in to comment.
Something went wrong with that request. Please try again.