Skip to content

Commit

Permalink
Switches delivery_tag to uuid.uuid4() for Qpid transport
Browse files Browse the repository at this point in the history
  • Loading branch information
bmbouter committed Jan 25, 2016
1 parent 66f417f commit 7d6af48
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 25 deletions.
3 changes: 2 additions & 1 deletion kombu/tests/transport/test_qpid.py
Expand Up @@ -5,6 +5,7 @@
import socket
import sys
import time
import uuid

from collections import Callable
from itertools import count
Expand Down Expand Up @@ -1317,7 +1318,7 @@ def test_basic_publish(self, mock_put,
mock_message['properties']['body_encoding'], mock_body_encoding,
)
self.assertIsInstance(
mock_message['properties']['delivery_tag'], int,
mock_message['properties']['delivery_tag'], uuid.UUID,
)
self.assertIs(
mock_message['properties']['delivery_info']['exchange'],
Expand Down
43 changes: 19 additions & 24 deletions kombu/transport/qpid.py
Expand Up @@ -74,14 +74,13 @@
from __future__ import absolute_import

import os
import random
import select
import socket
import ssl
import sys
import time
import uuid

from itertools import count
from gettext import gettext as _

import amqp.protocol
Expand Down Expand Up @@ -160,7 +159,7 @@ class QoS(object):
ACKed asynchronously through a call to :meth:`ack`. Messages that are
received, but not ACKed will not be delivered by the broker to another
consumer until an ACK is received, or the session is closed. Messages
are referred to using delivery_tag integers, which are unique per
are referred to using delivery_tag, which are unique per
:class:`Channel`. Delivery tags are managed outside of this object and
are passed in with a message to :meth:`append`. Un-ACKed messages can
be looked up from QoS using :meth:`get` and can be rejected and
Expand Down Expand Up @@ -214,15 +213,15 @@ def can_consume_max_estimate(self):
def append(self, message, delivery_tag):
"""Append message to the list of un-ACKed messages.
Add a message, referenced by the integer delivery_tag, for ACKing,
Add a message, referenced by the delivery_tag, for ACKing,
rejecting, or getting later. Messages are saved into an
:class:`~kombu.utils.compat.OrderedDict` by delivery_tag.
:param message: A received message that has not yet been ACKed.
:type message: qpid.messaging.Message
:param delivery_tag: An integer number to refer to this message by
:param delivery_tag: A UUID to refer to this message by
upon receipt.
:type delivery_tag: int
:type delivery_tag: uuid.UUID
"""
self._not_yet_acked[delivery_tag] = message

Expand All @@ -233,7 +232,7 @@ def get(self, delivery_tag):
:param delivery_tag: The delivery tag associated with the message
to be returned.
:type delivery_tag: int
:type delivery_tag: uuid.UUID
:return: An un-ACKed message that is looked up by delivery_tag.
:rtype: qpid.messaging.Message
Expand All @@ -248,7 +247,7 @@ def ack(self, delivery_tag):
:param delivery_tag: the delivery tag associated with the message
to be acknowledged.
:type delivery_tag: int
:type delivery_tag: uuid.UUID
"""
message = self._not_yet_acked.pop(delivery_tag)
self.session.acknowledge(message=message)
Expand All @@ -266,7 +265,7 @@ def reject(self, delivery_tag, requeue=False):
:param delivery_tag: The delivery tag associated with the message
to be rejected.
:type delivery_tag: int
:type delivery_tag: uuid.UUID
:keyword requeue: If True, the broker will be notified to requeue
the message. If False, the broker will be told to drop the
message entirely. In both cases, the message will be removed
Expand Down Expand Up @@ -311,10 +310,9 @@ class Channel(base.StdChannel):
Messages sent using this Channel are assigned a delivery_tag. The
delivery_tag is generated for a message as they are prepared for
sending by :meth:`basic_publish`. The delivery_tag is unique per
Channel instance using :meth:`~itertools.count`. The delivery_tag has
no meaningful context in other objects, and is only maintained in the
memory of this object, and the underlying :class:`QoS` object that
provides support.
Channel instance. The delivery_tag has no meaningful context in other
objects, and is only maintained in the memory of this object, and the
underlying :class:`QoS` object that provides support.
Each Channel object instantiates exactly one :class:`QoS` object for
prefetch limiting, and asynchronous ACKing. The :class:`QoS` object is
Expand Down Expand Up @@ -842,7 +840,7 @@ def basic_ack(self, delivery_tag):
:param delivery_tag: The delivery tag associated with the message
to be acknowledged.
:type delivery_tag: int
:type delivery_tag: uuid.UUID
"""
self.qos.ack(delivery_tag)

Expand All @@ -860,7 +858,7 @@ def basic_reject(self, delivery_tag, requeue=False):
:param delivery_tag: The delivery tag associated with the message
to be rejected.
:type delivery_tag: int
:type delivery_tag: uuid.UUID
:keyword requeue: If False, the rejected message will be dropped by
the broker and not delivered to any other consumers. If True,
then the rejected message will be requeued for delivery to
Expand Down Expand Up @@ -901,10 +899,9 @@ def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
handled by the caller of :meth:`~Transport.drain_events`. Messages
can be ACKed after being received through a call to :meth:`basic_ack`.
If no_ack is True, the messages are immediately ACKed to avoid a
memory leak in qpid.messaging when messages go un-ACKed. The no_ack
flag indicates that the receiver of the message does not intent to
call :meth:`basic_ack`.
If no_ack is True, The no_ack flag indicates that the receiver of
the message will not call :meth:`basic_ack` later. Since the
message will not be ACKed later, it is ACKed immediately.
:meth:`basic_consume` transforms the message object type prior to
calling the callback. Initially the message comes in as a
Expand Down Expand Up @@ -940,9 +937,7 @@ def _callback(qpid_message):
delivery_tag = message.delivery_tag
self.qos.append(qpid_message, delivery_tag)
if no_ack:
# Celery will not ack this message later, so we should to
# avoid a memory leak in qpid.messaging due to un-ACKed
# messages.
# Celery will not ack this message later, so we should ack now
self.basic_ack(delivery_tag)
return callback(message)

Expand Down Expand Up @@ -1068,7 +1063,7 @@ def basic_publish(self, message, exchange, routing_key, **kwargs):
- wraps the body as a buffer object, so that
:class:`qpid.messaging.endpoints.Sender` uses a content type
that can support arbitrarily large messages.
- assigns a random delivery_tag
- sets delivery_tag to a random uuid.UUID
- sets the exchange and routing_key info as delivery_info
Internally uses :meth:`_put` to send the message synchronously. This
Expand All @@ -1094,7 +1089,7 @@ def basic_publish(self, message, exchange, routing_key, **kwargs):
props = message['properties']
props.update(
body_encoding=body_encoding,
delivery_tag=random.randint(1, sys.maxint),
delivery_tag=uuid.uuid4(),
)
props['delivery_info'].update(
exchange=exchange,
Expand Down

0 comments on commit 7d6af48

Please sign in to comment.