Permalink
Browse files

Flake8

  • Loading branch information...
1 parent b987f40 commit d65e1b0c91859f023b00d7a32985483cb8f9b5fe @ask committed May 6, 2012
View
@@ -80,6 +80,7 @@ def _compat_periodic_task_decorator(*args, **kwargs):
"inspect": "control.inspect",
},
"schedules": "celery.schedules",
+ "chords": "celery.canvas",
}
}
View
@@ -15,12 +15,12 @@
from weakref import WeakValueDictionary
from kombu import BrokerConnection, Consumer, Exchange, Producer, Queue
-from kombu.common import entry_to_queue, maybe_declare
+from kombu.common import entry_to_queue
from kombu.pools import ProducerPool
from celery import signals
-from celery.utils import cached_property, lpmerge, uuid
-from celery.utils import text
+from celery.utils import cached_property, uuid
+from celery.utils.text import indent as textindent
from . import routes as _routes
@@ -109,8 +109,8 @@ def format(self, indent=0, indent_first=True):
"routing_key": q.routing_key}
for name, q in sorted(active.iteritems())]
if indent_first:
- return text.indent("\n".join(info), indent)
- return info[0] + "\n" + text.indent("\n".join(info[1:]), indent)
+ return textindent("\n".join(info), indent)
+ return info[0] + "\n" + textindent("\n".join(info[1:]), indent)
def select_subset(self, wanted):
"""Sets :attr:`consume_from` by selecting a subset of the
@@ -156,19 +156,17 @@ def delay_task(self, task_name, task_args=None, task_kwargs=None,
routing_key=None, serializer=None, delivery_mode=None,
compression=None, **kwargs):
"""Send task message."""
-
- connection = self.connection
- _retry_policy = self.retry_policy
- if retry_policy: # merge default and custom policy
- _retry_policy = dict(_retry_policy, **retry_policy)
+ # merge default and custom policy
+ _rp = (dict(self.retry_policy, **retry_policy) if retry_policy
+ else self.retry_policy)
task_id = task_id or uuid()
task_args = task_args or []
task_kwargs = task_kwargs or {}
if not isinstance(task_args, (list, tuple)):
raise ValueError("task args must be a list or tuple")
if not isinstance(task_kwargs, dict):
raise ValueError("task kwargs must be a dictionary")
- if countdown: # Convert countdown to ETA.
+ if countdown: # Convert countdown to ETA.
now = now or self.app.now()
eta = now + timedelta(seconds=countdown)
if isinstance(expires, (int, float)):
@@ -179,8 +177,8 @@ def delay_task(self, task_name, task_args=None, task_kwargs=None,
body = {"task": task_name,
"id": task_id,
- "args": task_args or [],
- "kwargs": task_kwargs or {},
+ "args": task_args,
+ "kwargs": task_kwargs,
"retries": retries or 0,
"eta": eta,
"expires": expires,
@@ -195,9 +193,8 @@ def delay_task(self, task_name, task_args=None, task_kwargs=None,
self.publish(body, exchange=exchange, mandatory=mandatory,
immediate=immediate, routing_key=routing_key,
serializer=serializer or self.serializer,
- delivery_mode=delivery_mode,
compression=compression or self.compression,
- retry=retry, retry_policy=retry_policy,
+ retry=retry, retry_policy=_rp, delivery_mode=delivery_mode,
declare=[self.app.amqp.queues[queue]] if queue else [])
signals.task_sent.send(sender=task_name, **body)
@@ -298,8 +295,8 @@ def queues(self):
"""Queue name⇒ declaration mapping."""
return self.Queues(self.app.conf.CELERY_QUEUES)
- @queues.setter
- def queues(self, queues): # noqa
+ @queues.setter # noqa
+ def queues(self, queues):
return self.Queues(queues)
@property
View
@@ -209,7 +209,6 @@ def default_producer(self, producer=None):
with self.amqp.publisher_pool.acquire(block=True) as producer:
yield producer
-
def with_default_connection(self, fun):
"""With any function accepting a `connection`
keyword argument, establishes a default connection if one is
View
@@ -81,7 +81,6 @@ def xmap(task, it):
return list(map(task, it))
-
@builtin_task
def add_starmap_task(app):
from celery.canvas import subtask
@@ -144,7 +143,8 @@ def apply_async(self, args=(), kwargs={}, **options):
if self.app.conf.CELERY_ALWAYS_EAGER:
return self.apply(args, kwargs, **options)
tasks, result, gid = self.prepare(options, **kwargs)
- super(Group, self).apply_async((list(tasks), result, gid), **options)
+ super(Group, self).apply_async(
+ (list(tasks), result, gid), **options)
return result
def apply(self, args=(), kwargs={}, **options):
View
@@ -1,2 +0,0 @@
-# compat module
-from celery.canvas import Chord, chord # noqa
View
@@ -1,9 +0,0 @@
-# -*- coding: utf-8 -*-
-from __future__ import absolute_import
-
-import warnings
-from celery.schedules import schedule, crontab_parser, crontab # noqa
-from celery.exceptions import CDeprecationWarning
-
-warnings.warn(CDeprecationWarning(
- "celery.task.schedules is deprecated and renamed to celery.schedules"))
View
@@ -2,6 +2,7 @@
from __future__ import absolute_import
from __future__ import with_statement
+from celery.app import app_or_default
from celery.app.state import get_current_task
from celery.canvas import subtask, maybe_subtask # noqa
from celery.utils import uuid
@@ -22,10 +23,10 @@ class TaskSet(UserList):
>>> list_of_return_values = taskset_result.join() # *expensive*
"""
- _app = None
+ app = None
def __init__(self, tasks=None, app=None, Publisher=None):
- self.app = app_or_default(app)
+ self.app = app_or_default(app or self.app)
self.data = [maybe_subtask(t) for t in tasks or []]
self.total = len(self.tasks)
self.Publisher = Publisher or self.app.amqp.TaskProducer
@@ -263,8 +263,6 @@ def send(self, type, **fields):
chan.close()
assert conn.transport_cls == "memory"
- entities = conn.declared_entities
-
pub = self.app.amqp.TaskPublisher(conn,
exchange=Exchange("foo_exchange"))
@@ -3,7 +3,6 @@
from functools import wraps
-from kombu import Exchange
from kombu.utils.functional import maybe_promise
from celery import current_app
@@ -3,8 +3,6 @@
import anyjson
-from mock import Mock
-
from celery import current_app
from celery.task import Task
from celery.task.sets import subtask, TaskSet
@@ -175,7 +173,7 @@ def apply(self, *args, **kwargs):
def test_set_app(self):
ts = TaskSet([])
ts.app = 42
- self.assertEqual(ts._app, 42)
+ self.assertEqual(ts.app, 42)
def test_set_tasks(self):
ts = TaskSet([])
@@ -185,4 +183,4 @@ def test_set_tasks(self):
def test_set_Publisher(self):
ts = TaskSet([])
ts.Publisher = 42
- self.assertEqual(ts._Publisher, 42)
+ self.assertEqual(ts.Publisher, 42)
@@ -10,7 +10,6 @@ SKIP_FILES="celery.__compat__.rst
celery.concurrency.processes._win.rst
celery.contrib.rst
celery.contrib.bundles.rst
- celery.task.schedules.rst
celery.local.rst
celery.app.base.rst
celery.apps.rst

0 comments on commit d65e1b0

Please sign in to comment.