Skip to content

Commit

Permalink
Cosmetics
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Apr 16, 2012
1 parent 6e362f9 commit 9571e0e
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 120 deletions.
4 changes: 2 additions & 2 deletions celery/app/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from kombu import BrokerConnection, Exchange
from kombu import compat as messaging
from kombu.pools import ProducerPool
from kombu import pools

from celery import signals
from celery.utils import cached_property, lpmerge, uuid
Expand Down Expand Up @@ -259,7 +259,7 @@ def __exit__(self, *exc_info):
self.close()


class PublisherPool(ProducerPool):
class PublisherPool(pools.ProducerPool):

def __init__(self, app):
self.app = app
Expand Down
4 changes: 4 additions & 0 deletions celery/app/annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
_first_match_any = firstmethod("annotate_any")


def resolve_all(anno, task):
return filter(None, (_first_match(anno, task), _first_match_any(anno)))


class MapAnnotation(dict):

def annotate_any(self):
Expand Down
188 changes: 90 additions & 98 deletions celery/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,24 @@
from celery.utils.functional import first
from celery.utils.imports import instantiate, symbol_by_name

from .annotations import (
_first_match, _first_match_any,
prepare as prepare_annotations,
)
from .annotations import prepare as prepare_annotations
from .builtins import load_builtin_tasks
from .defaults import DEFAULTS, find_deprecated_settings
from .state import _tls, get_current_app
from .utils import AppPickler, Settings, bugreport, _unpickle_app


def _unpickle_appattr(reverse, args):
return getattr(get_current_app(), reverse)(*args)
def _unpickle_appattr(reverse_name, args):
"""Given an attribute name and a list of args, gets
the attribute from the current app and calls it."""
return getattr(get_current_app(), reverse_name)(*args)


class Celery(object):
"""Celery Application.
:param main: Name of the main module if running as `__main__`.
:keyword broker: URL of the default broker used.
:keyword loader: The loader class, or the name of the loader class to use.
Default is :class:`celery.loaders.app.AppLoader`.
:keyword backend: The result store backend class, or the name of the
Expand All @@ -57,6 +57,7 @@ class Celery(object):
:keyword log: Log object or class name.
:keyword control: Control object or class name.
:keyword set_as_current: Make this the global current app.
:keyword tasks: A task registry or the name of a registry class.
"""
Pickler = AppPickler
Expand Down Expand Up @@ -108,6 +109,7 @@ def set_current(self):
_tls.current_app = self

def on_init(self):
"""Optional callback called at init."""
pass

def create_task_cls(self):
Expand All @@ -116,50 +118,6 @@ def create_task_cls(self):
return self.subclass_with_self("celery.app.task:BaseTask", name="Task",
attribute="_app", abstract=True)

def subclass_with_self(self, Class, name=None, attribute="app",
reverse=None, **kw):
"""Subclass an app-compatible class by setting its app attribute
to be this app instance.
App-compatible means that the class has a class attribute that
provides the default app it should use, e.g.
``class Foo: app = None``.
:param Class: The app-compatible class to subclass.
:keyword name: Custom name for the target class.
:keyword attribute: Name of the attribute holding the app,
default is "app".
"""
Class = symbol_by_name(Class)
reverse = reverse if reverse else Class.__name__

def __reduce__(self):
return _unpickle_appattr, (reverse, self.__reduce_args__())

attrs = dict({attribute: self}, __module__=Class.__module__,
__doc__=Class.__doc__, __reduce__=__reduce__, **kw)

return type(name or Class.__name__, (Class, ), attrs)

@cached_property
def Worker(self):
"""Create new :class:`~celery.apps.worker.Worker` instance."""
return self.subclass_with_self("celery.apps.worker:Worker")

@cached_property
def WorkController(self, **kwargs):
return self.subclass_with_self("celery.worker:WorkController")

@cached_property
def Beat(self, **kwargs):
"""Create new :class:`~celery.apps.beat.Beat` instance."""
return self.subclass_with_self("celery.apps.beat:Beat")

