Skip to content

Commit

Permalink
Use new kombu Queue.expires setting
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Oct 15, 2016
1 parent 5c01f0e commit ff14064
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 46 deletions.
19 changes: 8 additions & 11 deletions celery/backends/amqp.py
Expand Up @@ -13,9 +13,7 @@
from celery.exceptions import TimeoutError
from celery.five import range, monotonic
from celery.utils import deprecated
from celery.utils.functional import dictfilter
from celery.utils.log import get_logger
from celery.utils.time import maybe_s_to_ms

from .base import BaseBackend

Expand Down Expand Up @@ -80,9 +78,6 @@ def __init__(self, app, connection=None, exchange=None, exchange_type=None,
)
self.serializer = serializer or conf.result_serializer
self.auto_delete = auto_delete
self.queue_arguments = dictfilter({
'x-expires': maybe_s_to_ms(self.expires),
})

def _create_exchange(self, name, type='direct', delivery_mode=2):
return self.Exchange(name=name,
Expand All @@ -93,12 +88,14 @@ def _create_exchange(self, name, type='direct', delivery_mode=2):

def _create_binding(self, task_id):
name = self.rkey(task_id)
return self.Queue(name=name,
exchange=self.exchange,
routing_key=name,
durable=self.persistent,
auto_delete=self.auto_delete,
queue_arguments=self.queue_arguments)
return self.Queue(
name=name,
exchange=self.exchange,
routing_key=name,
durable=self.persistent,
auto_delete=self.auto_delete,
expires=self.expires,
)

def revive(self, channel):
pass
Expand Down
9 changes: 3 additions & 6 deletions celery/backends/rpc.py
Expand Up @@ -16,8 +16,6 @@
from celery import states
from celery._state import task_join_will_block
from celery.five import items, range
from celery.utils.functional import dictfilter
from celery.utils.time import maybe_s_to_ms

from . import base
from .async import AsyncBackendMixin, BaseResultConsumer
Expand Down Expand Up @@ -124,9 +122,6 @@ def __init__(self, app, connection=None, exchange=None, exchange_type=None,
)
self.serializer = serializer or conf.result_serializer
self.auto_delete = auto_delete
self.queue_arguments = dictfilter({
'x-expires': maybe_s_to_ms(self.expires),
})
self.result_consumer = self.ResultConsumer(
self, self.app, self.accept,
self._pending_results, self._pending_messages,
Expand Down Expand Up @@ -310,7 +305,9 @@ def as_uri(self, include_password=True):
def binding(self):
return self.Queue(
self.oid, self.exchange, self.oid,
durable=False, auto_delete=True
durable=False,
auto_delete=True,
expires=self.expires,
)

@cached_property
Expand Down
23 changes: 7 additions & 16 deletions celery/events/__init__.py
Expand Up @@ -24,9 +24,8 @@
from celery import uuid
from celery.app import app_or_default
from celery.five import items
from celery.utils.functional import dictfilter
from celery.utils.nodenames import anon_nodename
from celery.utils.time import adjust_timestamp, utcoffset, maybe_s_to_ms
from celery.utils.time import adjust_timestamp, utcoffset

__all__ = ['Events', 'Event', 'EventDispatcher', 'EventReceiver']

Expand Down Expand Up @@ -310,14 +309,17 @@ def __init__(self, channel, handlers=None, routing_key='#',
self.queue_prefix = queue_prefix or self.app.conf.event_queue_prefix
self.exchange = get_exchange(
self.connection or self.app.connection_for_write())
if queue_ttl is None:
queue_ttl = self.app.conf.event_queue_ttl
if queue_expires is None:
queue_expires = self.app.conf.event_queue_expires
self.queue = Queue(
'.'.join([self.queue_prefix, self.node_id]),
exchange=self.exchange,
routing_key=self.routing_key,
auto_delete=True, durable=False,
queue_arguments=self._get_queue_arguments(
ttl=queue_ttl, expires=queue_expires,
),
message_ttl=queue_ttl,
expires=queue_expires,
)
self.clock = self.app.clock
self.adjust_clock = self.clock.adjust
Expand All @@ -326,17 +328,6 @@ def __init__(self, channel, handlers=None, routing_key='#',
accept = {self.app.conf.event_serializer, 'json'}
self.accept = accept

def _get_queue_arguments(self, ttl=None, expires=None):
conf = self.app.conf
return dictfilter({
'x-message-ttl': maybe_s_to_ms(
ttl if ttl is not None else conf.event_queue_ttl,
),
'x-expires': maybe_s_to_ms(
expires if expires is not None else conf.event_queue_expires,
),
})

def process(self, type, event):
"""Process event by dispatching to configured handler."""
handler = self.handlers.get(type) or self.handlers.get('*')
Expand Down
11 changes: 3 additions & 8 deletions celery/utils/time.py
Expand Up @@ -24,9 +24,9 @@
__all__ = [
'LocalTimezone', 'timezone', 'maybe_timedelta',
'delta_resolution', 'remaining', 'rate', 'weekday',
'humanize_seconds', 'maybe_iso8601', 'is_naive', 'make_aware',
'localize', 'to_utc', 'maybe_make_aware', 'ffwd', 'utcoffset',
'adjust_timestamp', 'maybe_s_to_ms',
'humanize_seconds', 'maybe_iso8601', 'is_naive',
'make_aware', 'localize', 'to_utc', 'maybe_make_aware',
'ffwd', 'utcoffset', 'adjust_timestamp',
]

PY3 = sys.version_info[0] == 3
Expand Down Expand Up @@ -379,8 +379,3 @@ def utcoffset(time=_time, localtime=_time.localtime):
def adjust_timestamp(ts, offset, here=utcoffset):
"""Adjust timestamp based on provided utcoffset."""
return ts - (offset - here()) * 3600


def maybe_s_to_ms(v):
"""Convert seconds to milliseconds, but return None for None."""
return int(float(v) * 1000.0) if v is not None else v
13 changes: 8 additions & 5 deletions t/unit/backends/test_amqp.py
Expand Up @@ -99,15 +99,18 @@ def test_repair_uuid(self):

def test_expires_is_int(self):
b = self.create_backend(expires=48)
assert b.queue_arguments.get('x-expires') == 48 * 1000.0
q = b._create_binding('x1y2z3')
assert q.expires == 48

def test_expires_is_float(self):
b = self.create_backend(expires=48.3)
assert b.queue_arguments.get('x-expires') == 48.3 * 1000.0
q = b._create_binding('x1y2z3')
assert q.expires == 48.3

def test_expires_is_timedelta(self):
b = self.create_backend(expires=timedelta(minutes=1))
assert b.queue_arguments.get('x-expires') == 60 * 1000.0
q = b._create_binding('x1y2z3')
assert q.expires == 60

@mock.sleepdeprived()
def test_store_result_retries(self):
Expand Down Expand Up @@ -246,8 +249,8 @@ def test_no_expires(self):
app = self.app
app.conf.result_expires = None
b = self.create_backend(expires=None)
with pytest.raises(KeyError):
b.queue_arguments['x-expires']
q = b._create_binding('foo')
assert q.expires is None

def test_process_cleanup(self):
self.create_backend().process_cleanup()
Expand Down

0 comments on commit ff14064

Please sign in to comment.