Permalink
Comparing changes
Open a pull request
- 2 commits
- 10 files changed
- 0 commit comments
- 1 contributor
Unified
Split
Showing
with
179 additions
and 133 deletions.
- +4 −24 kombu/transport/SQS.py
- +23 −14 kombu/transport/redis.py
- +24 −18 kombu/transport/virtual/base.py
- +8 −4 kombu/utils/scheduling.py
- +60 −26 t/unit/transport/test_SQS.py
- +18 −18 {kombu/tests → t/unit}/transport/test_etcd.py
- +1 −1 t/unit/transport/test_memory.py
- +22 −16 t/unit/transport/test_redis.py
- +10 −4 t/unit/transport/virtual/test_base.py
- +9 −8 t/unit/utils/test_scheduling.py
| @@ -37,7 +37,6 @@ | ||
| from __future__ import absolute_import, unicode_literals | ||
| import collections | ||
| import socket | ||
| import string | ||
| @@ -99,12 +98,6 @@ def __init__(self, *args, **kwargs): | ||
| # queues that are known to already exist. | ||
| self._update_queue_cache(self.queue_name_prefix) | ||
| # The drain_events() method stores extra messages in a local | ||
| # Deque object. This allows multiple messages to be requested from | ||
| # SQS at once for performance, but maintains the same external API | ||
| # to the caller of the drain_events() method. | ||
| self._queue_message_cache = collections.deque() | ||
| self.hub = kwargs.get('hub') or get_event_loop() | ||
| def _update_queue_cache(self, queue_name_prefix): | ||
| @@ -145,24 +138,9 @@ def drain_events(self, timeout=None): | ||
| # If we're not allowed to consume or have no consumers, raise Empty | ||
| if not self._consumers or not self.qos.can_consume(): | ||
| raise Empty() | ||
| message_cache = self._queue_message_cache | ||
| # Check if there are any items in our buffer. If there are any, pop | ||
| # off that queue first. | ||
| try: | ||
| return message_cache.popleft() | ||
| except IndexError: | ||
| pass | ||
| # At this point, go and get more messages from SQS | ||
| res, queue = self._poll(self.cycle, timeout=timeout) | ||
| message_cache.extend((r, queue) for r in res) | ||
| # Now try to pop off the queue again. | ||
| try: | ||
| return message_cache.popleft() | ||
| except IndexError: | ||
| raise Empty() | ||
| self._poll(self.cycle, self.connection._deliver, timeout=timeout) | ||
| def _reset_cycle(self): | ||
| """Reset the consume cycle. | ||
| @@ -286,7 +264,9 @@ def _get_bulk(self, queue, | ||
| messages = q.get_messages(num_messages=maxcount) | ||
| if messages: | ||
| return self._messages_to_python(messages, queue) | ||
| for msg in self._messages_to_python(messages, queue): | ||
| self.connection._deliver(msg, queue) | ||
| return | ||
| raise Empty() | ||
| def _get(self, queue): | ||
| @@ -325,7 +325,7 @@ def maybe_restore_messages(self): | ||
| def on_readable(self, fileno): | ||
| chan, type = self._fd_to_chan[fileno] | ||
| if chan.qos.can_consume(): | ||
| return chan.handlers[type]() | ||
| chan.handlers[type]() | ||
| def handle_event(self, fileno, event): | ||
| if event & READ: | ||
| @@ -334,7 +334,7 @@ def handle_event(self, fileno, event): | ||
| chan, type = self._fd_to_chan[fileno] | ||
| chan._poll_error(type) | ||
| def get(self, timeout=None): | ||
| def get(self, callback, timeout=None): | ||
| self._in_protected_read = True | ||
| try: | ||
| for channel in self._channels: | ||
| @@ -345,15 +345,14 @@ def get(self, timeout=None): | ||
| self._register_LISTEN(channel) | ||
| events = self.poller.poll(timeout) | ||
| for fileno, event in events or []: | ||
| ret = self.handle_event(fileno, event) | ||
| if ret: | ||
| return ret | ||
| if events: | ||
| for fileno, event in events: | ||
| ret = self.handle_event(fileno, event) | ||
| if ret: | ||
| return | ||
| # - no new data, so try to restore messages. | ||
| # - reset active redis commands. | ||
| self.maybe_restore_messages() | ||
| raise Empty() | ||
| finally: | ||
| self._in_protected_read = False | ||
| @@ -660,6 +659,16 @@ def _handle_message(self, client, r): | ||
| def _receive(self): | ||
| c = self.subclient | ||
| ret = [] | ||
| try: | ||
| ret.append(self._receive_one(c)) | ||
| except Empty: | ||
| pass | ||
| while c.connection.can_read(timeout=0): | ||
| ret.append(self._receive_one(c)) | ||
| return any(ret) | ||
| def _receive_one(self, c): | ||
| response = None | ||
| try: | ||
| response = c.parse_response() | ||
| @@ -680,8 +689,9 @@ def _receive(self): | ||
| channel, repr(payload)[:4096], exc_info=1) | ||
| raise Empty() | ||
| exchange = channel.split('/', 1)[0] | ||
| return message, self._fanout_to_queue[exchange] | ||
| raise Empty() | ||
| self.connection._deliver( | ||
| message, self._fanout_to_queue[exchange]) | ||
| return True | ||
| def _brpop_start(self, timeout=1): | ||
| queues = self._queue_cycle.consume(len(self.active_queues)) | ||
| @@ -707,7 +717,8 @@ def _brpop_read(self, **options): | ||
| dest, item = dest__item | ||
| dest = bytes_to_str(dest).rsplit(self.sep, 1)[0] | ||
| self._queue_cycle.rotate(dest) | ||
| return loads(bytes_to_str(item)), dest | ||
| self.connection._deliver(loads(bytes_to_str(item)), dest) | ||
| return True | ||
| else: | ||
| raise Empty() | ||
| finally: | ||
| @@ -1033,9 +1044,7 @@ def on_poll_start(): | ||
| def on_readable(self, fileno): | ||
| """Handle AIO event for one of our file descriptors.""" | ||
| item = self.cycle.on_readable(fileno) | ||
| if item: | ||
| self._deliver(*item) | ||
| self.cycle.on_readable(fileno) | ||
| def _get_errors(self): | ||
| """Utility to import redis-py's exceptions at runtime.""" | ||
| @@ -393,9 +393,13 @@ def _has_queue(self, queue, **kwargs): | ||
| """ | ||
| return True | ||
| def _poll(self, cycle, timeout=None): | ||
| def _poll(self, cycle, callback, timeout=None): | ||
| """Poll a list of queues for available messages.""" | ||
| return cycle.get() | ||
| return cycle.get(callback) | ||
| def _get_and_deliver(self, queue, callback): | ||
| message = self._get(queue) | ||
| callback(message, queue) | ||
| class Channel(AbstractChannel, base.StdChannel): | ||
| @@ -590,6 +594,15 @@ def _next_delivery_tag(self): | ||
| def basic_publish(self, message, exchange, routing_key, **kwargs): | ||
| """Publish message.""" | ||
| self._inplace_augment_message(message, exchange, routing_key) | ||
| if exchange: | ||
| return self.typeof(exchange).deliver( | ||
| message, exchange, routing_key, **kwargs | ||
| ) | ||
| # anon exchange: routing_key is the destination queue | ||
| return self._put(routing_key, message, **kwargs) | ||
| def _inplace_augment_message(self, message, exchange, routing_key): | ||
| message['body'], body_encoding = self.encode_body( | ||
| message['body'], self.body_encoding, | ||
| ) | ||
| @@ -602,12 +615,6 @@ def basic_publish(self, message, exchange, routing_key, **kwargs): | ||
| exchange=exchange, | ||
| routing_key=routing_key, | ||
| ) | ||
| if exchange: | ||
| return self.typeof(exchange).deliver( | ||
| message, exchange, routing_key, **kwargs | ||
| ) | ||
| # anon exchange: routing_key is the destination queue | ||
| return self._put(routing_key, message, **kwargs) | ||
| def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs): | ||
| """Consume from `queue`""" | ||
| @@ -725,11 +732,12 @@ def _restore(self, message): | ||
| def _restore_at_beginning(self, message): | ||
| return self._restore(message) | ||
| def drain_events(self, timeout=None): | ||
| def drain_events(self, timeout=None, callback=None): | ||
| callback = callback or self.connection._deliver | ||
| if self._consumers and self.qos.can_consume(): | ||
| if hasattr(self, '_get_many'): | ||
| return self._get_many(self._active_queues, timeout=timeout) | ||
| return self._poll(self.cycle, timeout=timeout) | ||
| return self._poll(self.cycle, callback, timeout=timeout) | ||
| raise Empty() | ||
| def message_to_python(self, raw_message): | ||
| @@ -787,7 +795,8 @@ def decode_body(self, body, encoding=None): | ||
| return body | ||
| def _reset_cycle(self): | ||
| self._cycle = FairCycle(self._get, self._active_queues, Empty) | ||
| self._cycle = FairCycle( | ||
| self._get_and_deliver, self._active_queues, Empty) | ||
| def __enter__(self): | ||
| return self | ||
| @@ -935,22 +944,19 @@ def close_connection(self, connection): | ||
| channel.close() | ||
| def drain_events(self, connection, timeout=None): | ||
| loop = 0 | ||
| time_start = monotonic() | ||
| get = self.cycle.get | ||
| polling_interval = self.polling_interval | ||
| while 1: | ||
| try: | ||
| item, channel = get(timeout=timeout) | ||
| get(self._deliver, timeout=timeout) | ||
| except Empty: | ||
| if timeout and monotonic() - time_start >= timeout: | ||
| if timeout is not None and monotonic() - time_start >= timeout: | ||
| raise socket.timeout() | ||
| loop += 1 | ||
| if polling_interval is not None: | ||
| sleep(polling_interval) | ||
| else: | ||
| break | ||
| self._deliver(*item) | ||
| def _deliver(self, message, queue): | ||
| if not queue: | ||
| @@ -980,8 +986,8 @@ def on_message_ready(self, channel, message, queue): | ||
| queue, message)) | ||
| self._callbacks[queue](message) | ||
| def _drain_channel(self, channel, timeout=None): | ||
| return channel.drain_events(timeout=timeout) | ||
| def _drain_channel(self, channel, callback, timeout=None): | ||
| return channel.drain_events(callback=callback, timeout=timeout) | ||
| @property | ||
| def default_connection_params(self): | ||
| @@ -40,15 +40,19 @@ def _next(self): | ||
| if not self.resources: | ||
| raise self.predicate() | ||
| def get(self, **kwargs): | ||
| def get(self, callback, **kwargs): | ||
| succeeded = 0 | ||
| for tried in count(0): # for infinity | ||
| resource = self._next() | ||
| try: | ||
| return self.fun(resource, **kwargs), resource | ||
| return self.fun(resource, callback, **kwargs) | ||
| except self.predicate: | ||
| if tried >= len(self.resources) - 1: | ||
| raise | ||
| if not succeeded: | ||
| raise | ||
| break | ||
| else: | ||
| succeeded += 1 | ||
| def close(self): | ||
| pass | ||
Oops, something went wrong.