@cached_property
def TaskSet(self):
return self.subclass_with_self("celery.task.sets:group")

def start(self, argv=None):
"""Run :program:`celery` using `argv`. Uses :data:`sys.argv`
if `argv` is not specified."""
Expand Down Expand Up @@ -233,46 +191,9 @@ def _task_from_fun(self, fun, **options):
task.bind(self)
return task

def annotate_task(self, task):
if self.annotations:
match = _first_match(self.annotations, task)
for attr, value in (match or {}).iteritems():
setattr(task, attr, value)
match_any = _first_match_any(self.annotations)
for attr, value in (match_any or {}).iteritems():
setattr(task, attr, value)

@cached_property
def Task(self):
"""Default Task base class for this application."""
return self.create_task_cls()

@cached_property
def annotations(self):
return prepare_annotations(self.conf.CELERY_ANNOTATIONS)

def __repr__(self):
return "<Celery: %s:0x%x>" % (self.main or "__main__", id(self), )

def __reduce__(self):
# Reduce only pickles the configuration changes,
# so the default configuration doesn't have to be passed
# between processes.
return (_unpickle_app, (self.__class__, self.Pickler)
+ self.__reduce_args__())

def __reduce_args__(self):
return (self.main,
self.conf.changes,
self.loader_cls,
self.backend_cls,
self.amqp_cls,
self.events_cls,
self.log_cls,
self.control_cls,
self.accept_magic_kwargs)

def finalize(self):
"""Finalizes the app by loading built-in tasks,
and evaluating pending task decorators."""
if not self.finalized:
load_builtin_tasks(self)

