From ef86abe9fbf23f4a78f36f918b587f3ad853b262 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 1 Oct 2012 17:18:08 +0100 Subject: [PATCH] Refactor Agent --- celery/app/control.py | 10 +- celery/app/defaults.py | 3 +- celery/worker/actors.py | 216 +++++++++++++++++--------------------- celery/worker/consumer.py | 30 +----- celery/worker/control.py | 12 +-- 5 files changed, 102 insertions(+), 169 deletions(-) diff --git a/celery/app/control.py b/celery/app/control.py index a9aab5b04b3..e29096e81ff 100644 --- a/celery/app/control.py +++ b/celery/app/control.py @@ -76,13 +76,14 @@ def registered(self, *taskinfoitems): def ping(self): return self._request('ping') - + def active_queues(self): return self._request('active_queues') def conf(self): return self._request('dump_conf') + class Control(object): Mailbox = Mailbox @@ -139,13 +140,6 @@ def ping(self, destination=None, timeout=1, **kwargs): return self.broadcast('ping', reply=True, destination=destination, timeout=timeout, **kwargs) - def start_actor(self, actor_name, actor_id = None, - destination = None, timeout = 1, **kwargs): - return self.broadcast('start_actor', name = actor_name, - actor_id = actor_id, - reply=True, destination=destination, - timeout=timeout, **kwargs) - def rate_limit(self, task_name, rate_limit, destination=None, **kwargs): """Tell all (or specific) workers to set a new rate limit for task by type. diff --git a/celery/app/defaults.py b/celery/app/defaults.py index bee0b2dc05f..8320dfdea63 100644 --- a/celery/app/defaults.py +++ b/celery/app/defaults.py @@ -150,8 +150,7 @@ def __repr__(self): 'WORKER_DIRECT': Option(False, type='bool'), }, 'CELERYD': { - 'ACTORS_MANAGER': Option( - 'celery.worker.actorsbootstrap:ActorsManager'), + 'AGENT': Option('celery.worker.actors:Agent'), 'AUTOSCALER': Option('celery.worker.autoscale:Autoscaler'), 'AUTORELOADER': Option('celery.worker.autoreload:Autoreloader'), 'BOOT_STEPS': Option((), type='tuple'), diff --git a/celery/worker/actors.py b/celery/worker/actors.py index 5df213ff058..c456a57f25e 100644 --- a/celery/worker/actors.py +++ b/celery/worker/actors.py @@ -1,158 +1,132 @@ +from __future__ import absolute_import -import bootsteps +from celery.bootsteps import StartStopStep +from celery.utils.imports import instantiate, qualname +from celery.utils.log import get_logger +from celery.worker.consumer import Connection from cell import Actor -from celery.utils.imports import instantiate +from kombu.common import ignore_errors from kombu.utils import uuid -from .bootsteps import StartStopComponent -from celery.utils.log import get_logger -from celery.worker.consumer import debug logger = get_logger(__name__) -info, warn, error, crit = (logger.info, logger.warn, - logger.error, logger.critical) - - -class WorkerComponent(StartStopComponent): - """This component starts an ActorManager instance if actors support is enabled.""" - name = 'worker.actors-manager' - consumer = None - - def ActorsManager(self, w): - return (w.actors_manager_cls or ActorsManager) - - def include_if(self, w): - #return w.actors_enabled - return True - - def init(self, w, **kwargs): - w.actors_manager = None - - def create(self, w): - debug('create ActorsManager') - actor = w.actors_manager = self.instantiate(self.ActorsManager(w), - app = w.app) - actor.app = w.app - w.on_consumer_ready_callbacks.append(actor.on_consumer_ready) - return actor +debug, warn, error = logger.debug, logger.warn, logger.error + + +class Bootstep(StartStopStep): + requires = (Connection, ) + + def __init__(self, c, **kwargs): + c.agent = None + + def create(self, c): + agent = c.app.conf.CELERYD_AGENT + agent = c.agent = self.instantiate(c.app.conf.CELERYD_AGENT, + connection=c.connection, app=c.app, + ) + return agent class ActorProxy(object): - """ - A class that represents an actor started remotely - """ + """A class that represents an actor started remotely.""" + def __init__(self, local_actor, actor_id, async_start_result): self.__subject = local_actor.__copy__() self.__subject.id = actor_id self.async_start_result = async_start_result - + def __getattr__(self, name): return getattr(self.__subject, name) - + def wait_to_start(self): self.async_start_result._result - -class ActorsManager(Actor): - connection = None + +class Agent(Actor): types = ('round-robin', 'scatter') - actor_registry = {} - actors_consumer = None - connection = None - app = None - - def __init__(self, app=None, *args, **kwargs): - self.app = app - super(ActorsManager, self).__init__(*args, **kwargs) - - def contribute_to_state(self, state): - state.actor_registry = self.actor_registry - state.connection = self.connection - conninfo = self.app.connection() - state.connection_errors = conninfo.connection_errors - state.channel_errors = conninfo.channel_errors - state.reset() - return Actor.contribute_to_state(self, state) - - class state(Actor.state): + + class state(object): + def _start_actor_consumer(self, actor): - consumer = actor.Consumer(self.connection.channel()) - consumer.consume() - self.actor_registry[actor.id] = (actor, consumer) - - def add_actor(self, name, id = None): - """Add actor to the actor registry and start the actor's main method""" + actor.consumer = actor.Consumer(self.connection.channel()) + actor.consumer.consume() + self.actor.registry[actor.id] = actor + + def add_actor(self, name, id=None): + """Add actor to the registry and start the actor's main method.""" try: - actor = instantiate(name, connection = self.connection, - id = id) + actor = instantiate(name, connection=self.connection, id=id) + if actor.id in self.actor.registry: + warn('Actor id %r already exists', actor.id) self._start_actor_consumer(actor) - if actor.id in self.actor_registry: - warn('Actor with the same id already exists') - debug('Register actor in the actor registry: %s' % name) + debug('Actor registered: %s', name) return actor.id except Exception as exc: - error('Start actor error: %r', exc, exc_info=True) - + error('Cannot start actor: %r', exc, exc_info=True) + def stop_all(self): - for _, (_, consumer) in self.actor_registry.items(): - self.maybe_conn_error(consumer.cancel) - self.actor_registry.clear() + self.actor.shutdown() def reset(self): debug('Resetting active actors') - print self.actor_registry.items() - for id, (actor, consumer) in self.actor_registry.items(): - self.maybe_conn_error(consumer.cancel) - # TODO:setting the connection here seems wrong ? + for actor in self.actor.registry.itervalues(): + if actor.consumer: + ignore_errors(self.connection, actor.consumer.cancel) actor.connection = self.connection self._start_actor_consumer(actor) - - def stop_actor(self, actor_id): - if actor_id in self.actor_registry: - (_, consumer) = self.actor_registry.pop(actor_id) - self.maybe_conn_error(consumer.cancel) - - def maybe_conn_error(self, fun): - """Applies function but ignores any connection or channel - errors raised.""" + + def stop_actor(self, id): try: - fun() - except (AttributeError, ) + \ - self.connection_errors + \ - self.channel_errors: + actor = self.actor.registry.pop(id) + except KeyError: pass - + else: + if actor.consumer and actor.consumer.channel: + ignore_errors(self.connection, consumer.cancel) + + def __init__(self, connection, app=None, *args, **kwargs): + self.connection = connection + self.app = app + self.registry = {} + super(ActorManager, self).__init__(*args, **kwargs) + + def contribute_to_state(self, state): + state.connection = self.connection + conninfo = self.app.connection() + state.connection_errors = conninfo.connection_errors + state.channel_errors = conninfo.channel_errors + state.reset() + return super(ActorsManager, self).contribute_to_state(state) + def add_actor(self, actor, nowait=False): - name = "%s.%s"%(actor.__class__.__module__, - actor.__class__.__name__) + name = qualname(actor) actor_id = uuid() - res = self.call('add_actor', {'name': name, 'id' : actor_id}, - type = 'round-robin', nowait = 'True') + res = self.call('add_actor', {'name': name, 'id': actor_id}, + type='round-robin', nowait=True) actor_proxy = ActorProxy(actor, actor_id, res) return actor_proxy - - def stop_actor_by_id(self, actor_id, nowait=False): - return self.scatter('stop_actor', {'actor_id' : actor_id}, - nowait=nowait) - + + def stop_actor_by_id(self, actor_id, nowait=False): + return self.scatter('stop_actor', {'actor_id': actor_id}, + nowait=nowait) + def start(self): - debug('Starting ActorsManager') - + debug('Starting Agent') + + def _shutdown(self, cancel=True, close=True, clear=True): + try: + for actor in self.registry.itervalues(): + if actor and actor.consumer: + if cancel: + ignore_errors(self.connection, actor.consumer.cancel) + if close and actor.consumer.channel: + ignore_errors(self.connection, + actor.consumer.channel.close) + finally: + if clear: + self.registry.clear() + def stop(self): - if self.actors_consumer: - self.actors_consumer.cancel() - - def on_start(self, connection): - self.connection = connection - actor_consumer = self.Consumer(self.connection.channel()) - debug('ActorsManager start consuming blabla') - self.actor_consumer = actor_consumer - self.actor_consumer.consume() - self.contribute_to_state(self.state) - - def on_consumer_ready(self, consumer): - debug('ActorsManager in On consumer ready') - if consumer.connection: - raise Exception('Consumer is ready.') - consumer.on_reset_connection.append(self.on_start) - consumer.on_close_connection.append(self.stop) - \ No newline at end of file + self._shutdown(clear=False) + + def shutdown(self): + self._shutdown(cancel=False) diff --git a/celery/worker/consumer.py b/celery/worker/consumer.py index 6641058d4dd..7d8fb7454b0 100644 --- a/celery/worker/consumer.py +++ b/celery/worker/consumer.py @@ -118,6 +118,7 @@ class Namespace(bootsteps.Namespace): 'celery.worker.consumer:Control', 'celery.worker.consumer:Tasks', 'celery.worker.consumer:Evloop', + 'celery.worker.actors:Bootstep', ] def shutdown(self, parent): @@ -180,35 +181,6 @@ def start(self): error(CONNECTION_RETRY, exc_info=True) ns.restart(self) - def add_actor(self, actor_name, actor_id): - """Add actor to the actor registry and start the actor main method""" - try: - actor = instantiate(actor_name, connection = self.connection, - id = actor_id) - consumer = actor.Consumer(self.connection.channel()) - consumer.consume() - self.actor_registry[actor.id] = consumer - info('Register actor in the actor registry: %s' % actor_name) - return actor.id - except Exception as exc: - error('Start actor error: %r', exc, exc_info=True) - - def stop_all_actors(self): - for _, consumer in self.actor_registry.items(): - self.maybe_conn_error(consumer.cancel) - self.actor_registry.clear() - - - def reset_actor_nodes(self): - for _, consumer in self.actor_registry.items(): - self.maybe_conn_error(consumer.cancel) - consumer.consume() - - def stop_actor(self, actor_id): - if actor_id in self.actor_registry: - consumer = self.actor_registry.pop(actor_id) - self.maybe_conn_error(consumer.cancel) - def shutdown(self): self.namespace.shutdown(self) diff --git a/celery/worker/control.py b/celery/worker/control.py index 1b35bec1737..9c6a8e38326 100644 --- a/celery/worker/control.py +++ b/celery/worker/control.py @@ -53,6 +53,7 @@ def revoke(panel, task_id, terminate=False, signal=None, **kwargs): logger.info('Revoking task %s', task_id) return {'ok': 'revoking task {0}'.format(task_id)} + @Panel.register def report(panel): return {'ok': panel.app.bugreport()} @@ -213,7 +214,7 @@ def _extract_info(task): @Panel.register def ping(panel, **kwargs): - return {'ok':'pong'} + return {'ok': 'pong'} @Panel.register @@ -275,14 +276,7 @@ def active_queues(panel): return [dict(queue.as_dict(recurse=True)) for queue in panel.consumer.task_consumer.queues] + @Panel.register def dump_conf(panel, **kwargs): return jsonify(dict(panel.app.conf)) - -@Panel.register -def start_actor(panel, name, actor_id): - return panel.consumer.add_actor(name, actor_id) - -@Panel.register -def stop_actor(panel, actor_id): - return panel.consumer.stop_actor(actor_id)