View
@@ -56,7 +56,6 @@
IS_OSX = SYSTEM == 'Darwin'
IS_WINDOWS = SYSTEM == 'Windows'
DAEMON_UMASK = 0
DAEMON_WORKDIR = '/'
PIDFILE_FLAGS = os.O_CREAT | os.O_EXCL | os.O_WRONLY
@@ -295,8 +294,11 @@ class DaemonContext(object):
def __init__(self, pidfile=None, workdir=None, umask=None,
fake=False, after_chdir=None, **kwargs):
if isinstance(umask, string_t):
# octal or decimal, depending on initial zero.
umask = int(umask, 8 if umask.startswith('0') else 10)
self.workdir = workdir or DAEMON_WORKDIR
self.umask = DAEMON_UMASK if umask is None else umask
self.umask = umask
self.fake = fake
self.after_chdir = after_chdir
self.stdfds = (sys.stdin, sys.stdout, sys.stderr)
@@ -312,14 +314,16 @@ def open(self):
self._detach()
os.chdir(self.workdir)
os.umask(self.umask)
if self.umask is not None:
os.umask(self.umask)
if self.after_chdir:
self.after_chdir()
close_open_fds(self.stdfds)
for fd in self.stdfds:
self.redirect_to_null(maybe_fileno(fd))
if not self.fake:
close_open_fds(self.stdfds)
for fd in self.stdfds:
self.redirect_to_null(maybe_fileno(fd))
self._is_open = True
__enter__ = open
View
@@ -96,6 +96,7 @@ def as_tuple(self):
def forget(self):
"""Forget about (and possibly remove the result of) this task."""
self._cache = None
self.backend.forget(self.id)
def revoke(self, connection=None, terminate=False, signal=None,
@@ -119,8 +120,10 @@ def revoke(self, connection=None, terminate=False, signal=None,
terminate=terminate, signal=signal,
reply=wait, timeout=timeout)
def get(self, timeout=None, propagate=True, interval=0.5, no_ack=True,
follow_parents=True):
def get(self, timeout=None, propagate=True, interval=0.5,
no_ack=True, follow_parents=True,
EXCEPTION_STATES=states.EXCEPTION_STATES,
PROPAGATE_STATES=states.PROPAGATE_STATES):
"""Wait until task is ready, and return its result.
.. warning::
@@ -159,16 +162,18 @@ def get(self, timeout=None, propagate=True, interval=0.5, no_ack=True,
self.maybe_reraise()
return self.result
try:
return self.backend.wait_for(
self.id, timeout=timeout,
propagate=propagate,
interval=interval,
on_interval=on_interval,
no_ack=no_ack,
)
finally:
self._get_task_meta() # update self._cache
meta = self.backend.wait_for(
self.id, timeout=timeout,
interval=interval,
on_interval=on_interval,
no_ack=no_ack,
)
if meta:
self._maybe_set_cache(meta)
status = meta['status']
if status in PROPAGATE_STATES and propagate:
raise meta['result']
return meta['result']
wait = get # deprecated alias to :meth:`get`.
def _maybe_reraise_parent_error(self):
@@ -322,21 +327,20 @@ def supports_native_join(self):
def children(self):
return self._get_task_meta().get('children')
def _maybe_set_cache(self, meta):
if meta:
state = meta['status']
if state == states.SUCCESS or state in states.PROPAGATE_STATES:
return self._set_cache(meta)
return meta
def _get_task_meta(self):
if self._cache is None:
meta = self.backend.get_task_meta(self.id)
if meta:
state = meta['status']
if state == states.SUCCESS or state in states.PROPAGATE_STATES:
self._set_cache(meta)
return self._set_cache(meta)
return meta
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
return self._cache
def _set_cache(self, d):
state, children = d['status'], d.get('children')
if state in states.EXCEPTION_STATES:
d['result'] = self.backend.exception_to_python(d['result'])
children = d.get('children')
if children:
d['children'] = [
result_from_tuple(child, self.app) for child in children
@@ -658,7 +662,7 @@ def iter_native(self, timeout=None, interval=0.5, no_ack=True):
results = self.results
if not results:
return iter([])
return results[0].backend.get_many(
return self.backend.get_many(
set(r.id for r in results),
timeout=timeout, interval=interval, no_ack=no_ack,
)
@@ -718,7 +722,14 @@ def subtasks(self):
@property
def supports_native_join(self):
return self.results[0].supports_native_join
try:
return self.results[0].supports_native_join
except IndexError:
pass
@property
def backend(self):
return self.app.backend if self.app else self.results[0].backend
class GroupResult(ResultSet):
View
@@ -12,7 +12,7 @@
from __future__ import absolute_import
from celery._state import current_app, current_task as current
from celery.five import MagicModule, recreate_module
from celery.five import LazyModule, recreate_module
from celery.local import Proxy
__all__ = [
@@ -32,7 +32,7 @@
from .sets import TaskSet
class module(MagicModule):
class module(LazyModule):
def __call__(self, *args, **kwargs):
return self.task(*args, **kwargs)
View
@@ -38,7 +38,7 @@ class Task(BaseTask):
__bound__ = False
__v2_compat__ = True
#- Deprecated compat. attributes -:
# - Deprecated compat. attributes -:
queue = None
routing_key = None
View
@@ -252,7 +252,7 @@ def foo(shared=False):
_state._task_stack.pop()
def test_task_not_shared(self):
with patch('celery.app.base.shared_task') as sh:
with patch('celery.app.base.connect_on_app_finalize') as sh:
@self.app.task(shared=False)
def foo():
pass
@@ -430,7 +430,7 @@ def test_config_from_cmdline(self):
{'foo': 'bar'})
def test_compat_setting_CELERY_BACKEND(self):
self.app._preconf = {} # removes result backend set by AppCase
self.app.config_from_object(Object(CELERY_BACKEND='set_by_us'))
self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, 'set_by_us')
View
@@ -206,9 +206,11 @@ def test_import_from_cwd(self):
except ValueError:
pass
celery = sys.modules.pop('celery', None)
sys.modules.pop('celery.five', None)
try:
self.assertTrue(l.import_from_cwd('celery'))
sys.modules.pop('celery', None)
sys.modules.pop('celery.five', None)
sys.path.insert(0, os.getcwd())
self.assertTrue(l.import_from_cwd('celery'))
finally:
@@ -261,7 +263,10 @@ def test_find_related_module(self):
imp.return_value.__path__ = 'foo'
base.find_related_module(base, 'tasks')
imp.side_effect = AttributeError()
def se1(val):
imp.side_effect = AttributeError()
imp.side_effect = se1
base.find_related_module(base, 'tasks')
imp.side_effect = None
View
@@ -97,7 +97,7 @@ class test_ColorFormatter(AppCase):
@patch('celery.utils.log.safe_str')
@patch('logging.Formatter.formatException')
def test_formatException_not_string(self, fe, safe_str):
x = ColorFormatter('HELLO')
x = ColorFormatter()
value = KeyError()
fe.return_value = value
self.assertIs(x.formatException(value), value)
@@ -106,16 +106,19 @@ def test_formatException_not_string(self, fe, safe_str):
@patch('logging.Formatter.formatException')
@patch('celery.utils.log.safe_str')
def test_formatException_string(self, safe_str, fe, value='HELLO'):
x = ColorFormatter(value)
fe.return_value = value
self.assertTrue(x.formatException(value))
def test_formatException_string(self, safe_str, fe):
x = ColorFormatter()
fe.return_value = 'HELLO'
try:
raise Exception()
except Exception:
self.assertTrue(x.formatException(sys.exc_info()))
if sys.version_info[0] == 2:
self.assertTrue(safe_str.called)
@patch('logging.Formatter.format')
def test_format_object(self, _format):
x = ColorFormatter(object())
x = ColorFormatter()
x.use_color = True
record = Mock()
record.levelname = 'ERROR'
@@ -124,7 +127,7 @@ def test_format_object(self, _format):
@patch('celery.utils.log.safe_str')
def test_format_raises(self, safe_str):
x = ColorFormatter('HELLO')
x = ColorFormatter()
def on_safe_str(s):
try:
@@ -136,6 +139,7 @@ def on_safe_str(s):
class Record(object):
levelname = 'ERROR'
msg = 'HELLO'
exc_info = 1
exc_text = 'error text'
stack_info = None
@@ -148,15 +152,15 @@ def getMessage(self):
record = Record()
safe_str.return_value = record
x.format(record)
self.assertIn('<Unrepresentable', record.msg)
msg = x.format(record)
self.assertIn('<Unrepresentable', msg)
self.assertEqual(safe_str.call_count, 1)
@patch('celery.utils.log.safe_str')
def test_format_raises_no_color(self, safe_str):
if sys.version_info[0] == 3:
raise SkipTest('py3k')
x = ColorFormatter('HELLO', False)
x = ColorFormatter(use_color=False)
record = Mock()
record.levelname = 'ERROR'
record.msg = 'HELLO'
View
@@ -2,7 +2,7 @@
from collections import Mapping, MutableMapping
from celery.app.utils import Settings, bugreport
from celery.app.utils import Settings, filter_hidden_settings, bugreport
from celery.tests.case import AppCase, Mock
@@ -20,6 +20,22 @@ def test_is_mutable_mapping(self):
self.assertTrue(issubclass(Settings, MutableMapping))
class test_filter_hidden_settings(AppCase):
def test_handles_non_string_keys(self):
"""filter_hidden_settings shouldn't raise an exception when handling
mappings with non-string keys"""
conf = {
'STRING_KEY': 'VALUE1',
('NON', 'STRING', 'KEY'): 'VALUE2',
'STRING_KEY2': {
'STRING_KEY3': 1,
('NON', 'STRING', 'KEY', '2'): 2
},
}
filter_hidden_settings(conf)
class test_bugreport(AppCase):
def test_no_conn_driver_info(self):
View
@@ -234,15 +234,14 @@ def test_wait_for(self):
with self.assertRaises(TimeoutError):
b.wait_for(tid, timeout=0.1)
b.store_result(tid, 42, states.SUCCESS)
self.assertEqual(b.wait_for(tid, timeout=1), 42)
self.assertEqual(b.wait_for(tid, timeout=1)['result'], 42)
b.store_result(tid, 56, states.SUCCESS)
self.assertEqual(b.wait_for(tid, timeout=1), 42,
self.assertEqual(b.wait_for(tid, timeout=1)['result'], 42,
'result is cached')
self.assertEqual(b.wait_for(tid, timeout=1, cache=False), 56)
self.assertEqual(b.wait_for(tid, timeout=1, cache=False)['result'], 56)
b.store_result(tid, KeyError('foo'), states.FAILURE)
with self.assertRaises(KeyError):
b.wait_for(tid, timeout=1, cache=False)
self.assertTrue(b.wait_for(tid, timeout=1, propagate=False))
res = b.wait_for(tid, timeout=1, cache=False)
self.assertEqual(res['status'], states.FAILURE)
b.store_result(tid, KeyError('foo'), states.PENDING)
with self.assertRaises(TimeoutError):
b.wait_for(tid, timeout=0.01, cache=False)
@@ -271,12 +270,16 @@ def test_get_many(self):
tids.append(tid)
res = list(b.get_many(tids, timeout=1))
expected_results = [(tid, {'status': states.SUCCESS,
'result': i,
'traceback': None,
'task_id': tid,
'children': None})
for i, tid in enumerate(tids)]
expected_results = [
(task_id, {
'status': states.SUCCESS,
'result': i,
'traceback': None,
'task_id': task_id,
'children': None,
})
for i, task_id in enumerate(tids)
]
self.assertEqual(sorted(res), sorted(expected_results))
self.assertDictEqual(b._cache[res[0][0]], res[0][1])
cached_res = list(b.get_many(tids, timeout=1))
View
@@ -19,15 +19,6 @@ def test_get_backend_aliases(self):
expect_cls,
)
def test_get_backend_cache(self):
backends.get_backend_cls.clear()
hits = backends.get_backend_cls.hits
misses = backends.get_backend_cls.misses
self.assertTrue(backends.get_backend_cls('amqp', self.app.loader))
self.assertEqual(backends.get_backend_cls.misses, misses + 1)
self.assertTrue(backends.get_backend_cls('amqp', self.app.loader))
self.assertEqual(backends.get_backend_cls.hits, hits + 1)
def test_unknown_backend(self):
with self.assertRaises(ImportError):
backends.get_backend_cls('fasodaopjeqijwqe', self.app.loader)
View
@@ -42,16 +42,16 @@ def setup(self):
self.uri = 'sqlite:///test.db'
def test_retry_helper(self):
from celery.backends.database import OperationalError
from celery.backends.database import DatabaseError
calls = [0]
@retry
def raises():
calls[0] += 1
raise OperationalError(1, 2, 3)
raise DatabaseError(1, 2, 3)
with self.assertRaises(OperationalError):
with self.assertRaises(DatabaseError):
raises(max_retries=5)
self.assertEqual(calls[0], 5)
View
@@ -98,7 +98,7 @@ def test_get_connection_no_connection_host(self):
connection = self.backend._get_connection()
mock_Connection.assert_called_once_with(
host='mongodb://localhost:27017', ssl=False, max_pool_size=10,
host='mongodb://localhost:27017', max_pool_size=10,
auto_start_request=False)
self.assertEqual(sentinel.connection, connection)
@@ -113,7 +113,7 @@ def test_get_connection_no_connection_mongodb_uri(self):
connection = self.backend._get_connection()
mock_Connection.assert_called_once_with(
host=mongodb_uri, ssl=False, max_pool_size=10,
host=mongodb_uri, max_pool_size=10,
auto_start_request=False)
self.assertEqual(sentinel.connection, connection)
@@ -196,9 +196,10 @@ def test_get_task_meta_for(self, mock_get_database):
mock_get_database.assert_called_once_with()
mock_database.__getitem__.assert_called_once_with(MONGODB_COLLECTION)
self.assertEqual(
['status', 'task_id', 'date_done', 'traceback', 'result',
'children'],
list(ret_val.keys()))
list(sorted(['status', 'task_id', 'date_done', 'traceback',
'result', 'children'])),
list(sorted(ret_val.keys())),
)
@patch('celery.backends.mongodb.MongoBackend._get_database')
def test_get_task_meta_for_no_result(self, mock_get_database):
@@ -298,7 +299,7 @@ def test_cleanup(self, mock_get_database):
self.backend.taskmeta_collection = MONGODB_COLLECTION
mock_database = MagicMock(spec=['__getitem__', '__setitem__'])
mock_collection = Mock()
self.backend.collections = mock_collection = Mock()
mock_get_database.return_value = mock_database
mock_database.__getitem__.return_value = mock_collection
@@ -309,7 +310,7 @@ def test_cleanup(self, mock_get_database):
mock_get_database.assert_called_once_with()
mock_database.__getitem__.assert_called_once_with(
MONGODB_COLLECTION)
mock_collection.assert_called_once_with()
self.assertTrue(mock_collection.remove.called)
def test_get_database_authfailure(self):
x = MongoBackend(app=self.app)
View
@@ -314,3 +314,19 @@ def test_parse_preload_options_shortopt(self):
cmd.preload_options = (Option('-s', action='store', dest='silent'), )
acc = cmd.parse_preload_options(['-s', 'yes'])
self.assertEqual(acc.get('silent'), 'yes')
def test_parse_preload_options_with_equals_and_append(self):
cmd = Command()
opt = Option('--zoom', action='append', default=[])
cmd.preload_options = (opt,)
acc = cmd.parse_preload_options(['--zoom=1', '--zoom=2'])
self.assertEqual(acc, {'zoom': ['1', '2']})
def test_parse_preload_options_without_equals_and_append(self):
cmd = Command()
opt = Option('--zoom', action='append', default=[])
cmd.preload_options = (opt,)
acc = cmd.parse_preload_options(['--zoom', '1', '--zoom', '2'])
self.assertEqual(acc, {'zoom': ['1', '2']})
View
@@ -24,8 +24,8 @@ def test_execs(self, setup_logs, logger, execv, detached):
detach('/bin/boo', ['a', 'b', 'c'], logfile='/var/log',
pidfile='/var/pid')
detached.assert_called_with('/var/log', '/var/pid', None, None, 0,
None, False)
detached.assert_called_with('/var/log', '/var/pid', None, None,
None, None, False)
execv.assert_called_with('/bin/boo', ['/bin/boo', 'a', 'b', 'c'])
execv.side_effect = Exception('foo')
@@ -84,7 +84,8 @@ def test_execute_from_commandline(self, detach, exit):
self.assertTrue(exit.called)
detach.assert_called_with(
path=x.execv_path, uid=None, gid=None,
umask=0, fake=False, logfile='/var/log', pidfile='celeryd.pid',
umask=None, fake=False, logfile='/var/log', pidfile='celeryd.pid',
working_directory=None, executable=None,
argv=x.execv_argv + [
'-c', '1', '-lDEBUG',
'--logfile=/var/log', '--pidfile=celeryd.pid',
View
@@ -185,15 +185,15 @@ def test_info_not_verbose(self):
self.assertFalse(self.fh.getvalue())
def test_error(self):
self.t.say = Mock()
self.t.carp = Mock()
self.t.usage = Mock()
self.assertEqual(self.t.error('foo'), 1)
self.t.say.assert_called_with('foo')
self.t.carp.assert_called_with('foo')
self.t.usage.assert_called_with()
self.t.say = Mock()
self.t.carp = Mock()
self.assertEqual(self.t.error(), 1)
self.assertFalse(self.t.say.called)
self.assertFalse(self.t.carp.called)
self.assertEqual(self.t.retcode, 1)
@@ -249,7 +249,7 @@ def test_restart(self):
waitexec.return_value = 0
callback('jerry', ['arg'], 13)
waitexec.assert_called_with(['arg'])
waitexec.assert_called_with(['arg'], path=sys.executable)
self.assertIn('OK', self.fh.getvalue())
self.fh.seek(0)
self.fh.truncate()
View
@@ -206,7 +206,10 @@ def test_startup_info(self):
# test when there are too few output lines
# to draft the ascii art onto
prev, cd.ARTLINES = cd.ARTLINES, ['the quick brown fox']
self.assertTrue(worker.startup_info())
try:
self.assertTrue(worker.startup_info())
finally:
cd.ARTLINES = prev
@disable_stdouts
def test_run(self):
@@ -322,8 +325,11 @@ def test_setup_logging_no_color(self):
app=self.app, redirect_stdouts=False, no_color=True,
)
prev, self.app.log.setup = self.app.log.setup, Mock()
worker.setup_logging()
self.assertFalse(self.app.log.setup.call_args[1]['colorize'])
try:
worker.setup_logging()
self.assertFalse(self.app.log.setup.call_args[1]['colorize'])
finally:
self.app.log.setup = prev
@disable_stdouts
def test_startup_info_pool_is_str(self):
@@ -443,8 +449,10 @@ def test_set_process_status(self):
def test_parse_options(self):
cmd = worker()
cmd.app = self.app
opts, args = cmd.parse_options('worker', ['--concurrency=512'])
opts, args = cmd.parse_options('worker', ['--concurrency=512',
'--heartbeat-interval=10'])
self.assertEqual(opts.concurrency, 512)
self.assertEqual(opts.heartbeat_interval, 10)
@disable_stdouts
def test_main(self):
View
@@ -235,7 +235,7 @@ def _is_magic_module(m):
# pyflakes refuses to accept 'noqa' for this isinstance.
cls, modtype = m.__class__, types.ModuleType
return (not cls is modtype and (
return (cls is not modtype and (
'__getattr__' in vars(m.__class__) or
'__getattribute__' in vars(m.__class__)))
@@ -412,8 +412,12 @@ def setUp(self):
self._current_app = current_app()
self._default_app = _state.default_app
trap = Trap()
self._prev_tls = _state._tls
_state.set_default_app(trap)
_state._tls.current_app = trap
class NonTLS(object):
current_app = trap
_state._tls = NonTLS()
self.app = self.Celery(set_as_current=False)
if not self.contained:
@@ -447,13 +451,12 @@ def _teardown_app(self):
if isinstance(backend.client, DummyClient):
backend.client.cache.clear()
backend._cache.clear()
from celery._state import (
_tls, set_default_app, _set_task_join_will_block,
)
_set_task_join_will_block(False)
from celery import _state
_state._set_task_join_will_block(False)
set_default_app(self._default_app)
_tls.current_app = self._current_app
_state.set_default_app(self._default_app)
_state._tls = self._prev_tls
_state._tls.current_app = self._current_app
if self.app is not self._current_app:
self.app.close()
self.app = None
View
@@ -134,8 +134,8 @@ def test_is_JSON_serializable(self):
s = self.MockTask.subtask(
(2, ), {'cache': True}, {'routing_key': 'CPU-bound'},
)
s.args = list(s.args) # tuples are not preserved
# but this doesn't matter.
# tuples are not preserved, but this doesn't matter.
s.args = list(s.args)
self.assertEqual(s, self.subtask(anyjson.loads(anyjson.dumps(s))))
def test_repr(self):
View
@@ -52,8 +52,6 @@ def test_rdb(self, get_avail_port):
rdb.set_trace()
rdb.set_trace(Mock())
pset.side_effect = SockErr
pset.side_effect.errno = errno.ECONNRESET
rdb.set_trace()
pset.side_effect.errno = errno.ENOENT
with self.assertRaises(SockErr):
rdb.set_trace()
View
@@ -93,9 +93,7 @@ def test_on_worker_init(self):
f.on_worker_init()
DWF.assert_called_with(f.app)
DWF.return_value.install.assert_called_with()
self.assertIs(
f._worker_fixup, DWF.return_value.install.return_value,
)
self.assertIs(f._worker_fixup, DWF.return_value)
class test_DjangoWorkerFixup(FixupCase):
@@ -207,10 +205,13 @@ def test_close_database(self):
def test__close_database(self):
with self.fixup_context(self.app) as (f, _, _):
conns = f._db.connections = [Mock(), Mock(), Mock()]
conns = [Mock(), Mock(), Mock()]
conns[1].close.side_effect = KeyError('already closed')
f.database_errors = (KeyError, )
f._db.connections = Mock() # ConnectionHandler
f._db.connections.all.side_effect = lambda: conns
f._close_database()
conns[0].close.assert_called_with()
conns[1].close.assert_called_with()
View
@@ -6,7 +6,7 @@
from . import CERT1, CERT2, KEY1
from .case import SecurityCase
from celery.tests.case import Mock, mock_open, patch
from celery.tests.case import Mock, SkipTest, mock_open, patch
class test_Certificate(SecurityCase):
@@ -23,6 +23,7 @@ def test_invalid_certificate(self):
self.assertRaises(SecurityError, Certificate, KEY1)
def test_has_expired(self):
raise SkipTest('cert actually expired')
self.assertFalse(Certificate(CERT1).has_expired())
View
@@ -59,6 +59,14 @@ def test_getitem_property(self):
self.assertEqual(SIG.options, {'task_id': 'TASK_ID'})
self.assertEqual(SIG.subtask_type, '')
def test_link_on_scalar(self):
x = Signature('TASK', link=Signature('B'))
self.assertTrue(x.options['link'])
x.link(Signature('C'))
self.assertIsInstance(x.options['link'], list)
self.assertIn(Signature('B'), x.options['link'])
self.assertIn(Signature('C'), x.options['link'])
def test_replace(self):
x = Signature('TASK', ('A'), {})
self.assertTupleEqual(x.replace(args=('B', )).args, ('B', ))
@@ -273,6 +281,9 @@ def test_from_dict(self):
def test_call_empty_group(self):
x = group(app=self.app)
self.assertFalse(len(x()))
x.delay()
x.apply_async()
x()
def test_skew(self):
g = group([self.add.s(i, i) for i in range(10)])
View
@@ -73,15 +73,18 @@ def test_children(self):
def test_propagates_for_parent(self):
x = self.app.AsyncResult(uuid())
x.backend = Mock()
x.backend = Mock(name='backend')
x.backend.get_task_meta.return_value = {}
x.backend.wait_for.return_value = {
'status': states.SUCCESS, 'result': 84,
}
x.parent = EagerResult(uuid(), KeyError('foo'), states.FAILURE)
with self.assertRaises(KeyError):
x.get(propagate=True)
self.assertFalse(x.backend.wait_for.called)
x.parent = EagerResult(uuid(), 42, states.SUCCESS)
x.get(propagate=True)
self.assertEqual(x.get(propagate=True), 84)
self.assertTrue(x.backend.wait_for.called)
def test_get_children(self):
@@ -276,6 +279,13 @@ def test_get(self):
x.get()
self.assertTrue(x.join_native.called)
def test_get_empty(self):
x = self.app.ResultSet([])
self.assertIsNone(x.supports_native_join)
x.join = Mock(name='join')
x.get()
self.assertTrue(x.join.called)
def test_add(self):
x = self.app.ResultSet([1])
x.add(2)
@@ -489,6 +499,7 @@ def test_join_native(self):
subtasks = [self.app.AsyncResult(uuid(), backend=backend)
for i in range(10)]
ts = self.app.GroupResult(uuid(), subtasks)
ts.app.backend = backend
backend.ids = [subtask.id for subtask in subtasks]
res = ts.join_native()
self.assertEqual(res, list(range(10)))
@@ -526,6 +537,7 @@ def test_iter_native(self):
subtasks = [self.app.AsyncResult(uuid(), backend=backend)
for i in range(10)]
ts = self.app.GroupResult(uuid(), subtasks)
ts.app.backend = backend
backend.ids = [subtask.id for subtask in subtasks]
self.assertEqual(len(list(ts.iter_native())), 10)
View
@@ -128,6 +128,11 @@ def test_retry_kwargs_can_be_empty(self):
self.retry_task_mockapply.push_request()
try:
with self.assertRaises(Retry):
import sys
try:
sys.exc_clear()
except AttributeError:
pass
self.retry_task_mockapply.retry(args=[4, 4], kwargs=None)
finally:
self.retry_task_mockapply.pop_request()
View
@@ -93,6 +93,36 @@ def test_with_success_receivers(self):
finally:
signals.task_success.receivers[:] = []
def test_multiple_callbacks(self):
"""
Regression test on trace with multiple callbacks
Uses the signature of the following canvas:
chain(
empty.subtask(link=empty.subtask()),
group(empty.subtask(), empty.subtask())
)
"""
@self.app.task(shared=False)
def empty(*args, **kwargs):
pass
empty.backend = Mock()
sig = {
'chord_size': None, 'task': 'empty', 'args': (), 'options': {},
'subtask_type': None, 'kwargs': {}, 'immutable': False
}
group_sig = {
'chord_size': None, 'task': 'celery.group', 'args': (),
'options': {}, 'subtask_type': 'group',
'kwargs': {'tasks': (empty(), empty())}, 'immutable': False
}
callbacks = [sig, group_sig]
# should not raise an exception
self.trace(empty, [], {}, request={'callbacks': callbacks})
def test_when_chord_part(self):
@self.app.task(shared=False)
View
@@ -79,7 +79,7 @@ def __init__(self, cache):
def run(self):
while not self.__is_shutdown.isSet():
try:
self.cache.data.popitem(last=False)
self.cache.popitem(last=False)
except KeyError:
break
self.__is_stopped.set()
View
@@ -341,6 +341,7 @@ def test_callbacks(self):
self.assertTrue(object.__getattribute__(p, '__pending__'))
self.assertTrue(repr(p))
self.assertTrue(p.__evaluated__())
with self.assertRaises(AttributeError):
object.__getattribute__(p, '__pending__')
cbA.assert_called_with(p)
View
@@ -372,7 +372,7 @@ class test_DaemonContext(Case):
@patch('os.dup2')
def test_open(self, dup2, open, close, closer, umask, chdir,
_exit, setsid, fork):
x = DaemonContext(workdir='/opt/workdir')
x = DaemonContext(workdir='/opt/workdir', umask=0o22)
x.stdfds = [0, 1, 2]
fork.return_value = 0
@@ -385,7 +385,7 @@ def test_open(self, dup2, open, close, closer, umask, chdir,
self.assertFalse(_exit.called)
chdir.assert_called_with(x.workdir)
umask.assert_called_with(x.umask)
umask.assert_called_with(0o22)
self.assertTrue(dup2.called)
fork.reset_mock()
View
@@ -164,11 +164,30 @@ def test_start(self):
with patch('celery.worker.heartbeat.Heart') as hcls:
h = Heart(c)
self.assertTrue(h.enabled)
self.assertEqual(h.heartbeat_interval, None)
self.assertIsNone(c.heart)
h.start(c)
self.assertTrue(c.heart)
hcls.assert_called_with(c.timer, c.event_dispatcher)
hcls.assert_called_with(c.timer, c.event_dispatcher,
h.heartbeat_interval)
c.heart.start.assert_called_with()
def test_start_heartbeat_interval(self):
c = Mock()
c.timer = Mock()
c.event_dispatcher = Mock()
with patch('celery.worker.heartbeat.Heart') as hcls:
h = Heart(c, False, 20)
self.assertTrue(h.enabled)
self.assertEqual(h.heartbeat_interval, 20)
self.assertIsNone(c.heart)
h.start(c)
self.assertTrue(c.heart)
hcls.assert_called_with(c.timer, c.event_dispatcher,
h.heartbeat_interval)
c.heart.start.assert_called_with()
View
@@ -119,7 +119,7 @@ class test_ControlPanel(AppCase):
def setup(self):
self.panel = self.create_panel(consumer=Consumer(self.app))
@self.app.task(rate_limit=200, shared=False)
@self.app.task(name='c.unittest.mytask', rate_limit=200, shared=False)
def mytask():
pass
self.mytask = mytask
@@ -140,6 +140,9 @@ def test_enable_events(self):
evd = consumer.event_dispatcher
evd.groups = set()
panel.handle('enable_events')
self.assertFalse(evd.groups)
evd.groups = set(['worker'])
panel.handle('enable_events')
self.assertIn('task', evd.groups)
evd.groups = set(['task'])
self.assertIn('already enabled', panel.handle('enable_events')['ok'])
View
@@ -24,9 +24,20 @@
'first', 'firstmethod', 'chunks', 'padlist', 'mattrgetter', 'uniq',
'regen', 'dictfilter', 'lazy', 'maybe_evaluate']
IS_PYPY = hasattr(sys, 'pypy_version_info')
KEYWORD_MARK = object()
class DummyContext(object):
def __enter__(self):
return self
def __exit__(self, *exc_info):
pass
class LRUCache(UserDict):
"""LRU Cache implementation using a doubly linked list to track access.
@@ -57,6 +68,12 @@ def update(self, *args, **kwargs):
for item in islice(iter(data), len(data) - limit):
data.pop(item)
def popitem(self, last=True, _needs_lock=IS_PYPY):
if not _needs_lock:
return self.data.popitem(last)
with self.mutex:
return self.data.popitem(last)
def __setitem__(self, key, value):
# remove least recently used key.
with self.mutex:
@@ -67,20 +84,23 @@ def __setitem__(self, key, value):
def __iter__(self):
return iter(self.data)
def _iterate_items(self):
for k in self:
try:
yield (k, self.data[k])
except KeyError: # pragma: no cover
pass
def _iterate_items(self, _need_lock=IS_PYPY):
with self.mutex if _need_lock else DummyContext():
for k in self:
try:
yield (k, self.data[k])
except KeyError: # pragma: no cover
pass
iteritems = _iterate_items
def _iterate_values(self):
for k in self:
try:
yield self.data[k]
except KeyError: # pragma: no cover
pass
def _iterate_values(self, _need_lock=IS_PYPY):
with self.mutex if _need_lock else DummyContext():
for k in self:
try:
yield self.data[k]
except KeyError: # pragma: no cover
pass
itervalues = _iterate_values
def _iterate_keys(self):
View
@@ -78,9 +78,9 @@ def in_sighandler():
set_in_sighandler(False)
def logger_isa(l, p):
def logger_isa(l, p, max=1000):
this, seen = l, set()
while this:
for _ in range(max):
if this == p:
return True
else:
@@ -90,6 +90,10 @@ def logger_isa(l, p):
)
seen.add(this)
this = this.parent
if not this:
break
else:
raise RuntimeError('Logger hierarchy exceeds {0}'.format(max))
return False
@@ -135,30 +139,36 @@ def formatException(self, ei):
return r
def format(self, record):
sformat = logging.Formatter.format
msg = logging.Formatter.format(self, record)
color = self.colors.get(record.levelname)
# reset exception info later for other handlers...
einfo = sys.exc_info() if record.exc_info == 1 else record.exc_info
if color and self.use_color:
msg = record.msg
try:
# safe_str will repr the color object
# and color will break on non-string objects
# so need to reorder calls based on type.
# Issue #427
try:
if isinstance(msg, string_t):
record.msg = text_t(color(safe_str(msg)))
else:
record.msg = safe_str(color(msg))
return text_t(color(safe_str(msg)))
return safe_str(color(msg))
except UnicodeDecodeError:
record.msg = safe_str(msg) # skip colors
return safe_str(msg) # skip colors
except Exception as exc:
record.msg = '<Unrepresentable {0!r}: {1!r}>'.format(
type(msg), exc)
record.exc_info = True
return sformat(self, record)
prev_msg, record.exc_info, record.msg = (
record.msg, 1, '<Unrepresentable {0!r}: {1!r}>'.format(
type(msg), exc
),
)
try:
return logging.Formatter.format(self, record)
finally:
record.msg, record.exc_info = prev_msg, einfo
else:
return safe_str(sformat(self, record))
return safe_str(msg)
class LoggingProxy(object):
View
@@ -35,3 +35,50 @@ def mro_lookup(cls, attr, stop=(), monkey_patched=[]):
return
if attr in node.__dict__:
return node
class FallbackContext(object):
"""The built-in ``@contextmanager`` utility does not work well
when wrapping other contexts, as the traceback is wrong when
the wrapped context raises.
This solves this problem and can be used instead of ``@contextmanager``
in this example::
@contextmanager
def connection_or_default_connection(connection=None):
if connection:
# user already has a connection, should not close
# after use
yield connection
else:
# must have new connection, and also close the connection
# after the block returns
with create_new_connection() as connection:
yield connection
This wrapper can be used instead for the above like this::
def connection_or_default_connection(connection=None):
return FallbackContext(connection, create_new_connection)
"""
def __init__(self, provided, fallback, *fb_args, **fb_kwargs):
self.provided = provided
self.fallback = fallback
self.fb_args = fb_args
self.fb_kwargs = fb_kwargs
self._context = None
def __enter__(self):
if self.provided is not None:
return self.provided
context = self._context = self.fallback(
*self.fb_args, **self.fb_kwargs
).__enter__()
return context
def __exit__(self, *exc_info):
if self._context is not None:
return self._context.__exit__(*exc_info)
View
@@ -86,8 +86,8 @@ def run(self):
os._exit(1)
def stop(self):
self._is_shutdown.set()
if self.running:
self._is_shutdown.set()
self._is_stopped.wait()
self.join(THREAD_TIMEOUT_MAX)
self.running = False
View
@@ -10,6 +10,7 @@
import numbers
import os
import sys
import time as _time
from calendar import monthrange
@@ -18,7 +19,7 @@
from kombu.utils import cached_property, reprcall
from kombu.utils.compat import timedelta_seconds
from pytz import timezone as _timezone, AmbiguousTimeError
from pytz import timezone as _timezone, AmbiguousTimeError, FixedOffset
from celery.five import string_t
@@ -32,6 +33,9 @@
'localize', 'to_utc', 'maybe_make_aware', 'ffwd', 'utcoffset',
'adjust_timestamp', 'maybe_s_to_ms']
PY3 = sys.version_info[0] == 3
PY33 = sys.version_info >= (3, 3)
C_REMDEBUG = os.environ.get('C_REMDEBUG', False)
DAYNAMES = 'sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat'
@@ -50,15 +54,13 @@
_local_timezone = None
__timezone__ = -_time.timezone
__altzone__ = -_time.altzone
class LocalTimezone(tzinfo):
"""Local time implementation taken from Python's docs.
Used only when UTC is not enabled.
"""
_offset_cache = {}
def __init__(self):
# This code is moved in __init__ to execute it as late as possible
@@ -72,23 +74,34 @@ def __init__(self):
tzinfo.__init__(self)
def __repr__(self):
return '<LocalTimezone>'
return '<LocalTimezone: UTC{0:+03d}>'.format(
int(timedelta_seconds(self.DSTOFFSET) / 3600),
)
def utcoffset(self, dt):
if self._isdst(dt):
return self.DSTOFFSET
else:
return self.STDOFFSET
return self.DSTOFFSET if self._isdst(dt) else self.STDOFFSET
def dst(self, dt):
if self._isdst(dt):
return self.DSTDIFF
else:
return ZERO
return self.DSTDIFF if self._isdst(dt) else ZERO
def tzname(self, dt):
return _time.tzname[self._isdst(dt)]
if PY3:
def fromutc(self, dt):
# The base tzinfo class no longer implements a DST
# offset aware .fromutc() in Python3 (Issue #2306).
# I'd rather rely on pytz to do this, than port
# the C code from cpython's fromutc [asksol]
offset = int(self.utcoffset(dt).seconds / 60.0)
try:
tz = self._offset_cache[offset]
except KeyError:
tz = self._offset_cache[offset] = FixedOffset(offset)
return tz.fromutc(dt.replace(tzinfo=tz))
def _isdst(self, dt):
tt = (dt.year, dt.month, dt.day,
dt.hour, dt.minute, dt.second,
@@ -110,8 +123,17 @@ def to_local(self, dt, local=None, orig=None):
dt = make_aware(dt, orig or self.utc)
return localize(dt, self.tz_or_local(local))
def to_system(self, dt):
return localize(dt, self.local)
if PY33:
def to_system(self, dt):
# tz=None is a special case since Python 3.3, and will
# convert to the current local timezone (Issue #2306).
return dt.astimezone(tz=None)
else:
def to_system(self, dt): # noqa
return localize(dt, self.local)
def to_local_fallback(self, dt):
if is_naive(dt):
@@ -334,10 +356,10 @@ def _fields(self, **extra):
}, **extra)
def utcoffset():
if _time.daylight:
return __altzone__ // 3600
return __timezone__ // 3600
def utcoffset(time=_time):
if time.daylight:
return time.altzone // 3600
return time.timezone // 3600
def adjust_timestamp(ts, offset, here=utcoffset):
View
@@ -281,7 +281,10 @@ def reload(self, modules=None, reload=False, reloader=None):
if self.consumer:
self.consumer.update_strategies()
self.consumer.reset_rate_limits()
self.pool.restart()
try:
self.pool.restart()
except NotImplementedError:
pass
def info(self):
return {'total': self.state.total_count,
View
@@ -81,7 +81,7 @@ def body(self):
self.maybe_scale()
sleep(1.0)
def _maybe_scale(self):
def _maybe_scale(self, req=None):
procs = self.processes
cur = min(self.qty, self.max_concurrency)
if cur > procs:
@@ -91,8 +91,8 @@ def _maybe_scale(self):
self.scale_down((procs - cur) - self.min_concurrency)
return True
def maybe_scale(self):
if self._maybe_scale():
def maybe_scale(self, req=None):
if self._maybe_scale(req):
self.pool.maintain_pool()
def update(self, max=None, min=None):
View
@@ -270,7 +270,7 @@ def _limit_task(self, request, bucket, tokens):
self.on_task_request(request)
def start(self):
blueprint, loop = self.blueprint, self.loop
blueprint = self.blueprint
while blueprint.state != CLOSE:
self.restart_count += 1
maybe_shutdown()
@@ -534,12 +534,16 @@ def shutdown(self, c):
class Heart(bootsteps.StartStopStep):
requires = (Events, )
def __init__(self, c, without_heartbeat=False, **kwargs):
def __init__(self, c, without_heartbeat=False, heartbeat_interval=None,
**kwargs):
self.enabled = not without_heartbeat
self.heartbeat_interval = heartbeat_interval
c.heart = None
def start(self, c):
c.heart = heartbeat.Heart(c.timer, c.event_dispatcher)
c.heart = heartbeat.Heart(
c.timer, c.event_dispatcher, self.heartbeat_interval,
)
c.heart.start()
def stop(self, c):
@@ -589,11 +593,27 @@ def __init__(self, c, **kwargs):
def start(self, c):
c.update_strategies()
# - RabbitMQ 3.3 completely redefines how basic_qos works..
# This will detect if the new qos semantics is in effect,
# and if so make sure the 'apply_global' flag is set on qos updates.
qos_global = not c.connection.qos_semantics_matches_spec
# set initial prefetch count
c.connection.default_channel.basic_qos(
0, c.initial_prefetch_count, qos_global,
)
c.task_consumer = c.app.amqp.TaskConsumer(
c.connection, on_decode_error=c.on_decode_error,
)
c.qos = QoS(c.task_consumer.qos, c.initial_prefetch_count)
c.qos.update() # set initial prefetch count
def set_prefetch_count(prefetch_count):
return c.task_consumer.qos(
prefetch_count=prefetch_count,
apply_global=qos_global,
)
c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)
def stop(self, c):
if c.task_consumer:
@@ -787,7 +807,7 @@ def on_message(self, prepare, message):
message.payload['hostname'])
if hostname != self.hostname:
type, event = prepare(message.payload)
obj, subject = self.update_state(event)
self.update_state(event)
else:
self.clock.forward()
View
@@ -14,7 +14,7 @@
from kombu.utils.encoding import safe_repr
from celery.exceptions import WorkerShutdown
from celery.five import UserDict, items
from celery.five import UserDict, items, string_t
from celery.platforms import signals as _signals
from celery.utils import timeutils
from celery.utils.functional import maybe_list
@@ -111,7 +111,7 @@ def report(state):
@Panel.register
def enable_events(state):
dispatcher = state.consumer.event_dispatcher
if 'task' not in dispatcher.groups:
if dispatcher.groups and 'task' not in dispatcher.groups:
dispatcher.groups.add('task')
logger.info('Events of group {task} enabled by remote.')
return {'ok': 'task events enabled'}
@@ -275,10 +275,13 @@ def hello(state, from_node, revoked=None, **kwargs):
@Panel.register
def dump_tasks(state, taskinfoitems=None, **kwargs):
tasks = state.app.tasks
def dump_tasks(state, taskinfoitems=None, builtins=False, **kwargs):
reg = state.app.tasks
taskinfoitems = taskinfoitems or DEFAULT_TASK_INFO_ITEMS
tasks = reg if builtins else (
task for task in reg if not task.startswith('celery.'))
def _extract_info(task):
fields = dict((field, str(getattr(task, field, None)))
for field in taskinfoitems
@@ -288,7 +291,7 @@ def _extract_info(task):
return '{0} [{1}]'.format(task.name, ' '.join(info))
return task.name
return [_extract_info(tasks[task]) for task in sorted(tasks)]
return [_extract_info(reg[task]) for task in sorted(tasks)]
@Panel.register
@@ -364,7 +367,9 @@ def active_queues(state):
def _wanted_config_key(key):
return key.isupper() and not key.startswith('__')
return (isinstance(key, string_t) and
key.isupper() and
not key.startswith('__'))
@Panel.register
View
@@ -296,7 +296,7 @@ def execute(self, loglevel=None, logfile=None):
def maybe_expire(self):
"""If expired, mark the task as revoked."""
if self.expires:
now = datetime.now(tz_or_local(self.tzlocal) if self.utc else None)
now = datetime.now(self.expires.tzinfo)
if now > self.expires:
revoked_tasks.add(self.id)
return True
@@ -454,37 +454,41 @@ def _log_error(self, einfo, send_failed_event=True):
)
task = self.task
if task.throws and isinstance(eobj, task.throws):
severity, exc_info = logging.INFO, None
description = 'raised expected'
do_send_mail, severity, exc_info, description = (
False, logging.INFO, None, 'raised expected',
)
else:
severity = logging.ERROR
description = 'raised unexpected'
format = self.error_msg
if send_failed_event:
self.send_event(
'task-failed', exception=exception, traceback=traceback,
do_send_mail, severity, description = (
True, logging.ERROR, 'raised unexpected',
)
format = self.error_msg
if internal:
if isinstance(einfo.exception, MemoryError):
raise MemoryError('Process got: %s' % (einfo.exception, ))
elif isinstance(einfo.exception, Reject):
format = self.rejected_msg
description = 'rejected'
severity = logging.WARN
exc_info = einfo
send_failed_event = False
self.reject(requeue=einfo.exception.requeue)
elif isinstance(einfo.exception, Ignore):
format = self.ignored_msg
description = 'ignored'
severity = logging.INFO
exc_info = None
send_failed_event = False
self.acknowledge()
else:
format = self.internal_error_msg
description = 'INTERNAL ERROR'
severity = logging.CRITICAL
if send_failed_event:
self.send_event(
'task-failed', exception=exception, traceback=traceback,
)
context = {
'hostname': self.hostname,
'id': self.id,
@@ -505,7 +509,8 @@ def _log_error(self, einfo, send_failed_event=True):
'hostname': self.hostname,
'internal': internal}})
task.send_error_email(context, einfo.exception)
if do_send_mail:
task.send_error_email(context, einfo.exception)
def acknowledge(self):
"""Acknowledge task."""
@@ -537,9 +542,11 @@ def info(self, safe=False):
'worker_pid': self.worker_pid}
def __str__(self):
return '{0.name}[{0.id}]{1}{2}'.format(self,
' eta:[{0}]'.format(self.eta) if self.eta else '',
' expires:[{0}]'.format(self.expires) if self.expires else '')
return '{0.name}[{0.id}]{1}{2}'.format(
self,
' eta:[{0}]'.format(self.eta) if self.eta else '',
' expires:[{0}]'.format(self.expires) if self.expires else '',
)
shortinfo = __str__
def __repr__(self):
View
@@ -26,11 +26,9 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
"""Non-blocking event loop consuming messages until connection is lost,
or shutdown is requested."""
update_qos = qos.update
readers, writers = hub.readers, hub.writers
hbtick = connection.heartbeat_check
errors = connection.connection_errors
heartbeat = connection.get_heartbeat_interval() # negotiated
hub_add, hub_remove = hub.add, hub.remove
on_task_received = obj.create_task_handler()
View
@@ -31,8 +31,9 @@ def __init__(self, c):
self._forward_clock = self.c.app.clock.forward
def on_message(self, body, message):
self._forward_clock() # just increase clock as clients usually don't
# have a valid clock to adjust with.
# just increase clock as clients usually don't
# have a valid clock to adjust with.
self._forward_clock()
try:
self.node.handle_message(body, message)
except KeyError as exc:
View
@@ -37,7 +37,7 @@ def default(task, app, consumer,
call_at = consumer.timer.call_at
apply_eta_task = consumer.apply_eta_task
rate_limits_enabled = not consumer.disable_rate_limits
bucket = consumer.task_buckets[task.name]
get_bucket = consumer.task_buckets.__getitem__
handle = consumer.on_task_request
limit_task = consumer._limit_task
@@ -79,6 +79,7 @@ def task_message_handler(message, body, ack, reject, callbacks,
call_at(eta, apply_eta_task, (req, ), priority=6)
else:
if rate_limits_enabled:
bucket = get_bucket(task.name)
if bucket:
return limit_task(req, bucket, 1)
task_reserved(req)
View
@@ -21,6 +21,7 @@ Ben Firshman <ben@firshman.co.uk>
Brad Jasper <bjasper@gmail.com>
Branko Čibej <brane@apache.org>
Brendon Crawford <brendon@aphexcreations.net>
Brian Bouterse <bmbouter@redhat.com>
Brian Rosner <brosner@gmail.com>
Bryan Berg <bryan@mixedmedialabs.com>
Chase Seibert <chase.seibert+github@gmail.com>
View
@@ -5,6 +5,7 @@
APPATTRS = {
'amqp': 'celery.app.amqp.AMQP',
'backend': 'celery.backends.base.BaseBackend',
'conf': 'celery.app.utils.Settings',
'control': 'celery.app.control.Control',
'events': 'celery.events.Events',
'loader': 'celery.app.loaders.base.BaseLoader',
@@ -13,15 +14,31 @@
'tasks': 'celery.app.registry.Registry',
'AsyncResult': 'celery.result.AsyncResult',
'ResultSet': 'celery.result.ResultSet',
'GroupResult': 'celery.result.GroupResult',
'Worker': 'celery.apps.worker.Worker',
'WorkController': 'celery.worker.WorkController',
'Beat': 'celery.apps.beat.Beat',
'Task': 'celery.app.task.Task',
'send_task': 'celery.Celery.send_task',
'connection': 'celery.Celery.connection',
'signature': 'celery.canvas.Signature',
}
APPDIRECT = set([
'on_configure', 'on_after_configure', 'on_after_finalize',
'set_current', 'set_default', 'close', 'on_init', 'start',
'worker_main', 'task', 'gen_task_name', 'finalize',
'add_defaults', 'config_from_object', 'config_from_envvar',
'config_from_cmdline', 'setup_security', 'autodiscover_tasks',
'send_task', 'connection', 'connection_or_acquire',
'producer_or_acquire', 'prepare_config', 'now', 'mail_admins',
'select_queues', 'either', 'bugreport', 'create_task_cls',
'subclass_with_self', 'annotations', 'current_task', 'oid',
'timezone', '__reduce_keys__', 'fixups', 'finalized', 'configured',
'autofinalize', 'steps', 'user_options', 'main', 'clock',
])
APPATTRS.update(dict((x, 'celery.Celery.{0}'.format(x)) for x in APPDIRECT))
ABBRS = {
'Celery': 'celery.Celery',
}
@@ -43,7 +60,7 @@ def shorten(S, newtarget, src_dict):
return S[2:]
elif S.startswith('@'):
if src_dict is APPATTRS:
return '.'.join([pkg_of(newtarget), S[1:]])
return '.'.join(['app', S[1:]])
return S[1:]
return S
View
@@ -54,7 +54,7 @@ def linkcode_resolve(domain, info):
# General information about the project.
project = 'Celery'
copyright = '2009-2013, Ask Solem & Contributors'
copyright = '2009-2014, Ask Solem & Contributors'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
@@ -125,7 +125,7 @@ def linkcode_resolve(domain, info):
epub_title = 'Celery Manual, Version {0}'.format(version)
epub_author = 'Ask Solem'
epub_publisher = 'Celery Project'
epub_copyright = '2009-2013'
epub_copyright = '2009-2014'
# The language of the text. It defaults to the language option
# or en if the language is not set.
View
@@ -299,8 +299,11 @@ the :setting:`CELERY_RESULT_ENGINE_OPTIONS` setting::
# echo enables verbose logging from SQLAlchemy.
CELERY_RESULT_ENGINE_OPTIONS = {'echo': True}
.. setting:: CELERY_RESULT_DB_SHORT_LIVED_SESSIONS
Short lived sessions
~~~~~~~~~~~~~~~~~~~~
CELERY_RESULT_DB_SHORT_LIVED_SESSIONS = True
Short lived sessions are disabled by default. If enabled they can drastically reduce
@@ -714,14 +717,21 @@ Message Routing
CELERY_QUEUES
~~~~~~~~~~~~~
The mapping of queues the worker consumes from. This is a dictionary
of queue name/options. See :ref:`guide-routing` for more information.
Most users will not want to specify this setting and should rather use
the :ref:`automatic routing facilities <routing-automatic>`.
If you really want to configure advanced routing, this setting should
be a list of :class:`kombu.Queue` objects the worker will consume from.
Note that workers can override this setting via the `-Q` option,
or individual queues from this list (by name) can be excluded using
the `-X` option.
Also see :ref:`routing-basics` for more information.
The default is a queue/exchange/binding key of ``celery``, with
exchange type ``direct``.
You don't have to care about this unless you want custom routing facilities.
.. setting:: CELERY_ROUTES
CELERY_ROUTES
@@ -896,26 +906,6 @@ Example::
.. setting:: BROKER_TRANSPORT
BROKER_FAILOVER_STRATEGY
~~~~~~~~~~~~~~~~~~~~~~~~
Default failover strategy for the broker Connection object. If supplied,
may map to a key in 'kombu.connection.failover_strategies', or be a reference
to any method that yields a single item from a supplied list.
Example::
# Random failover strategy
def random_failover_strategy(servers):
it = list(servers)  # don't modify caller's list
shuffle = random.shuffle
for _ in repeat(None):
shuffle(it)
yield it[0]
BROKER_FAILOVER_STRATEGY=random_failover_strategy
BROKER_TRANSPORT
~~~~~~~~~~~~~~~~
:Aliases: ``BROKER_BACKEND``
@@ -954,7 +944,7 @@ manner using TCP/IP alone, so AMQP defines something called heartbeats
that is used both by the client and the broker to detect if
a connection was closed.
Hartbeats are disabled by default.
Heartbeats are disabled by default.
If the heartbeat value is 10 seconds, then
the heartbeat will be monitored at the interval specified
@@ -1273,24 +1263,6 @@ to have different import categories.
The modules in this setting are imported after the modules in
:setting:`CELERY_IMPORTS`.
.. setting:: CELERYD_FORCE_EXECV
CELERYD_FORCE_EXECV
~~~~~~~~~~~~~~~~~~~
On Unix the prefork pool will fork, so that child processes
start with the same memory as the parent process.
This can cause problems as there is a known deadlock condition
with pthread locking primitives when `fork()` is combined with threads.
You should enable this setting if you are experiencing hangs (deadlocks),
especially in combination with time limits or having a max tasks per child limit.
This option will be enabled by default in a later version.
This is not a problem on Windows, as it does not have `fork()`.
.. setting:: CELERYD_WORKER_LOST_WAIT
CELERYD_WORKER_LOST_WAIT
View
@@ -7,7 +7,7 @@ by Ask Solem
.. |copy| unicode:: U+000A9 .. COPYRIGHT SIGN
Copyright |copy| 2009-2013, Ask Solem.
Copyright |copy| 2009-2014, Ask Solem.
All rights reserved. This material may be copied or distributed only
subject to the terms and conditions set forth in the `Creative Commons
View
@@ -62,15 +62,15 @@ for the :program:`celery` command-line program:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
You don't need this line, but it saves you from always passing in the
settings module to the celery program. It must always come before
creating the app instances, which is what we do next:
Specifying the settings here means the ``celery`` command line program
will know where your Django project is. This statement must always appear before
the app instance is created, which is what we do next:
.. code-block:: python
app = Celery('proj')
This is our instance of the library, you can have many instances
This is your instance of the library, you can have many instances
but there's probably no reason for that when using Django.
We also add the Django settings module as a configuration source
@@ -136,10 +136,14 @@ concrete app instance:
Using the Django ORM/Cache as a result backend.
-----------------------------------------------
The ``django-celery`` library defines result backends that
uses the Django ORM and Django Cache frameworks.
If you want to store task results in the Django database then
you still need to install the ``django-celery`` library for that
(alternatively you can use the SQLAlchemy result backend).
To use this with your project you need to follow these four steps:
The ``django-celery`` library implements result backends using
the Django ORM and the Django Cache frameworks.
To use this extension in your project you need to follow these four steps:
1. Install the ``django-celery`` library:
View
@@ -99,7 +99,7 @@ many performance and stability improvements. It is an eventual goal
that these improvements will be merged back into Python one day.
It is also used for compatibility with older Python versions
that doesn't come with the multiprocessing module.
that don't come with the multiprocessing module.
.. _`billiard`: http://pypi.python.org/pypi/billiard
@@ -288,9 +288,16 @@ most systems), it usually contains a message describing the reason.
Does it work on FreeBSD?
------------------------
**Answer:** The prefork pool requires a working POSIX semaphore
implementation which isn't enabled in FreeBSD by default. You have to enable
POSIX semaphores in the kernel and manually recompile multiprocessing.
**Answer:** Depends
When using the RabbitMQ (AMQP) and Redis transports it should work
out of the box.
For other transports the compatibility prefork pool is
used which requires a working POSIX semaphore implementation,
this is enabled in FreeBSD by default since FreeBSD 8.x.
For older versions of FreeBSD, you have to enable
POSIX semaphores in the kernel and manually recompile billiard.
Luckily, Viktor Petersson has written a tutorial to get you started with
Celery on FreeBSD here:
@@ -374,22 +381,22 @@ all configured task queues:
.. code-block:: bash
$ celery purge
$ celery -A proj purge
or programmatically:
.. code-block:: python
>>> from celery import current_app as celery
>>> celery.control.purge()
>>> from proj.celery import app
>>> app.control.purge()
1753
If you only want to purge messages from a specific queue
you have to use the AMQP API or the :program:`celery amqp` utility:
.. code-block:: bash
$ celery amqp queue.purge <queue name>
$ celery -A proj amqp queue.purge <queue name>
The number 1753 is the number of messages deleted.
@@ -432,7 +439,7 @@ using the tasks current result backend.
If you need to specify a custom result backend, or you want to use
the current application's default backend you can use
:class:`@Celery.AsyncResult`:
:class:`@AsyncResult`:
>>> result = app.AsyncResult(task_id)
>>> result.get()
@@ -594,7 +601,7 @@ Why do workers delete tasks from the queue if they are unable to process them?
**Answer**:
The worker rejects unknown tasks, messages with encoding errors and messages
that doesn't contain the proper fields (as per the task message protocol).
that don't contain the proper fields (as per the task message protocol).
If it did not reject them they could be redelivered again and again,
causing a loop.
@@ -607,12 +614,11 @@ queue for exchange, so that rejected messages is moved there.
Can I call a task by name?
-----------------------------
**Answer**: Yes. Use :func:`celery.execute.send_task`.
**Answer**: Yes. Use :meth:`@send_task`.
You can also call a task by name from any language
that has an AMQP client.
>>> from celery.execute import send_task
>>> send_task("tasks.add", args=[2, 2], kwargs={})
>>> app.send_task('tasks.add', args=[2, 2], kwargs={})
<AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
.. _faq-get-current-task-id:
@@ -693,8 +699,8 @@ Can I cancel the execution of a task?
or if you only have the task id::
>>> from celery import current_app as celery
>>> celery.control.revoke(task_id)
>>> from proj.celery import app
>>> app.control.revoke(task_id)
.. _faq-node-not-receiving-broadcast-commands:
@@ -711,8 +717,8 @@ using the :option:`-n` argument to :mod:`~celery.bin.worker`:
.. code-block:: bash
$ celery worker -n worker1@%h
$ celery worker -n worker2@%h
$ celery -A proj worker -n worker1@%h
$ celery -A proj worker -n worker2@%h
where ``%h`` is automatically expanded into the current hostname.
@@ -768,7 +774,7 @@ to use both.
`Task.retry` is used to retry tasks, notably for expected errors that
is catchable with the `try:` block. The AMQP transaction is not used
for these errors: **if the task raises an exception it is still acknowledged!**.
for these errors: **if the task raises an exception it is still acknowledged!**
The `acks_late` setting would be used when you need the task to be
executed again if the worker (for some reason) crashes mid-execution.
@@ -794,7 +800,7 @@ scenario of course, but you can probably imagine something far more
sinister. So for ease of programming we have less reliability;
It's a good default, users who require it and know what they
are doing can still enable acks_late (and in the future hopefully
use manual acknowledgement)
use manual acknowledgement).
In addition `Task.retry` has features not available in AMQP
transactions: delay between retries, max retries, etc.
@@ -835,9 +841,23 @@ executing jobs and shut down as soon as possible. No tasks should be lost.
You should never stop :mod:`~celery.bin.worker` with the :sig:`KILL` signal
(:option:`-9`), unless you've tried :sig:`TERM` a few times and waited a few
minutes to let it get a chance to shut down. As if you do tasks may be
terminated mid-execution, and they will not be re-run unless you have the
`acks_late` option set (`Task.acks_late` / :setting:`CELERY_ACKS_LATE`).
minutes to let it get a chance to shut down.
Also make sure you kill the main worker process, not its child processes.
You can direct a kill signal to a specific child process if you know the
process is currently executing a task the worker shutdown is depending on,
but this also means that a ``WorkerLostError`` state will be set for the
task so the task will not run again.
Identifying the type of process is easier if you have installed the
``setproctitle`` module:
.. code-block:: bash
pip install setproctitle
With this library installed you will be able to see the type of process in ps
listings, but the worker must be restarted for this to take effect.
.. seealso::
View
@@ -90,6 +90,12 @@ Finally, we can install rabbitmq using :program:`brew`:
.. _rabbitmq-osx-system-hostname:
After you have installed rabbitmq with brew, you need to add the following to your path to be able to start and stop the broker: add it to your :file:`.bash_profile` or :file:`.profile`.
.. code-block:: bash
PATH=$PATH:/usr/local/sbin
Configuring the system host name
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
View
@@ -34,6 +34,10 @@ Where the URL is in the format of::
all fields after the scheme are optional, and will default to localhost on port 6379,
using database 0.
If a unix socket connection should be used, the URL needs to be in the format::
redis+socket:///path/to/redis.sock
.. _redis-visibility_timeout:
Visibility Timeout
View
@@ -32,7 +32,7 @@ an SQLAlchemy database URI.
Please see `SQLAlchemy: Supported Databases`_ for a table of supported databases.
Here's a list of examples using a selection of other `SQLAlchemy Connection String`_'s:
Here's a list of examples using a selection of other `SQLAlchemy Connection Strings`_:
.. code-block:: python
@@ -51,7 +51,7 @@ Here's a list of examples using a selection of other `SQLAlchemy Connection Stri
.. _`SQLAlchemy: Supported Databases`:
http://www.sqlalchemy.org/docs/core/engines.html#supported-databases
.. _`SQLAlchemy Connection String`:
.. _`SQLAlchemy Connection Strings`:
http://www.sqlalchemy.org/docs/core/engines.html#database-urls
.. _sqlalchemy-results-configuration:
View
@@ -16,7 +16,7 @@ In this tutorial you will learn the absolute basics of using Celery.
You will learn about;
- Choosing and installing a message transport (broker).
- Installing Celery and creating your first task
- Installing Celery and creating your first task.
- Starting the worker and calling tasks.
- Keeping track of tasks as they transition through different states,
and inspecting return values.
@@ -37,7 +37,7 @@ showcase Celery's capabilities.
Choosing a Broker
=================
Celery requires a solution to send and receive messages, usually this
Celery requires a solution to send and receive messages; usually this
comes in the form of a separate service called a *message broker*.
There are several choices available, including:
@@ -118,8 +118,8 @@ with standard Python tools like ``pip`` or ``easy_install``:
Application
===========
The first thing you need is a Celery instance, this is called the celery
application or just app in short. Since this instance is used as
The first thing you need is a Celery instance, which is called the celery
application or just "app" for short. Since this instance is used as
the entry-point for everything you want to do in Celery, like creating tasks and
managing workers, it must be possible for other modules to import it.
@@ -230,7 +230,7 @@ you choose to use a configuration module)::
app = Celery('tasks', backend='amqp', broker='amqp://')
or if you want to use Redis as the result backend, but still use RabbitMQ as
Or if you want to use Redis as the result backend, but still use RabbitMQ as
the message broker (a popular combination)::
app = Celery('tasks', backend='redis://localhost', broker='amqp://')
@@ -316,7 +316,7 @@ you can also imagine your SysAdmin making simple changes to the configuration
in the event of system trouble.
You can tell your Celery instance to use a configuration module,
by calling the :meth:`~@Celery.config_from_object` method:
by calling the :meth:`@config_from_object` method:
.. code-block:: python
@@ -350,7 +350,7 @@ contain any syntax errors, you can try to import it:
For a complete reference of configuration options, see :ref:`configuration`.
To demonstrate the power of configuration files, this how you would
To demonstrate the power of configuration files, this is how you would
route a misbehaving task to a dedicated queue:
:file:`celeryconfig.py`:
@@ -379,7 +379,7 @@ for the task at runtime:
.. code-block:: bash
$ celery control rate_limit tasks.add 10/m
$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
new rate limit set successfully
View
@@ -14,19 +14,19 @@ What is a Task Queue?
Task queues are used as a mechanism to distribute work across threads or
machines.
A task queue's input is a unit of work, called a task, dedicated worker
processes then constantly monitor the queue for new work to perform.
A task queue's input is a unit of work called a task. Dedicated worker
processes constantly monitor task queues for new work to perform.
Celery communicates via messages, usually using a broker
to mediate between clients and workers. To initiate a task a client puts a
message on the queue, the broker then delivers the message to a worker.
to mediate between clients and workers. To initiate a task, a client adds a
message to the queue, which the broker then delivers to a worker.
A Celery system can consist of multiple workers and brokers, giving way
to high availability and horizontal scaling.
Celery is written in Python, but the protocol can be implemented in any
language. So far there's RCelery_ for the Ruby programming language,
node-celery_ for Node.js and a `PHP client`_, but language interoperability can also be achieved
node-celery_ for Node.js and a `PHP client`_. Language interoperability can also be achieved
by :ref:`using webhooks <guide-webhooks>`.
.. _RCelery: http://leapfrogdevelopment.github.com/rcelery/
View
@@ -70,11 +70,11 @@ you simply import this instance.
Starting the worker
-------------------
The :program:`celery` program can be used to start the worker:
The :program:`celery` program can be used to start the worker (you need to run the worker in the directory above proj):
.. code-block:: bash
$ celery worker --app=proj -l info
$ celery -A proj worker -l info
When the worker starts you should see a banner and some messages::
@@ -160,7 +160,7 @@ You can restart it too:
.. code-block:: bash
$ celery multi restart w1 -A proj -l info
$ celery multi restart w1 -A proj -l info
celery multi v3.1.1 (Cipater)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64024
@@ -201,7 +201,7 @@ you are encouraged to put these in a dedicated directory:
$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
--logfile=/var/log/celery/%n.pid
--logfile=/var/log/celery/%n%I.log
With the multi command you can start multiple workers, and there is a powerful
command-line syntax to specify arguments for different workers too,
@@ -317,7 +317,7 @@ exception, in fact ``result.get()`` will propagate any errors by default::
File "/opt/devel/celery/celery/result.py", line 113, in get
interval=interval)
File "/opt/devel/celery/celery/backends/amqp.py", line 138, in wait_for
raise self.exception_to_python(meta['result'])
raise meta['result']
TypeError: add() takes exactly 2 arguments (1 given)
If you don't wish for the errors to propagate then you can disable that
View
@@ -371,7 +371,7 @@ Fixes
objects with a broken ``__repr__`` does not crash the worker, or otherwise
make errors hard to understand (Issue #298).
* Remote control command ``active_queues``: did not account for queues added
* Remote control command :control:`active_queues`: did not account for queues added
at runtime.
In addition the dictionary replied by this command now has a different
View
@@ -353,7 +353,7 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
- The ``pool_restart`` remote control command now reports
an error if the :setting:`CELERYD_POOL_RESTARTS` setting is not set.
- ``celery.conf.add_defaults`` can now be used with non-dict objects.
- :meth:`@add_defaults`` can now be used with non-dict objects.
- Fixed compatibility problems in the Proxy class (Issue #1087).
@@ -1108,7 +1108,7 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
- App instances now supports the with statement.
This calls the new :meth:`~celery.Celery.close` method at exit, which
This calls the new :meth:`@close` method at exit, which
cleans up after the app like closing pool connections.
Note that this is only necessary when dynamically creating apps,
@@ -1411,16 +1411,16 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
}
}
- New :meth:`@Celery.add_defaults` method can add new default configuration
- New :meth:`@add_defaults` method can add new default configuration
dicts to the applications configuration.
For example::
config = {'FOO': 10}
celery.add_defaults(config)
app.add_defaults(config)
is the same as ``celery.conf.update(config)`` except that data will not be
is the same as ``app.conf.update(config)`` except that data will not be
copied, and that it will not be pickled when the worker spawns child
processes.
@@ -1429,16 +1429,16 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
def initialize_config():
# insert heavy stuff that can't be done at import time here.
celery.add_defaults(initialize_config)
app.add_defaults(initialize_config)
which means the same as the above except that it will not happen
until the celery configuration is actually used.
As an example, Celery can lazily use the configuration of a Flask app::
flask_app = Flask()
celery = Celery()
celery.add_defaults(lambda: flask_app.config)
app = Celery()
app.add_defaults(lambda: flask_app.config)
- Revoked tasks were not marked as revoked in the result backend (Issue #871).
@@ -1455,8 +1455,8 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
- New method names:
- ``Celery.default_connection()`` âž  :meth:`~@Celery.connection_or_acquire`.
- ``Celery.default_producer()`` âž  :meth:`~@Celery.producer_or_acquire`.
- ``Celery.default_connection()`` âž  :meth:`~@connection_or_acquire`.
- ``Celery.default_producer()`` âž  :meth:`~@producer_or_acquire`.
The old names still work for backward compatibility.
View
@@ -28,9 +28,9 @@ commas.
.. code-block:: bash
$ pip install celery[librabbitmq]
$ pip install "celery[librabbitmq]"
$ pip install celery[librabbitmq,redis,auth,msgpack]
$ pip install "celery[librabbitmq,redis,auth,msgpack]"
The following bundles are available:
View
@@ -1,4 +1,4 @@
:Version: 3.1.10 (Cipater)
:Version: 3.1.18 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
@@ -177,7 +177,7 @@ development easier, and sometimes they add important hooks like closing
database connections at ``fork``.
.. _`Django`: http://djangoproject.com/
.. _`Pylons`: http://pylonshq.com/
.. _`Pylons`: http://pylonsproject.org/
.. _`Flask`: http://flask.pocoo.org/
.. _`web2py`: http://web2py.com/
.. _`Bottle`: http://bottlepy.org/
View
@@ -408,7 +408,7 @@ See :ref:`guide-canvas` for more about creating task workflows.
A group is lazy so you must call it to take action and evaluate
the group.
Will return a `group` task that when called will then call of the
Will return a `group` task that when called will then call all of the
tasks in the group (and return a :class:`GroupResult` instance
that can be used to inspect the state of the group).
View
@@ -0,0 +1,90 @@
=========================================
CELERYSA-0002: Celery Security Advisory
=========================================
:contact: security@celeryproject.org
:CVE id: TBA
:date: 2014-07-10 05:00:00 P.M. UTC
Details
=======
:package: celery
:vulnerability: Environment error
:problem type: local
:risk: low
:versions-affected: 2.5, 3.0, 3.1
Description
===========
The built-in utility used to daemonize the Celery worker service sets
an insecure umask by default (umask 0).
This means that any files or directories created by the worker will
end up having world-writable permissions.
In practice this means that local users will be able to modify and possibly
corrupt the files created by user tasks.
This is not immediately exploitable but can be if those files are later
evaluated as a program, for example a task that creates Python program files
that are later executed.
Patches are now available for all maintained versions (see below),
and users are urged to upgrade, even if not directly
affected.
Acknowledgements
================
Special thanks to Red Hat for originally discovering and reporting the issue.
Systems affected
================
Users of Celery versions 3.0, and 3.1, except the recently
released 3.1.13, are affected if daemonizing the
Celery programs using the `--detach` argument or using the `celery multi` program
to start workers in the background, without setting a custom `--umask`
argument.
Solution
========
NOTE:
Not all users of Celery will use it to create files, but if you do
then files may already have been created with insecure permissions.
So after upgrading, or using the workaround, then please make sure
that files already created are not world writable.
To work around the issue you can set a custom umask using the ``--umask``
argument:
$ celery worker -l info --detach --umask=18 # (022)
Or you can upgrade to a more recent version:
- Users of the 3.1 series should upgrade to 3.1.13:
* ``pip install -U celery``, or
* ``easy_install -U celery``, or
* http://pypi.python.org/pypi/celery/3.1.13
- Users of the 3.0 series should upgrade to 3.0.25:
* ``pip install -U celery==3.0.25``, or
* ``easy_install -U celery==3.0.25``, or
* http://pypi.python.org/pypi/celery/3.0.25
Distribution package maintainers are urged to provide their users
with updated packages.
Please direct questions to the celery-users mailing-list:
http://groups.google.com/group/celery-users/,
or if you are planning to report a new security related issue we request that
you keep the information confidential by contacting
security@celeryproject.org instead.
Thank you!
View
@@ -55,10 +55,14 @@ must also export them (e.g. ``export DISPLAY=":0"``)
.. code-block:: bash
$ celery multi start worker1 \
-A proj \
--pidfile="$HOME/run/celery/%n.pid" \
--logfile="$HOME/log/celery/%n.log"
$ celery multi restart worker1 --pidfile="$HOME/run/celery/%n.pid"
$ celery multi restart worker1 \
-A proj \
--logfile="$HOME/log/celery/%n%I.log" \
--pidfile="$HOME/run/celery/%n.pid"
$ celery multi stopwait worker1 --pidfile="$HOME/run/celery/%n.pid"
@@ -211,7 +215,7 @@ This is an example configuration for a Python project:
CELERYBEAT_CHDIR="/opt/Myproject/"
# Extra arguments to celerybeat
CELERYBEAT_OPTS="--schedule=/var/run/celerybeat-schedule"
CELERYBEAT_OPTS="--schedule=/var/run/celery/celerybeat-schedule"
.. _generic-initd-celerybeat-django-example:
@@ -265,7 +269,7 @@ Available options
* CELERY_CREATE_LOGDIR
Always create logfile directory. By default only enable when no custom
logfile location set.
.. _daemon-systemd-generic:
Usage systemd
@@ -279,10 +283,10 @@ Service file: celery.service
:Usage: `systemctl {start|stop|restart|status} celery.service`
:Configuration file: /etc/conf.d/celery
To create a temporary folders for the log and pid files change user and group in
To create temporary folders for the log and pid files change user and group in
/usr/lib/tmpfiles.d/celery.conf.
To configure user, group, chdir change settings User, Group and WorkingDirectory defines
in /usr/lib/systemd/system/celery.service.
To configure user, group, chdir change the User, Group and WorkingDirectory settings defined
in /usr/lib/systemd/system/celery.service.
.. _generic-systemd-celery-example:
View
@@ -31,7 +31,7 @@ The cache key expires after some time in case something unexpected happens
from celery import task
from celery.utils.log import get_task_logger
from django.core.cache import cache
from django.utils.hashcompat import md5_constructor as md5
from hashlib import md5
from djangofeeds.models import Feed
logger = get_task_logger(__name__)
@@ -42,7 +42,7 @@ The cache key expires after some time in case something unexpected happens
def import_feed(feed_url):
# The cache key consists of the task name and the MD5 digest
# of the feed URL.
feed_url_digest = md5(feed_url).hexdigest()
feed_url_hexdigest = md5(feed_url).hexdigest()
lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
# cache.add fails if the key already exists
View
@@ -67,7 +67,8 @@ This is only a problem in a limited set of use cases:
#. If the module that the task is defined in is run as a program.
#. If the application is created in the Python shell (REPL).
For example here, where the tasks module is also used to start a worker:
For example here, where the tasks module is also used to start a worker
with :meth:`@worker_main`:
:file:`tasks.py`:
@@ -114,7 +115,7 @@ There are several options you can set that will change how
Celery works. These options can be set directly on the app instance,
or you can use a dedicated configuration module.
The configuration is available as :attr:`@Celery.conf`::
The configuration is available as :attr:`@conf`::
>>> app.conf.CELERY_TIMEZONE
'Europe/London'
@@ -137,7 +138,7 @@ that are consulted in order:
#. The configuration module (if any)
#. The default configuration (:mod:`celery.app.defaults`).
You can even add new default sources by using the :meth:`@Celery.add_defaults`
You can even add new default sources by using the :meth:`@add_defaults`
method.
.. seealso::
@@ -148,13 +149,13 @@ method.
``config_from_object``
----------------------
The :meth:`@Celery.config_from_object` method loads configuration
The :meth:`@config_from_object` method loads configuration
from a configuration object.
This can be a configuration module, or any object with configuration attributes.
Note that any configuration that was previous set will be reset when
:meth:`~@Celery.config_from_object` is called. If you want to set additional
:meth:`~@config_from_object` is called. If you want to set additional
configuration you should do so after.
Example 1: Using the name of a module
@@ -216,7 +217,7 @@ Example 3: Using a configuration class/object
``config_from_envvar``
----------------------
The :meth:`@Celery.config_from_envvar` takes the configuration module name
The :meth:`@config_from_envvar` takes the configuration module name
from an environment variable
For example -- to load configuration from a module specified in the
@@ -288,9 +289,9 @@ Creating a :class:`@Celery` instance will only do the following:
#. Create the task registry.
#. Set itself as the current app (but not if the ``set_as_current``
argument was disabled)
#. Call the :meth:`@Celery.on_init` callback (does nothing by default).
#. Call the :meth:`@on_init` callback (does nothing by default).
The :meth:`~@Celery.task` decorator does not actually create the
The :meth:`@task` decorator does not actually create the
tasks at the point when it's called, instead it will defer the creation
of the task to happen either when the task is used, or after the
application has been *finalized*,
@@ -317,7 +318,7 @@ you use the task, or access an attribute (in this case :meth:`repr`):
True
*Finalization* of the app happens either explicitly by calling
:meth:`@Celery.finalize` -- or implicitly by accessing the :attr:`~@Celery.tasks`
:meth:`@finalize` -- or implicitly by accessing the :attr:`@tasks`
attribute.
Finalizing the object will:
@@ -464,8 +465,8 @@ chain breaks:
Abstract Tasks
==============
All tasks created using the :meth:`~@Celery.task` decorator
will inherit from the applications base :attr:`~@Celery.Task` class.
All tasks created using the :meth:`~@task` decorator
will inherit from the applications base :attr:`~@Task` class.
You can specify a different base class with the ``base`` argument:
@@ -504,7 +505,7 @@ Once a task is bound to an app it will read configuration to set default values
and so on.
It's also possible to change the default base class for an application
by changing its :meth:`@Celery.Task` attribute:
by changing its :meth:`@Task` attribute:
.. code-block:: python
@@ -520,7 +521,7 @@ by changing its :meth:`@Celery.Task` attribute:
>>> app.Task
<unbound MyBaseTask>
>>> @x.task
>>> @app.task
... def add(x, y):
... return x + y
View
@@ -144,12 +144,11 @@ This is an example error callback:
.. code-block:: python
@app.task
def error_handler(uuid):
result = AsyncResult(uuid)
exc = result.get(propagate=False)
@app.task(bind=True)
def error_handler(self, uuid):
result = self.app.AsyncResult(uuid)
print('Task {0} raised exception: {1!r}\n{2!r}'.format(
uuid, exc, result.traceback))
uuid, result.result, result.traceback))
it can be added to the task using the ``link_error`` execution
option:
@@ -442,7 +441,7 @@ Though this particular example is much better expressed as a group:
>>> from celery import group
>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
>>> res = group(add.subtask(n) for i in numbers).apply_async()
>>> res = group(add.s(i, j) for i, j in numbers).apply_async()
>>> res.get()
[4, 8, 16, 32]
@@ -463,7 +462,7 @@ the workers :option:`-Q` argument:
.. code-block:: bash
$ celery worker -l info -Q celery,priority.high
$ celery -A proj worker -l info -Q celery,priority.high
.. seealso::
View
@@ -419,39 +419,6 @@ The linked task will be applied with the result of its parent
task as the first argument, which in the above case will result
in ``mul(4, 16)`` since the result is 4.
The results will keep track of what subtasks a task applies,
and this can be accessed from the result instance::
>>> res.children
[<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>]
>>> res.children[0].get()
64
The result instance also has a :meth:`~@AsyncResult.collect` method
that treats the result as a graph, enabling you to iterate over
the results::
>>> list(res.collect())
[(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4),
(<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>, 64)]
By default :meth:`~@AsyncResult.collect` will raise an
:exc:`~@IncompleteStream` exception if the graph is not fully
formed (one of the tasks has not completed yet),
but you can get an intermediate representation of the graph
too::
>>> for result, value in res.collect(intermediate=True):
....
You can link together as many tasks as you like,
and signatures can be linked too::
>>> s = add.s(2, 2)
>>> s.link(mul.s(4))
>>> s.link(log_result.s())
You can also add *error callbacks* using the ``link_error`` argument::
>>> add.apply_async((2, 2), link_error=log_error.s())
@@ -508,11 +475,51 @@ work your way up the chain to get intermediate results::
>>> res.parent.parent
<AsyncResult: eeaad925-6778-4ad1-88c8-b2a63d017933>
Chains can also be made using the ``|`` (pipe) operator::
>>> (add.s(2, 2) | mul.s(8) | mul.s(10)).apply_async()
.. note::
It's not possible to synchronize on groups, so a group chained to another
signature is automatically upgraded to a chord:
.. code-block:: python
# will actually be a chord when finally evaluated
res = (group(add.s(i, i) for i in range(10)) | xsum.s()).delay()
Trails
~~~~~~
Tasks will keep track of what subtasks a task calls in the
result backend (unless disabled using :attr:`Task.trail <@Task.trail>`)
and this can be accessed from the result instance::
>>> res.children
[<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>]
>>> res.children[0].get()
64
The result instance also has a :meth:`~@AsyncResult.collect` method
that treats the result as a graph, enabling you to iterate over
the results::
>>> list(res.collect())
[(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4),
(<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>, 64)]
By default :meth:`~@AsyncResult.collect` will raise an
:exc:`~@IncompleteStream` exception if the graph is not fully
formed (one of the tasks has not completed yet),
but you can get an intermediate representation of the graph
too::
>>> for result, value in res.collect(intermediate=True):
....
Graphs
~~~~~~
@@ -742,7 +749,7 @@ to the :exc:`~@ChordError` exception:
File "*/celery/result.py", line 120, in get
interval=interval)
File "*/celery/backends/amqp.py", line 150, in wait_for
raise self.exception_to_python(meta['result'])
raise meta['result']
celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
raised ValueError('something something',)
View
@@ -44,7 +44,7 @@ You can enable the Eventlet pool by using the ``-P`` option to
.. code-block:: bash
$ celery worker -P eventlet -c 1000
$ celery -A proj worker -P eventlet -c 1000
.. _eventlet-examples:
View
@@ -613,7 +613,7 @@ Command-specific options
~~~~~~~~~~~~~~~~~~~~~~~~
You can add additional command-line options to the ``worker``, ``beat`` and
``events`` commands by modifying the :attr:`~@Celery.user_options` attribute of the
``events`` commands by modifying the :attr:`~@user_options` attribute of the
application instance.
Celery commands uses the :mod:`optparse` module to parse command-line
View
@@ -58,40 +58,41 @@ Commands
.. code-block:: bash
$ celery status
$ celery -A proj status
* **result**: Show the result of a task
.. code-block:: bash
$ celery result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577
$ celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577
Note that you can omit the name of the task as long as the
task doesn't use a custom result backend.
* **purge**: Purge messages from all configured task queues.
.. code-block:: bash
$ celery purge
.. warning::
There is no undo for this operation, and messages will
be permanently deleted!
.. code-block:: bash
$ celery -A proj purge
* **inspect active**: List active tasks
.. code-block:: bash
$ celery inspect active
$ celery -A proj inspect active
These are all the tasks that are currently being executed.
* **inspect scheduled**: List scheduled ETA tasks
.. code-block:: bash
$ celery inspect scheduled
$ celery -A proj inspect scheduled
These are tasks reserved by the worker because they have the
`eta` or `countdown` argument set.
@@ -100,7 +101,7 @@ Commands
.. code-block:: bash
$ celery inspect reserved
$ celery -A proj inspect reserved
This will list all tasks that have been prefetched by the worker,
and is currently waiting to be executed (does not include tasks
@@ -110,37 +111,37 @@ Commands
.. code-block:: bash
$ celery inspect revoked
$ celery -A proj inspect revoked
* **inspect registered**: List registered tasks
.. code-block:: bash
$ celery inspect registered
$ celery -A proj inspect registered
* **inspect stats**: Show worker statistics (see :ref:`worker-statistics`)
.. code-block:: bash
$ celery inspect stats
$ celery -A proj inspect stats
* **control enable_events**: Enable events
.. code-block:: bash
$ celery control enable_events
$ celery -A proj control enable_events
* **control disable_events**: Disable events
.. code-block:: bash
$ celery control disable_events
$ celery -A proj control disable_events
* **migrate**: Migrate tasks from one broker to another (**EXPERIMENTAL**).
.. code-block:: bash
$ celery migrate redis://localhost amqp://localhost
$ celery -A proj migrate redis://localhost amqp://localhost
This command will migrate all the tasks on one broker to another.
As this command is new and experimental you should be sure to have
@@ -164,9 +165,9 @@ You can specify a single, or a list of workers by using the
.. code-block:: bash
$ celery inspect -d w1,w2 reserved
$ celery -A proj inspect -d w1,w2 reserved
$ celery control -d w1,w2 enable_events
$ celery -A proj control -d w1,w2 enable_events
.. _monitoring-flower:
@@ -232,13 +233,13 @@ Running the flower command will start a web-server that you can visit:
.. code-block:: bash
$ celery flower
$ celery -A proj flower
The default port is http://localhost:5555, but you can change this using the `--port` argument:
.. code-block:: bash
$ celery flower --port=5555
$ celery -A proj flower --port=5555
Broker URL can also be passed through the `--broker` argument :
@@ -273,7 +274,7 @@ Starting:
.. code-block:: bash
$ celery events
$ celery -A proj events
You should see a screen like:
@@ -285,13 +286,13 @@ You should see a screen like:
.. code-block:: bash
$ celery events --camera=<camera-class> --frequency=1.0
$ celery -A proj events --camera=<camera-class> --frequency=1.0
and it includes a tool to dump events to :file:`stdout`:
.. code-block:: bash
$ celery events --dump
$ celery -A proj events --dump
For a complete list of options use ``--help``:
@@ -457,7 +458,7 @@ arguments:
.. code-block:: bash
$ celery events -c myapp.Camera --frequency=2.0
$ celery -A proj events -c myapp.Camera --frequency=2.0
.. _monitoring-camera:
@@ -497,7 +498,7 @@ it with the :option:`-c` option:
.. code-block:: bash
$ celery events -c myapp.DumpCam --frequency=2.0
$ celery -A proj events -c myapp.DumpCam --frequency=2.0
Or you can use it programmatically like this:
View
@@ -223,5 +223,5 @@ worker option:
$ celery -A proj worker -l info -Ofair
With this option enabled the worker will only write to workers that are
With this option enabled the worker will only write to processes that are
available for work, disabling the prefetch behavior.
Oops, something went wrong.