Skip to content

Commit

Permalink
Refactor Agent
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Oct 1, 2012
1 parent e84ab03 commit ef86abe
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 169 deletions.
10 changes: 2 additions & 8 deletions celery/app/control.py
Expand Up @@ -76,13 +76,14 @@ def registered(self, *taskinfoitems):

def ping(self):
return self._request('ping')

def active_queues(self):
return self._request('active_queues')

def conf(self):
return self._request('dump_conf')


class Control(object):
Mailbox = Mailbox

Expand Down Expand Up @@ -139,13 +140,6 @@ def ping(self, destination=None, timeout=1, **kwargs):
return self.broadcast('ping', reply=True, destination=destination,
timeout=timeout, **kwargs)

def start_actor(self, actor_name, actor_id = None,
destination = None, timeout = 1, **kwargs):
return self.broadcast('start_actor', name = actor_name,
actor_id = actor_id,
reply=True, destination=destination,
timeout=timeout, **kwargs)

def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
"""Tell all (or specific) workers to set a new rate limit
for task by type.
Expand Down
3 changes: 1 addition & 2 deletions celery/app/defaults.py
Expand Up @@ -150,8 +150,7 @@ def __repr__(self):
'WORKER_DIRECT': Option(False, type='bool'),
},
'CELERYD': {
'ACTORS_MANAGER': Option(
'celery.worker.actorsbootstrap:ActorsManager'),
'AGENT': Option('celery.worker.actors:Agent'),
'AUTOSCALER': Option('celery.worker.autoscale:Autoscaler'),
'AUTORELOADER': Option('celery.worker.autoreload:Autoreloader'),
'BOOT_STEPS': Option((), type='tuple'),
Expand Down
216 changes: 95 additions & 121 deletions celery/worker/actors.py
@@ -1,158 +1,132 @@
from __future__ import absolute_import

import bootsteps
from celery.bootsteps import StartStopStep
from celery.utils.imports import instantiate, qualname
from celery.utils.log import get_logger
from celery.worker.consumer import Connection
from cell import Actor
from celery.utils.imports import instantiate
from kombu.common import ignore_errors
from kombu.utils import uuid
from .bootsteps import StartStopComponent
from celery.utils.log import get_logger
from celery.worker.consumer import debug

logger = get_logger(__name__)
info, warn, error, crit = (logger.info, logger.warn,
logger.error, logger.critical)


class WorkerComponent(StartStopComponent):
"""This component starts an ActorManager instance if actors support is enabled."""
name = 'worker.actors-manager'
consumer = None

def ActorsManager(self, w):
return (w.actors_manager_cls or ActorsManager)

def include_if(self, w):
#return w.actors_enabled
return True

def init(self, w, **kwargs):
w.actors_manager = None

def create(self, w):
debug('create ActorsManager')
actor = w.actors_manager = self.instantiate(self.ActorsManager(w),
app = w.app)
actor.app = w.app
w.on_consumer_ready_callbacks.append(actor.on_consumer_ready)
return actor
debug, warn, error = logger.debug, logger.warn, logger.error


class Bootstep(StartStopStep):
requires = (Connection, )

def __init__(self, c, **kwargs):
c.agent = None

def create(self, c):
agent = c.app.conf.CELERYD_AGENT
agent = c.agent = self.instantiate(c.app.conf.CELERYD_AGENT,
connection=c.connection, app=c.app,
)
return agent


class ActorProxy(object):
"""
A class that represents an actor started remotely
"""
"""A class that represents an actor started remotely."""

def __init__(self, local_actor, actor_id, async_start_result):
self.__subject = local_actor.__copy__()
self.__subject.id = actor_id
self.async_start_result = async_start_result

def __getattr__(self, name):
return getattr(self.__subject, name)

def wait_to_start(self):
self.async_start_result._result


class ActorsManager(Actor):
connection = None

class Agent(Actor):
types = ('round-robin', 'scatter')
actor_registry = {}
actors_consumer = None
connection = None
app = None

def __init__(self, app=None, *args, **kwargs):
self.app = app
super(ActorsManager, self).__init__(*args, **kwargs)

def contribute_to_state(self, state):
state.actor_registry = self.actor_registry
state.connection = self.connection
conninfo = self.app.connection()
state.connection_errors = conninfo.connection_errors
state.channel_errors = conninfo.channel_errors
state.reset()
return Actor.contribute_to_state(self, state)

class state(Actor.state):

class state(object):

def _start_actor_consumer(self, actor):
consumer = actor.Consumer(self.connection.channel())
consumer.consume()
self.actor_registry[actor.id] = (actor, consumer)
def add_actor(self, name, id = None):
"""Add actor to the actor registry and start the actor's main method"""
actor.consumer = actor.Consumer(self.connection.channel())
actor.consumer.consume()
self.actor.registry[actor.id] = actor

