Permalink
Browse files

Merge branch '3.0'

  • Loading branch information...
2 parents 877b420 + 337c8cb commit 77c97c4c68a161d68e7b60d637590a83b467bcf8 @ask ask committed Sep 20, 2012
Showing with 52 additions and 10 deletions.
  1. +22 −0 Changelog
  2. +12 −0 celery/__main__.py
  3. +3 −2 celery/task/trace.py
  4. +12 −5 celery/worker/consumer.py
  5. +1 −1 docs/configuration.rst
  6. +2 −2 setup.py
View
@@ -68,6 +68,12 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
Fix contributed by Sam Cooke.
+- The worker now makes sure the request/task stacks are not modified
+ by the initial ``Task.__call__``.
+
+ This would previously be a problem if a custom task class defined
+ ``__call__`` and also called ``super()``.
+
- Because of many bugs the fast local optimization has been disabled,
and can only be enabled by setting the :envvar:`USE_FAST_LOCALS` attribute.
@@ -76,6 +82,22 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
- More fixes related to late eventlet/gevent patching.
+- Documentation for the settings out of sync with reality:
+
+ - :setting:`CELERY_TASK_PUBLISH_RETRY`
+
+ Documented as disabled by default, but it was enabled by default
+ since 2.5 as stated by the 2.5 changelog.
+
+ - :setting:`CELERY_TASK_PUBLISH_RETRY_POLICY`
+
+ The default max_retries had been set to 100, but documented as being
+ 3, and the interval_max was set to 1 but documented as 0.2.
+ The default setting are now set to 3 and 0.2 as it was originally
+ documented.
+
+ Fix contributed by Matt Long.
+
- Worker: Log messages when connection established and lost have been improved
so that they are more useful when used with the upcoming multiple broker
hostlist for failover that is coming in the next Kombu version.
View
@@ -20,5 +20,17 @@ def _compat_worker():
main()
+def _compat_multi():
+ maybe_patch_concurrency()
+ from celery.bin.celeryd_multi import main
+ main()
+
+
+def _compat_beat():
+ maybe_patch_concurrency()
+ from celery.bin.celerybeat import main
+ main()
+
+
if __name__ == '__main__':
main()
View
@@ -382,8 +382,9 @@ def _install_stack_protection():
_patched['BaseTask.__call__'] = orig = BaseTask.__call__
def __protected_call__(self, *args, **kwargs):
- req, stack = self.request, self.request_stack
- if not req._protected and len(stack) == 2 and \
+ stack = self.request_stack
+ req = stack.top
+ if req and not req._protected and len(stack) == 2 and \
not req.called_directly:
req._protected = 1
return self.run(*args, **kwargs)
View
@@ -79,6 +79,7 @@
from time import sleep
from Queue import Empty
+from kombu.syn import _detect_environment
from kombu.utils.encoding import safe_repr
from kombu.utils.eventio import READ, WRITE, ERR
@@ -357,6 +358,12 @@ def __init__(self, ready_queue,
if not hub:
self.amqheartbeat = 0
+ if _detect_environment() == 'gevent':
+ # there's a gevent bug that causes timeouts to not be reset,
+ # so if the connection timeout is exceeded once, it can NEVER
+ # connect again.
+ self.app.conf.BROKER_CONNECTION_TIMEOUT = None
+
def update_strategies(self):
S = self.strategies
app = self.app
@@ -606,7 +613,7 @@ def close_connection(self):
debug('Closing broker connection...')
self.maybe_conn_error(connection.close)
- def stop_consumers(self, close_connection=True):
+ def stop_consumers(self, close_connection=True, join=True):
"""Stop consuming tasks and broadcast commands, also stops
the heartbeat thread and event dispatcher.
@@ -623,7 +630,7 @@ def stop_consumers(self, close_connection=True):
self.heart = self.heart.stop()
debug('Cancelling task consumer...')
- if self.task_consumer:
+ if join and self.task_consumer:
self.maybe_conn_error(self.task_consumer.cancel)
if self.event_dispatcher:
@@ -632,7 +639,7 @@ def stop_consumers(self, close_connection=True):
self.maybe_conn_error(self.event_dispatcher.close)
debug('Cancelling broadcast consumer...')
- if self.broadcast_consumer:
+ if join and self.broadcast_consumer:
self.maybe_conn_error(self.broadcast_consumer.cancel)
if close_connection:
@@ -707,7 +714,7 @@ def reset_connection(self):
"""Re-establish the broker connection and set up consumers,
heartbeat and the event dispatcher."""
debug('Re-establishing connection to the broker...')
- self.stop_consumers()
+ self.stop_consumers(join=False)
# Clear internal queues to get rid of old messages.
# They can't be acked anyway, as a delivery tag is specific
@@ -794,7 +801,7 @@ def stop(self):
# anymore.
self.close()
debug('Stopping consumers...')
- self.stop_consumers(close_connection=False)
+ self.stop_consumers(close_connection=False, join=True)
def close(self):
self._state = CLOSE
View
@@ -930,7 +930,7 @@ Decides if publishing task messages will be retried in the case
of connection loss or other connection errors.
See also :setting:`CELERY_TASK_PUBLISH_RETRY_POLICY`.
-Disabled by default.
+Enabled by default.
.. setting:: CELERY_TASK_PUBLISH_RETRY_POLICY
View
@@ -177,11 +177,11 @@ def reqs(f):
if CELERY_COMPAT_PROGRAMS:
console_scripts.extend([
'celeryd = celery.__main__:_compat_worker',
- 'celerybeat = celery.bin.celerybeat:main',
+ 'celerybeat = celery.__main__:_compat_beat',
'camqadm = celery.bin.camqadm:main',
'celeryev = celery.bin.celeryev:main',
'celeryctl = celery.bin.celeryctl:main',
- 'celeryd-multi = celery.bin.celeryd_multi:main',
+ 'celeryd-multi = celery.__main__:_compat_multi',
])
# bundles: Only relevant for Celery developers.

0 comments on commit 77c97c4

Please sign in to comment.