Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

New 'celery graph workers' command creates graph of the current cluster

  • Loading branch information...
commit c111e5f3dcc4bee5d378f77b598c2c8c5f0962e7 1 parent d7cf4cd
Ask Solem Hoel ask authored
Showing with 79 additions and 2 deletions.
  1. +79 −2 celery/bin/celery.py
81 celery/bin/celery.py
View
@@ -18,6 +18,7 @@
from itertools import imap
from pprint import pformat
+from celery.datastructures import DependencyGraph, GraphFormatter
from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
from celery.utils import term
from celery.utils import text
@@ -467,10 +468,13 @@ def run(self, task_id, *args, **kwargs):
@command
class graph(Command):
- args = '<TYPE> [arguments]\n..... bootsteps [worker] [consumer]'
+ args = """<TYPE> [arguments]
+ ..... bootsteps [worker] [consumer]
+ ..... workers [enumerate]
+ """
def run(self, what=None, *args, **kwargs):
- map = {'bootsteps': self.bootsteps}
+ map = {'bootsteps': self.bootsteps, 'workers': self.workers}
not what and self.exit_help('graph')
if what not in map:
raise Error('no graph {0} in {1}'.format(what, '|'.join(map)))
@@ -487,6 +491,79 @@ def bootsteps(self, *args, **kwargs):
graph = worker.consumer.namespace.graph
graph.to_dot(self.stdout)
+ def workers(self, *args, **kwargs):
+ args = set(arg.lower() for arg in args)
+ generic = 'generic' in args
+
+ def generic_label(node):
+ return '{0} ({1}://)'.format(type(node).__name__,
+ node._label.split('://')[0])
+
+ class Formatter(GraphFormatter):
+
+ def label(self, obj):
+ return obj and obj.label()
+
+ def node(self, obj):
+ return self.draw_node(
+ obj, dict(self.node_scheme, **obj.scheme),
+ )
+
+ def terminal_node(self, obj):
+ return self.draw_node(
+ obj, dict(self.term_scheme, **obj.scheme),
+ )
+
+ class Node(object):
+ force_label = None
+ scheme = {}
+
+ def __init__(self, label):
+ self._label = label
+
+ def label(self):
+ return self._label
+
+ class Worker(Node):
+ pass
+
+ class Backend(Node):
+ scheme = {'shape': 'folder', 'width': 2,
+ 'height': 2, 'color': 'black'}
+
+ def label(self):
+ return generic_label(self) if generic else self._label
+
+ class Broker(Node):
+ scheme = {'shape': 'circle', 'fillcolor': 'cadetblue3',
+ 'color': 'cadetblue4', 'height': 2}
+
+ def label(self):
+ return generic_label(self) if generic else self._label
+
+ backend = self.app.conf.CELERY_RESULT_BACKEND
+ pongs = self.app.control.ping()
+ workers = [pong.keys()[0] for pong in pongs]
+ wfmt = 'Worker{0}' if len(workers) < 6 else 'W{0}'
+ if 'enumerate' in args:
+ workers = [wfmt.format(i + 1) for i, _ in enumerate(workers)]
+
+ workers = [Worker(worker) for worker in workers]
+ broker = Broker(self.app.connection().as_uri())
+ backend = Backend(backend) if backend else None
+
+ graph = DependencyGraph(formatter=Formatter())
+ graph.add_arc(broker)
+ if backend:
+ graph.add_arc(backend)
+ for worker in workers:
+ graph.add_arc(worker)
+ graph.add_edge(worker, broker)
+ if backend:
+ graph.add_edge(worker, backend)
+
+ graph.to_dot(self.stdout)
+
class _RemoteControl(Command):
name = None
Please sign in to comment.
Something went wrong with that request. Please try again.