def add_actor(self, name, id=None):
"""Add actor to the registry and start the actor's main method."""
try:
actor = instantiate(name, connection = self.connection,
id = id)
actor = instantiate(name, connection=self.connection, id=id)
if actor.id in self.actor.registry:
warn('Actor id %r already exists', actor.id)
self._start_actor_consumer(actor)
if actor.id in self.actor_registry:
warn('Actor with the same id already exists')
debug('Register actor in the actor registry: %s' % name)
debug('Actor registered: %s', name)
return actor.id
except Exception as exc:
error('Start actor error: %r', exc, exc_info=True)
error('Cannot start actor: %r', exc, exc_info=True)

def stop_all(self):
for _, (_, consumer) in self.actor_registry.items():
self.maybe_conn_error(consumer.cancel)
self.actor_registry.clear()
self.actor.shutdown()

def reset(self):
debug('Resetting active actors')
print self.actor_registry.items()
for id, (actor, consumer) in self.actor_registry.items():
self.maybe_conn_error(consumer.cancel)
# TODO:setting the connection here seems wrong ?
for actor in self.actor.registry.itervalues():
if actor.consumer:
ignore_errors(self.connection, actor.consumer.cancel)
actor.connection = self.connection
self._start_actor_consumer(actor)

def stop_actor(self, actor_id):
if actor_id in self.actor_registry:
(_, consumer) = self.actor_registry.pop(actor_id)
self.maybe_conn_error(consumer.cancel)

def maybe_conn_error(self, fun):
"""Applies function but ignores any connection or channel
errors raised."""

def stop_actor(self, id):
try:
fun()
except (AttributeError, ) + \
self.connection_errors + \
self.channel_errors:
actor = self.actor.registry.pop(id)
except KeyError:
pass

else:
if actor.consumer and actor.consumer.channel:
ignore_errors(self.connection, consumer.cancel)

def __init__(self, connection, app=None, *args, **kwargs):
self.connection = connection
self.app = app
self.registry = {}
super(ActorManager, self).__init__(*args, **kwargs)

def contribute_to_state(self, state):
state.connection = self.connection
conninfo = self.app.connection()
state.connection_errors = conninfo.connection_errors
state.channel_errors = conninfo.channel_errors
state.reset()
return super(ActorsManager, self).contribute_to_state(state)

def add_actor(self, actor, nowait=False):
name = "%s.%s"%(actor.__class__.__module__,
actor.__class__.__name__)
name = qualname(actor)
actor_id = uuid()
res = self.call('add_actor', {'name': name, 'id' : actor_id},
type = 'round-robin', nowait = 'True')
res = self.call('add_actor', {'name': name, 'id': actor_id},
type='round-robin', nowait=True)
actor_proxy = ActorProxy(actor, actor_id, res)
return actor_proxy
def stop_actor_by_id(self, actor_id, nowait=False):
return self.scatter('stop_actor', {'actor_id' : actor_id},
nowait=nowait)

