Skip to content

Commit

Permalink
Cosmetics and stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Apr 29, 2012
1 parent cbd1e9d commit fa2d784
Show file tree
Hide file tree
Showing 26 changed files with 168 additions and 182 deletions.
1 change: 1 addition & 0 deletions celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"celery.app": ["Celery", "bugreport"],
"celery.app.state": ["current_app", "current_task"],
"celery.canvas": ["chain", "chord", "group", "subtask"],
"celery.utils": ["uuid"],
},
direct={"task": "celery.task"},
__package__="celery",
Expand Down
10 changes: 9 additions & 1 deletion celery/app/builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def apply(self, args=(), kwargs={}, **options):
@builtin_task
def add_chain_task(app):
from celery.canvas import maybe_subtask
from celery.result import EagerResult

class Chain(app.Task):
name = "celery.chain"
Expand All @@ -133,7 +134,6 @@ class Chain(app.Task):
def apply_async(self, args=(), kwargs={}, **options):
if self.app.conf.CELERY_ALWAYS_EAGER:
return self.apply(args, kwargs, **options)
tasks = kwargs["tasks"]
tasks = [maybe_subtask(task).clone(task_id=uuid(), **kwargs)
for task in kwargs["tasks"]]
reduce(lambda a, b: a.link(b), tasks)
Expand All @@ -143,6 +143,14 @@ def apply_async(self, args=(), kwargs={}, **options):
reduce(lambda a, b: a.set_parent(b), reversed(results))
return results[-1]

def apply(self, args=(), kwargs={}, **options):
tasks = [maybe_subtask(task).clone() for task in kwargs["tasks"]]
res = prev = None
for task in tasks:
res = task.apply((prev.get(), ) if prev else ())
res.parent, prev = prev, res
return res

return Chain


Expand Down
3 changes: 2 additions & 1 deletion celery/app/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def inspect(self, destination=None, timeout=1, callback=None):
return Inspect(self, destination=destination, timeout=timeout,
callback=callback)

def discard_all(self, connection=None):
def purge(self, connection=None):
"""Discard all waiting tasks.
This will ignore all tasks waiting for execution, and they will
Expand All @@ -116,6 +116,7 @@ def discard_all(self, connection=None):
with self.app.default_connection(connection) as conn:
return self.app.amqp.get_task_consumer(connection=conn)\
.discard_all()
discard_all = purge

def revoke(self, task_id, destination=None, terminate=False,
signal="SIGTERM", **kwargs):
Expand Down
3 changes: 3 additions & 0 deletions celery/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
import os
import platform as _platform

from operator import add

from celery import datastructures
from celery import platforms
from celery.utils.functional import maybe_list
from celery.utils.text import pretty
from celery.utils.imports import qualname

Expand Down
3 changes: 1 addition & 2 deletions celery/apps/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ def setup_logging(self):
def start_scheduler(self):
c = self.colored
if self.pidfile:
pidlock = platforms.create_pidlock(self.pidfile).acquire()
atexit.register(pidlock.release)
platforms.create_pidlock(self.pidfile)
beat = self.Service(app=self.app,
max_interval=self.max_interval,
scheduler_cls=self.scheduler_cls,
Expand Down
103 changes: 35 additions & 68 deletions celery/apps/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
%(tasks)s
"""

UNKNOWN_QUEUE_ERROR = """\
UNKNOWN_QUEUE = """\
Trying to select queue subset of %r, but queue %s is not
defined in the CELERY_QUEUES setting.
Expand All @@ -69,7 +69,7 @@ class Worker(configurated):
redirect_stdouts = from_config()
redirect_stdouts_level = from_config()

