Browse files

CELERY_DEBUG_LEAK: Keep process task and RSS memory info over time

  • Loading branch information...
1 parent 8dc7051 commit ac960d4f3edb76f35b1b42d161a04e9de7f084d7 @ask committed Mar 13, 2011
Showing with 20 additions and 0 deletions.
  1. +2 −0 celery/app/defaults.py
  2. +5 −0 celery/worker/control/builtins.py
  3. +10 −0 celery/worker/job.py
  4. +3 −0 celery/worker/state.py
View
2 celery/app/defaults.py
@@ -64,6 +64,8 @@ def to_python(self, value):
"CACHE_BACKEND": Option(),
"CACHE_BACKEND_OPTIONS": Option({}, type="dict"),
"CREATE_MISSING_QUEUES": Option(True, type="bool"),
+ "DEBUG_LEAK": Option(False, type="bool"),
+ "DEBUG_LEAK_MAX_HISTORY": Option(1000, type="int"),
"DEFAULT_RATE_LIMIT": Option(type="string"),
"DISABLE_RATE_LIMITS": Option(False, type="bool"),
"DEFAULT_ROUTING_KEY": Option("celery"),
View
5 celery/worker/control/builtins.py
@@ -14,6 +14,11 @@
@Panel.register
+def process_history(panel):
+ return dict((k, list(v)) for k, v in state.process_history.iteritems())
+
+
+@Panel.register
def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
"""Revoke task by task id."""
revoked.add(task_id)
View
10 celery/worker/job.py
@@ -19,6 +19,7 @@
from celery.utils.compat import log_with_extra
from celery.utils.encoding import safe_repr, safe_str
from celery.utils.timeutils import maybe_iso8601
+from celery.utils.compat import defaultdict
from celery.worker import state
# pep8.py borks on a inline signature separator and
@@ -465,6 +466,15 @@ def on_success(self, ret_value):
"name": self.task_name,
"return_value": self.repr_result(ret_value),
"runtime": runtime})
+ if self.app.conf.CELERY_DEBUG_LEAK:
+ from psutil import Process
+ from collections import deque
+ p = Process(self.worker_pid)
+ history = state.process_history.get(str(self.worker_pid))
+ if history is None:
+ history = state.process_history[str(self.worker_pid)] = deque(
+ maxlen=self.app.conf.CELERY_DEBUG_LEAK_MAX_HISTORY)
+ history.append((self.task_name, p.get_memory_info().rss))
def on_retry(self, exc_info):
"""Handler called if the task should be retried."""
View
3 celery/worker/state.py
@@ -32,6 +32,9 @@
#: the list of currently revoked tasks. Persistent if statedb set.
revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
+#: history of process tasks and resident memory size.
+process_history = defaultdict(lambda: [])
+
def task_reserved(request):
"""Updates global state when a task has been reserved."""

0 comments on commit ac960d4

Please sign in to comment.