def stop_actor_by_id(self, actor_id, nowait=False):
return self.scatter('stop_actor', {'actor_id': actor_id},
nowait=nowait)

def start(self):
debug('Starting ActorsManager')

debug('Starting Agent')

def _shutdown(self, cancel=True, close=True, clear=True):
try:
for actor in self.registry.itervalues():
if actor and actor.consumer:
if cancel:
ignore_errors(self.connection, actor.consumer.cancel)
if close and actor.consumer.channel:
ignore_errors(self.connection,
actor.consumer.channel.close)
finally:
if clear:
self.registry.clear()

def stop(self):
if self.actors_consumer:
self.actors_consumer.cancel()

def on_start(self, connection):
self.connection = connection
actor_consumer = self.Consumer(self.connection.channel())
debug('ActorsManager start consuming blabla')
self.actor_consumer = actor_consumer
self.actor_consumer.consume()
self.contribute_to_state(self.state)

def on_consumer_ready(self, consumer):
debug('ActorsManager in On consumer ready')
if consumer.connection:
raise Exception('Consumer is ready.')
consumer.on_reset_connection.append(self.on_start)
consumer.on_close_connection.append(self.stop)

self._shutdown(clear=False)

def shutdown(self):
self._shutdown(cancel=False)
30 changes: 1 addition & 29 deletions celery/worker/consumer.py
Expand Up @@ -118,6 +118,7 @@ class Namespace(bootsteps.Namespace):
'celery.worker.consumer:Control',
'celery.worker.consumer:Tasks',
'celery.worker.consumer:Evloop',
'celery.worker.actors:Bootstep',
]

def shutdown(self, parent):
Expand Down Expand Up @@ -180,35 +181,6 @@ def start(self):
error(CONNECTION_RETRY, exc_info=True)
ns.restart(self)

def add_actor(self, actor_name, actor_id):
"""Add actor to the actor registry and start the actor main method"""
try:
actor = instantiate(actor_name, connection = self.connection,
id = actor_id)
consumer = actor.Consumer(self.connection.channel())
consumer.consume()
self.actor_registry[actor.id] = consumer
info('Register actor in the actor registry: %s' % actor_name)
return actor.id
except Exception as exc:
error('Start actor error: %r', exc, exc_info=True)

def stop_all_actors(self):
for _, consumer in self.actor_registry.items():
self.maybe_conn_error(consumer.cancel)
self.actor_registry.clear()


def reset_actor_nodes(self):
for _, consumer in self.actor_registry.items():
self.maybe_conn_error(consumer.cancel)
consumer.consume()

def stop_actor(self, actor_id):
if actor_id in self.actor_registry:
consumer = self.actor_registry.pop(actor_id)
self.maybe_conn_error(consumer.cancel)

def shutdown(self):
self.namespace.shutdown(self)

Expand Down
12 changes: 3 additions & 9 deletions celery/worker/control.py
Expand Up @@ -53,6 +53,7 @@ def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
logger.info('Revoking task %s', task_id)
return {'ok': 'revoking task {0}'.format(task_id)}


@Panel.register
def report(panel):
return {'ok': panel.app.bugreport()}
Expand Down Expand Up @@ -213,7 +214,7 @@ def _extract_info(task):

@Panel.register
def ping(panel, **kwargs):
return {'ok':'pong'}
return {'ok': 'pong'}


@Panel.register
Expand Down Expand Up @@ -275,14 +276,7 @@ def active_queues(panel):
return [dict(queue.as_dict(recurse=True))
for queue in panel.consumer.task_consumer.queues]


@Panel.register
def dump_conf(panel, **kwargs):
return jsonify(dict(panel.app.conf))

@Panel.register
def start_actor(panel, name, actor_id):
return panel.consumer.add_actor(name, actor_id)

@Panel.register
def stop_actor(panel, actor_id):
return panel.consumer.stop_actor(actor_id)

0 comments on commit ef86abe

Please sign in to comment.