def __init__(self, hostname=None, discard=False, embed_clockservice=False,
def __init__(self, hostname=None, purge=False, beat=False,
queues=None, include=None, app=None, pidfile=None,
autoscale=None, autoreload=False, no_execv=False, **kwargs):
self.app = app = app_or_default(app or self.app)
Expand All @@ -86,14 +86,11 @@ def __init__(self, hostname=None, discard=False, embed_clockservice=False,
self.concurrency = cpu_count()
except NotImplementedError:
self.concurrency = 2
self.discard = discard
self.embed_clockservice = embed_clockservice
if self.app.IS_WINDOWS and self.embed_clockservice:
self.die("-B option does not work on Windows. "
"Please run celerybeat as a separate service.")
self.purge = purge
self.beat = beat
self.use_queues = [] if queues is None else queues
self.queues = None
self.include = [] if include is None else include
self.include = include
self.pidfile = pidfile
self.autoscale = None
self.autoreload = autoreload
Expand All @@ -107,34 +104,29 @@ def __init__(self, hostname=None, discard=False, embed_clockservice=False,

if isinstance(self.use_queues, basestring):
self.use_queues = self.use_queues.split(",")
if isinstance(self.include, basestring):
self.include = self.include.split(",")

try:
self.loglevel = mlevel(self.loglevel)
except KeyError:
self.die("Unknown level %r. Please use one of %s." % (
self.loglevel,
"|".join(l for l in LOG_LEVELS.keys()
if isinstance(l, basestring))))
if self.include:
if isinstance(self.include, basestring):
self.include = self.include.split(",")
app.conf.CELERY_IMPORTS = tuple(
self.include) + tuple(app.conf.CELERY_IMPORTS)
self.loglevel = mlevel(self.loglevel)

def run(self):
self.init_loader()
self.init_queues()
self.worker_init()
self.app.loader.init_worker()
self.redirect_stdouts_to_logger()

if getattr(os, "getuid", None) and os.getuid() == 0:
warnings.warn(RuntimeWarning(
"Running celeryd with superuser privileges is discouraged!"))

if self.discard:
if self.purge:
self.purge_messages()

# Dump configuration to screen so we have some basic information
# for when users sends bug reports.
print(str(self.colored.cyan(" \n", self.startup_info())) +
str(self.colored.reset(self.extra_info())))
str(self.colored.reset(self.extra_info() or "")))
self.set_process_status("-active-")

try:
Expand All @@ -150,75 +142,56 @@ def init_queues(self):
try:
self.app.select_queues(self.use_queues)
except KeyError, exc:
raise ImproperlyConfigured(
UNKNOWN_QUEUE_ERROR % (self.use_queues, exc))

def init_loader(self):
self.loader = self.app.loader
self.settings = self.app.conf
for module in self.include:
self.loader.import_task_module(module)
raise ImproperlyConfigured(UNKNOWN_QUEUE % (self.use_queues, exc))

def redirect_stdouts_to_logger(self):
self.app.log.setup(self.loglevel, self.logfile,
self.redirect_stdouts,
self.redirect_stdouts_level)
self.redirect_stdouts, self.redirect_stdouts_level)

def purge_messages(self):
count = self.app.control.discard_all()
print("discard: Erased %d %s from the queue.\n" % (
count = self.app.control.purge()
print("purge: Erased %d %s from the queue.\n" % (
count, pluralize(count, "message")))

def worker_init(self):
# Run the worker init handler.
# (Usually imports task modules and such.)
self.loader.init_worker()

def tasklist(self, include_builtins=True):
tasklist = self.app.tasks.keys()
tasks = self.app.tasks.keys()
if not include_builtins:
tasklist = filter(lambda s: not s.startswith("celery."),
tasklist)
return "\n".join(" . %s" % task for task in sorted(tasklist))
tasks = filter(lambda s: not s.startswith("celery."), tasks)
return "\n".join(" . %s" % task for task in sorted(tasks))

def extra_info(self):
if self.loglevel <= logging.INFO:
include_builtins = self.loglevel <= logging.DEBUG
tasklist = self.tasklist(include_builtins=include_builtins)
return EXTRA_INFO_FMT % {"tasks": tasklist}
return ""

def startup_info(self):
app = self.app
concurrency = self.concurrency
if self.autoscale:
cmax, cmin = self.autoscale
concurrency = "{min=%s, max=%s}" % (cmin, cmax)
concurrency = "{min=%s, max=%s}" % self.autoscale
return BANNER % {
"hostname": self.hostname,
"version": __version__,
"conninfo": self.app.broker_connection().as_uri(),
"concurrency": concurrency,
"loglevel": LOG_LEVELS[self.loglevel],
"logfile": self.logfile or "[stderr]",
"celerybeat": "ON" if self.embed_clockservice else "OFF",
"celerybeat": "ON" if self.beat else "OFF",
"events": "ON" if self.send_events else "OFF",
"loader": qualname(self.loader),
"loader": qualname(self.app.loader),
"queues": app.amqp.queues.format(indent=18, indent_first=False),
}

def run_worker(self):
if self.pidfile:
pidlock = platforms.create_pidlock(self.pidfile).acquire()
atexit.register(pidlock.release)
platforms.create_pidlock(self.pidfile)
worker = self.WorkController(app=self.app,
hostname=self.hostname,
ready_callback=self.on_consumer_ready,
embed_clockservice=self.embed_clockservice,
autoscale=self.autoscale,
autoreload=self.autoreload,
no_execv=self.no_execv,
**self.confopts_as_dict())
hostname=self.hostname,
ready_callback=self.on_consumer_ready, beat=self.beat,
autoscale=self.autoscale, autoreload=self.autoreload,
no_execv=self.no_execv,
**self.confopts_as_dict())
self.install_platform_tweaks(worker)
signals.worker_init.send(sender=worker)
worker.start()
Expand Down Expand Up @@ -250,26 +223,20 @@ def osx_proxy_detection_workaround(self):
os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")

def set_process_status(self, info):
info = "%s (%s)" % (info, platforms.strargv(sys.argv))
return platforms.set_mp_process_title("celeryd",
info=info,
hostname=self.hostname)
info="%s (%s)" % (info, platforms.strargv(sys.argv)),
hostname=self.hostname)

def die(self, msg, exitcode=1):
sys.stderr.write("Error: %s\n" % (msg, ))
sys.exit(exitcode)


def _shutdown_handler(worker, sig="TERM", how="stop", exc=SystemExit,
callback=None):
types = {"terminate": "Cold", "stop": "Warm"}
callback=None, types={"terminate": "Cold", "stop": "Warm"}):

