Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

file 90 lines (68 sloc) 2.751 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
# -*- coding: utf-8 -*-
"""
celery.events.dumper
~~~~~~~~~~~~~~~~~~~~

This is a simple program that dumps events to the console
as they happen. Think of it like a `tcpdump` for Celery events.

:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import

import sys

from datetime import datetime

from celery.app import app_or_default
from celery.datastructures import LRUCache


# Maps a task uuid to its formatted "name(uuid) args=... kwargs=..." text.
# Dumper.on_event stores an entry when a task-received/task-sent event
# arrives and looks it up for later events carrying the same uuid.
# The cache is created with limit=0xFFF (4095); presumably the least
# recently used entries are dropped beyond that -- confirm against LRUCache.
TASK_NAMES = LRUCache(limit=0xFFF)

# Display labels for event types whose raw name does not read well once
# the dashes are simply replaced by spaces.
HUMAN_TYPES = {
    "worker-offline": "shutdown",
    "worker-online": "started",
    "worker-heartbeat": "heartbeat",
}


def humanize_type(type):
    """Return a display label for the event type name `type`.

    The name is lowercased first.  If ``HUMAN_TYPES`` has an entry for it
    that entry is returned, otherwise the lowercased name is returned with
    every ``-`` replaced by a space.

    """
    lowered = type.lower()
    if lowered in HUMAN_TYPES:
        return HUMAN_TYPES[lowered]
    return lowered.replace("-", " ")


def say(msg, out=None):
    """Write `msg` plus a trailing newline to `out` and flush it.

    :param msg: text to write (without the newline).
    :param out: object with a ``write`` method.  When omitted (or ``None``)
        the current ``sys.stdout`` is used.

    """
    # Look up sys.stdout at call time.  A ``out=sys.stdout`` default is
    # evaluated once when the function is defined, so it would keep writing
    # to the old stream after sys.stdout has been replaced or redirected.
    if out is None:
        out = sys.stdout
    out.write(msg + "\n")
    # Flush so that each line shows up immediately when the output is
    # piped; write-only objects without a flush method are still accepted.
    flush = getattr(out, "flush", None)
    if flush is not None:
        flush()


class Dumper(object):
    """Formats events as single lines of text and writes them to a stream.

    :param out: object the lines are written to.  When omitted (or
        ``None``) the ``sys.stdout`` in effect when the dumper is created
        is used.

    """

    def __init__(self, out=None):
        # Resolve the default here instead of in the signature: a
        # ``out=sys.stdout`` default is evaluated once at import time and
        # would ignore any later replacement of sys.stdout.
        self.out = sys.stdout if out is None else out

    def say(self, msg):
        """Write `msg` as one line to this dumper's output stream."""
        say(msg, out=self.out)

    def on_event(self, event):
        """Print a one-line description of `event`.

        `event` is consumed: ``timestamp``, ``type`` and ``hostname`` are
        popped from it, and for types starting with ``task-`` so is
        ``uuid`` (plus ``name``, ``args`` and ``kwargs`` for
        ``task-received`` and ``task-sent``).  Whatever is left is printed
        as ``key=value`` pairs sorted by key.

        :raises KeyError: if one of the keys listed above is missing.

        """
        timestamp = datetime.utcfromtimestamp(event.pop("timestamp"))
        event_type = event.pop("type").lower()
        hostname = event.pop("hostname")
        if event_type.startswith("task-"):
            uuid = event.pop("uuid")
            if event_type in ("task-received", "task-sent"):
                # Only these events carry the name and arguments, so
                # remember the description for later events of this task.
                task = TASK_NAMES[uuid] = "%s(%s) args=%s kwargs=%s" % (
                        event.pop("name"), uuid,
                        event.pop("args"),
                        event.pop("kwargs"))
            else:
                task = TASK_NAMES.get(uuid, "")
            return self.format_task_event(hostname, timestamp,
                                          event_type, task, event)
        fields = self._format_fields(event)
        sep = ":" if fields else ""
        self.say("%s [%s] %s%s %s" % (hostname, timestamp,
                                      humanize_type(event_type), sep, fields))

    def format_task_event(self, hostname, timestamp, type, task, event):
        """Print the line for a task event.

        :param hostname: value shown at the start of the line.
        :param timestamp: value shown in square brackets.
        :param type: event type name, passed through ``humanize_type``.
        :param task: task description text, may be empty.
        :param event: mapping of the remaining fields to print.

        """
        fields = self._format_fields(event)
        sep = ":" if fields else ""
        self.say("%s [%s] %s%s %s %s" % (hostname, timestamp,
                    humanize_type(type), sep, task, fields))

    def _format_fields(self, event):
        """Return the items of `event` as ``key=value`` text sorted by key."""
        return ", ".join("%s=%s" % (key, event[key])
                         for key in sorted(event))


def evdump(app=None, out=sys.stdout):
    """Capture events from the broker and dump them to `out`.

    :param app: application to use, resolved through ``app_or_default``.
    :param out: object the event lines are written to.

    ``KeyboardInterrupt`` and ``SystemExit`` end the capture quietly.
    Any other exception propagates to the caller once the broker
    connection has been closed.

    """
    app = app_or_default(app)
    dumper = Dumper(out=out)
    dumper.say("-> evdump: starting capture...")
    conn = app.broker_connection()
    try:
        recv = app.events.Receiver(conn, handlers={"*": dumper.on_event})
        recv.capture()
    except (KeyboardInterrupt, SystemExit):
        # Interrupting the process is the normal way to stop the dumper.
        pass
    finally:
        # Close on every exit path: previously the connection was only
        # closed on interrupt and leaked when capture failed or returned.
        if conn:
            conn.close()

# When the module is run as a script, start dumping events using the
# default app and output stream.
if __name__ == "__main__": # pragma: no cover
    evdump()
Something went wrong with that request. Please try again.