Expand Down Expand Up @@ -351,14 +272,6 @@ def send_task(self, name, args=None, kwargs=None, countdown=None,
publisher or publish.close()
return result_cls(new_id)

@cached_property
def AsyncResult(self):
return self.subclass_with_self("celery.result:AsyncResult")

@cached_property
def TaskSetResult(self):
return self.subclass_with_self("celery.result:TaskSetResult")

def broker_connection(self, hostname=None, userid=None,
password=None, virtual_host=None, port=None, ssl=None,
insist=None, connect_timeout=None, transport=None,
Expand Down Expand Up @@ -483,6 +396,85 @@ def _after_fork(self, obj_):
self._pool.force_close_all()
self._pool = None

def subclass_with_self(self, Class, name=None, attribute="app",
reverse=None, **kw):
"""Subclass an app-compatible class by setting its app attribute
to be this app instance.
App-compatible means that the class has a class attribute that
provides the default app it should use, e.g.
``class Foo: app = None``.
:param Class: The app-compatible class to subclass.
:keyword name: Custom name for the target class.
:keyword attribute: Name of the attribute holding the app,
default is "app".
"""
Class = symbol_by_name(Class)
reverse = reverse if reverse else Class.__name__

def __reduce__(self):
return _unpickle_appattr, (reverse, self.__reduce_args__())

attrs = dict({attribute: self}, __module__=Class.__module__,
__doc__=Class.__doc__, __reduce__=__reduce__, **kw)

return type(name or Class.__name__, (Class, ), attrs)

def __repr__(self):
return "<%s %s:0x%x>" % (self.__class__.__name__,
self.main or "__main__", id(self), )

def __reduce__(self):
# Reduce only pickles the configuration changes,
# so the default configuration doesn't have to be passed
# between processes.
return (_unpickle_app, (self.__class__, self.Pickler)
+ self.__reduce_args__())

def __reduce_args__(self):
return (self.main, self.conf.changes, self.loader_cls,
self.backend_cls, self.amqp_cls, self.events_cls,
self.log_cls, self.control_cls, self.accept_magic_kwargs)


@cached_property
def Worker(self):
"""Create new :class:`~celery.apps.worker.Worker` instance."""
return self.subclass_with_self("celery.apps.worker:Worker")

@cached_property
def WorkController(self, **kwargs):
return self.subclass_with_self("celery.worker:WorkController")

@cached_property
def Beat(self, **kwargs):
"""Create new :class:`~celery.apps.beat.Beat` instance."""
return self.subclass_with_self("celery.apps.beat:Beat")

@cached_property
def TaskSet(self):
return self.subclass_with_self("celery.task.sets:group")

@cached_property
def Task(self):
"""Default Task base class for this application."""
return self.create_task_cls()

@cached_property
def annotations(self):
return prepare_annotations(self.conf.CELERY_ANNOTATIONS)


@cached_property
def AsyncResult(self):
return self.subclass_with_self("celery.result:AsyncResult")

@cached_property
def TaskSetResult(self):
return self.subclass_with_self("celery.result:TaskSetResult")

@property
def pool(self):
if self._pool is None:
Expand Down Expand Up @@ -537,4 +529,4 @@ def tasks(self):
"""
self.finalize()
return self._tasks
App = Celery
App = Celery # compat
2 changes: 1 addition & 1 deletion celery/app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def route(self, options, task, args=(), kwargs={}):
return lpmerge(self.expand_destination(route), options)
if "queue" not in options:
options = lpmerge(self.expand_destination(
self.app.conf.CELERY_DEFAULT_QUEUE), options)
self.app.conf.CELERY_DEFAULT_QUEUE), options)
return options

def expand_destination(self, route):
Expand Down
21 changes: 11 additions & 10 deletions celery/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,14 @@ def failed(self):
"""Returns :const:`True` if the task failed."""
return self.state == states.FAILURE

def build_graph(self, intermediate=False):
graph = DependencyGraph()
for parent, node in self.iterdeps(intermediate=intermediate):
if parent:
graph.add_arc(parent)
graph.add_edge(parent, node)
return graph

def __str__(self):
"""`str(self) -> self.id`"""
return self.id
Expand All @@ -203,21 +211,14 @@ def __eq__(self, other):
return NotImplemented

def __copy__(self):
return self.__class__(self.id, backend=self.backend)
r = self.__reduce__()
return r[0](*r[1])

def __reduce__(self):
return self.__class__, self.__reduce_args__()

def __reduce_args__(self):
return self.id, self.task_name, self.backend

def build_graph(self, intermediate=False):
graph = DependencyGraph()
for parent, node in self.iterdeps(intermediate=intermediate):
if parent:
graph.add_arc(parent)
graph.add_edge(parent, node)
return graph
return self.id, self.backend, self.task_name, self.parent

def set_parent(self, parent):
self.parent = parent
Expand Down
4 changes: 2 additions & 2 deletions celery/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
from celery.app.task import Context, TaskType, BaseTask # noqa
from celery.schedules import maybe_schedule

#: list of methods that are classmethods in the old API.
#: list of methods that must be classmethods in the old API.
_COMPAT_CLASSMETHODS = (
"get_logger", "establish_connection", "get_publisher", "get_consumer",
"delay", "apply_async", "retry", "apply", "AsyncResult", "subtask",
"bind", "on_bound", "_get_app")
"bind", "on_bound", "_get_app", "annotate")


class Task(BaseTask):
Expand Down
14 changes: 8 additions & 6 deletions celery/utils/functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,21 +179,23 @@ def first(predicate, iterable):


def firstmethod(method):
"""Returns a functions that with a list of instances,
"""Returns a function that with a list of instances,
finds the first instance that returns a value for the given method.
The list can also contain promises (:class:`promise`.)
"""

def _matcher(seq, *args, **kwargs):
for cls in seq:
def _matcher(it, *args, **kwargs):
for obj in it:
try:
answer = getattr(maybe_promise(cls), method)(*args, **kwargs)
if answer is not None:
return answer
answer = getattr(maybe_promise(obj), method)(*args, **kwargs)
except AttributeError:
pass
else:
if answer is not None:
return answer

return _matcher


Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def run(self, *args, **kwargs):
# -*- %%% -*-

setup(
name="celery",
name=NAME,
version=meta["VERSION"],
description=meta["doc"],
author=meta["author"],
Expand Down

0 comments on commit 9571e0e

Please sign in to comment.