Skip to content

Commit

Permalink
Use new celery.events.state module instead of celerymonitor.state
Browse files Browse the repository at this point in the history
  • Loading branch information
Ask Solem committed Jul 15, 2010
1 parent d146979 commit f9ab35a
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 121 deletions.
19 changes: 12 additions & 7 deletions celerymonitor/handlers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import anyjson
from tornado.web import RequestHandler, Application

from celery import states
from celery.task.control import revoke
from celerymonitor.state import monitor_state

from celery.events.state import state


def JSON(fun):
Expand Down Expand Up @@ -35,32 +37,35 @@ def get(self, *args, **kwargs):

@api_handler
def task_state(request, task_id):
return monitor_state.get_task_info(task_id)
task = state.tasks[task_id]
if task.state in states.EXCEPTION_STATES:
return task.info(extra=["traceback"])
return task.info()


@api_handler
def list_tasks(request):
return monitor_state.tasks_by_time()
return state.tasks_by_timestamp()


@api_handler
def list_tasks_by_name(request, name):
return monitor_state.tasks_by_type()[name]
return state.tasks_by_type(name)


@api_handler
def list_task_types(request):
return monitor_state.tasks_by_type()
return state.task_types()


@api_handler
def list_workers(request):
return monitor_state.list_workers()
return state.alive_workers()


@api_handler
def list_worker_tasks(request, hostname):
return monitor_state.list_worker_tasks(hostname)
return state.list_worker_tasks(hostname)


class RevokeTaskHandler(APIHandler):
Expand Down
16 changes: 4 additions & 12 deletions celerymonitor/listener.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,17 @@
from celery.events import EventReceiver
from celery.messaging import establish_connection

from celerymonitor.state import monitor_state
from celery.events.state import state


class EventListener(object):
"""Capture events sent by messages and store them in memory."""

def __init__(self, state=monitor_state):
def __init__(self, state=state):
self.state = state
self.connection = establish_connection()
self.receiver = EventReceiver(self.connection, handlers={
"task-received": state.receive_task_received,
"task-accepted": state.receive_task_event,
"task-succeeded": state.receive_task_event,
"task-retried": state.receive_task_event,
"task-failed": state.receive_task_event,
"worker-online": state.receive_worker_event,
"worker-offline": state.receive_worker_event,
"worker-heartbeat": state.receive_heartbeat,
})
self.receiver = EventReceiver(self.connection,
handlers={"*": self.state.event})

def start(self):
self.receiver.capture()
102 changes: 0 additions & 102 deletions celerymonitor/state.py

This file was deleted.

0 comments on commit f9ab35a

Please sign in to comment.