def _handle_request(signum, frame):
process_name = current_process()._name
if not process_name or process_name == "MainProcess":
if current_process()._name == "MainProcess":
if callback:
callback(worker)
print("celeryd: %s shutdown (%s)" % (types[how], process_name, ))
print("celeryd: %s shutdown (MainProcess)" % types[how])
getattr(worker, how)(in_sighandler=True)
raise exc()
_handle_request.__name__ = "worker_" + how
Expand Down
2 changes: 1 addition & 1 deletion celery/bin/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ class purge(Command):

def run(self, *args, **kwargs):
queues = len(self.app.amqp.queues.keys())
messages_removed = self.app.control.discard_all()
messages_removed = self.app.control.purge()
if messages_removed:
self.out("Purged %s %s from %s known task %s." % (
messages_removed, pluralize(messages_removed, "message"),
Expand Down
20 changes: 15 additions & 5 deletions celery/bin/celeryd.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
.. cmdoption:: --purge
Discard all waiting tasks before the daemon is started.
Purges all waiting tasks before the daemon is started.
**WARNING**: This is unrecoverable, and the tasks will be
deleted from the messaging server.
Expand Down Expand Up @@ -118,6 +118,7 @@
from billiard import freeze_support

from celery.bin.base import Command, Option
from celery.utils.log import LOG_LEVELS, mlevel


class WorkerCommand(Command):
Expand All @@ -133,6 +134,17 @@ def run(self, *args, **kwargs):
from celery import concurrency
kwargs["pool_cls"] = concurrency.get_implementation(
kwargs.get("pool_cls") or self.app.conf.CELERYD_POOL)
if self.app.IS_WINDOWS and kwargs.get("beat"):
self.die("-B option does not work on Windows. "
"Please run celerybeat as a separate service.")
loglevel = kwargs.get("loglevel")
if loglevel:
try:
kwargs["loglevel"] = mlevel(loglevel)
except KeyError:
self.die("Unknown level %r. Please use one of %s." % (
loglevel, "|".join(l for l in LOG_LEVELS.keys()
if isinstance(l, basestring))))
return self.app.Worker(**kwargs).run()

def get_options(self):
Expand All @@ -141,13 +153,11 @@ def get_options(self):
Option('-c', '--concurrency',
default=conf.CELERYD_CONCURRENCY, type="int"),
Option('-P', '--pool', default=conf.CELERYD_POOL, dest="pool_cls"),
Option('--purge', '--discard', default=False,
action="store_true", dest="discard"),
Option('--purge', '--discard', default=False, action="store_true"),
Option('-f', '--logfile', default=conf.CELERYD_LOG_FILE),
Option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL),
Option('-n', '--hostname'),
Option('-B', '--beat',
action="store_true", dest="embed_clockservice"),
Option('-B', '--beat', action="store_true"),
Option('-s', '--schedule', dest="schedule_filename",
default=conf.CELERYBEAT_SCHEDULE_FILENAME),
Option('--scheduler', dest="scheduler_cls"),
Expand Down
10 changes: 3 additions & 7 deletions celery/canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ def set(self, **options):
return self

def apply_async(self, args=(), kwargs={}, **options):
"""Apply this task asynchronously."""
# For callbacks: extra args are prepended to the stored args.
args, kwargs, options = self._merge(args, kwargs, options)
return self.type.apply_async(args, kwargs, **options)
Expand All @@ -144,18 +143,12 @@ def append_to_list_option(self, key, value):
return value

def link(self, callback):
"""Add a callback task to be applied if this task
executes successfully."""
return self.append_to_list_option("link", callback)

def link_error(self, errback):
"""Add a callback task to be applied if an error occurs
while executing this task."""
return self.append_to_list_option("link_error", errback)

def flatten_links(self):
"""Gives a recursive list of dependencies (unchain if you will,
but with links intact)."""
return list(chain_from_iterable(_chain([[self]],
(link.flatten_links()
for link in maybe_list(self.options.get("link")) or []))))
Expand Down Expand Up @@ -201,6 +194,9 @@ def __init__(self, *tasks, **options):
self.tasks = tasks
self.subtask_type = "chain"

def __call__(self, *args, **kwargs):
return self.apply_async(*args, **kwargs)

@classmethod
def from_dict(self, d):
return chain(*d["kwargs"]["tasks"], **kwdict(d["options"]))
Expand Down
8 changes: 4 additions & 4 deletions celery/datastructures.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,13 @@ class AttributeDictMixin(object):
"""

def __getattr__(self, key):
def __getattr__(self, k):
"""`d.key -> d[key]`"""
try:
return self[key]
return self[k]
except KeyError:
raise AttributeError("'%s' object has no attribute '%s'" % (
self.__class__.__name__, key))
raise AttributeError(
"'%s' object has no attribute '%s'" % (type(self).__name__, k))

def __setattr__(self, key, value):
"""`d[key] = value -> d.key = value`"""
Expand Down
Loading

0 comments on commit fa2d784

Please sign in to comment.