Skip to content

Commit

Permalink
[ServiceBus] Consistency review changes as detailed in issue #12415. (#…
Browse files Browse the repository at this point in the history
…13160)

* Consistency review changes as detailed in issue #12415.
* significant amount of renames, parameter removal, mgmt shim class building, and a few added capabilities in terms of renew_lock retval and receive_deferred param acceptance.
* Update mgmt test recordings

Co-authored-by: Adam Ling (MSFT) <adam_ling@outlook.com>
  • Loading branch information
KieranBrantnerMagee and yunhaoling committed Sep 9, 2020
1 parent 0c5ecad commit 6a5c123
Show file tree
Hide file tree
Showing 127 changed files with 6,324 additions and 3,544 deletions.
14 changes: 14 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,25 @@
## 7.0.0b6 (Unreleased)

**New Features**

* `renew_lock()` now returns the UTC datetime that the lock is set to expire at.
* `receive_deferred_messages()` can now take a single sequence number as well as a list of sequence numbers.
* Messages can now be sent twice in succession.
* Internal AMQP message properties (header, footer, annotations, properties, etc) are now exposed via `Message.amqp_message`

**Breaking Changes**

* Renamed `prefetch` to `prefetch_count`.
* Renamed `ReceiveSettleMode` enum to `ReceiveMode`, and respectively the `mode` parameter to `receive_mode`.
* `retry_total`, `retry_backoff_factor` and `retry_backoff_max` are now defined at the `ServiceBusClient` level and inherited by senders and receivers created from it.
* No longer export `NEXT_AVAILABLE` in `azure.servicebus` module. A null `session_id` will suffice.
* Renamed parameter `message_count` to `max_message_count` as fewer messages may be present for method `peek_messages()` and `receive_messages()`.
* Renamed `PeekMessage` to `PeekedMessage`.
* Renamed `get_session_state()` and `set_session_state()` to `get_state()` and `set_state()` accordingly.
* Renamed parameter `description` to `error_description` for method `dead_letter()`.
* Renamed properties `created_time` and `modified_time` to `created_at_utc` and `modified_at_utc` within `AuthorizationRule` and `NamespaceProperties`.
* Removed parameter `requires_preprocessing` from `SqlRuleFilter` and `SqlRuleAction`.
* Removed property `namespace_type` from `NamespaceProperties`.
* Rename `ServiceBusManagementClient` to `ServiceBusAdministrationClient`
* Attempting to call `send_messages` on something not a `Message`, `BatchMessage`, or list of `Message`s, will now throw a `TypeError` instead of `ValueError`
* Sending a message twice will no longer result in a MessageAlreadySettled exception.
Expand Down
10 changes: 5 additions & 5 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,13 @@ with ServiceBusClient.from_connection_string(connstr) as client:
if received_message_array:
print(str(received_message_array[0]))

with client.get_queue_receiver(queue_name, prefetch=5) as receiver:
received_message_array = receiver.receive_messages(max_batch_size=5, max_wait_time=10) # try to receive maximum 5 messages in a batch within 10 seconds
with client.get_queue_receiver(queue_name) as receiver:
received_message_array = receiver.receive_messages(max_message_count=5, max_wait_time=10) # try to receive maximum 5 messages in a batch within 10 seconds
for message in received_message_array:
print(str(message))
```

In this example, max_batch_size (and prefetch, as required by max_batch_size) declares the maximum number of messages to attempt receiving before hitting a max_wait_time as specified in seconds.
In this example, max_message_count declares the maximum number of messages to attempt receiving before hitting a max_wait_time as specified in seconds.

> **NOTE:** It should also be noted that `ServiceBusReceiver.peek_messages()` is subtly different than receiving, as it does not lock the messages being peeked, and thus they cannot be settled.
Expand Down Expand Up @@ -235,8 +235,8 @@ with ServiceBusClient.from_connection_string(connstr) as client:

When receiving from a queue, you have multiple actions you can take on the messages you receive.

> **NOTE**: You can only settle `ReceivedMessage` objects which are received in `ReceiveSettleMode.PeekLock` mode (this is the default).
> `ReceiveSettleMode.ReceiveAndDelete` mode removes the message from the queue on receipt. `PeekMessage` messages
> **NOTE**: You can only settle `ReceivedMessage` objects which are received in `ReceiveMode.PeekLock` mode (this is the default).
> `ReceiveMode.ReceiveAndDelete` mode removes the message from the queue on receipt. `PeekedMessage` messages
> returned from `peek()` cannot be settled, as the message lock is not taken like it is in the aforementioned receive methods. Sessionful messages have a similar limitation.
If the message has a lock as mentioned above, settlement will fail if the message lock has expired.
Expand Down
8 changes: 4 additions & 4 deletions sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@
from ._servicebus_session_receiver import ServiceBusSessionReceiver
from ._servicebus_session import ServiceBusSession
from ._base_handler import ServiceBusSharedKeyCredential
from ._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage
from ._common.constants import ReceiveSettleMode, SubQueue
from ._common.message import Message, BatchMessage, PeekedMessage, ReceivedMessage
from ._common.constants import ReceiveMode, SubQueue
from ._common.auto_lock_renewer import AutoLockRenew

TransportType = constants.TransportType

__all__ = [
'Message',
'BatchMessage',
'PeekMessage',
'PeekedMessage',
'ReceivedMessage',
'ReceiveSettleMode',
'SubQueue',
'ReceiveMode',
'ServiceBusClient',
'ServiceBusReceiver',
'ServiceBusSessionReceiver',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@
MGMT_REQUEST_SEQUENCE_NUMBERS = 'sequence-numbers'
MGMT_REQUEST_RECEIVER_SETTLE_MODE = 'receiver-settle-mode'
MGMT_REQUEST_FROM_SEQUENCE_NUMBER = 'from-sequence-number'
MGMT_REQUEST_MESSAGE_COUNT = 'message-count'
MGMT_REQUEST_MAX_MESSAGE_COUNT = 'message-count'
MGMT_REQUEST_MESSAGE = 'message'
MGMT_REQUEST_MESSAGES = 'messages'
MGMT_REQUEST_MESSAGE_ID = 'message-id'
MGMT_REQUEST_PARTITION_KEY = 'partition-key'
MGMT_REQUEST_VIA_PARTITION_KEY = 'via-partition-key'
MGMT_REQUEST_DEAD_LETTER_REASON = 'deadletter-reason'
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION = 'deadletter-description'
MGMT_REQUEST_DEAD_LETTER_ERROR_DESCRIPTION = 'deadletter-description'
RECEIVER_LINK_DEAD_LETTER_REASON = 'DeadLetterReason'
RECEIVER_LINK_DEAD_LETTER_DESCRIPTION = 'DeadLetterErrorDescription'
RECEIVER_LINK_DEAD_LETTER_ERROR_DESCRIPTION = 'DeadLetterErrorDescription'
MGMT_REQUEST_OP_TYPE_ENTITY_MGMT = b"entity-mgmt"

MESSAGE_COMPLETE = 'complete'
Expand Down Expand Up @@ -106,7 +106,7 @@
TRANSFER_DEAD_LETTER_QUEUE_SUFFIX = '/$Transfer' + DEAD_LETTER_QUEUE_SUFFIX


class ReceiveSettleMode(Enum):
class ReceiveMode(Enum):
PeekLock = constants.ReceiverSettleMode.PeekLock
ReceiveAndDelete = constants.ReceiverSettleMode.ReceiveAndDelete

Expand Down
53 changes: 29 additions & 24 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
SETTLEMENT_COMPLETE,
SETTLEMENT_DEFER,
SETTLEMENT_DEADLETTER,
ReceiveSettleMode,
ReceiveMode,
_X_OPT_ENQUEUED_TIME,
_X_OPT_SEQUENCE_NUMBER,
_X_OPT_ENQUEUE_SEQUENCE_NUMBER,
Expand All @@ -32,9 +32,9 @@
_X_OPT_DEAD_LETTER_SOURCE,
MGMT_RESPONSE_MESSAGE_EXPIRATION,
MGMT_REQUEST_DEAD_LETTER_REASON,
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION,
MGMT_REQUEST_DEAD_LETTER_ERROR_DESCRIPTION,
RECEIVER_LINK_DEAD_LETTER_REASON,
RECEIVER_LINK_DEAD_LETTER_DESCRIPTION,
RECEIVER_LINK_DEAD_LETTER_ERROR_DESCRIPTION,
MESSAGE_COMPLETE,
MESSAGE_DEAD_LETTER,
MESSAGE_ABANDON,
Expand Down Expand Up @@ -577,7 +577,7 @@ def add(self, message):
self._messages.append(message)


class PeekMessage(Message):
class PeekedMessage(Message):
"""A preview message.
This message is still on the queue, and unlocked.
Expand All @@ -587,7 +587,7 @@ class PeekMessage(Message):

def __init__(self, message):
# type: (uamqp.message.Message) -> None
super(PeekMessage, self).__init__(None, message=message) # type: ignore
super(PeekedMessage, self).__init__(None, message=message) # type: ignore

def _to_outgoing_message(self):
# type: () -> Message
Expand Down Expand Up @@ -736,7 +736,7 @@ def sequence_number(self):
return None


class ReceivedMessageBase(PeekMessage):
class ReceivedMessageBase(PeekedMessage):
"""
A Service Bus Message received from service side.
Expand All @@ -753,10 +753,10 @@ class ReceivedMessageBase(PeekMessage):
:caption: Checking the properties on a received message.
"""

def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs):
# type: (uamqp.message.Message, ReceiveSettleMode, Any) -> None
def __init__(self, message, receive_mode=ReceiveMode.PeekLock, **kwargs):
# type: (uamqp.message.Message, ReceiveMode, Any) -> None
super(ReceivedMessageBase, self).__init__(message=message)
self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete)
self._settled = (receive_mode == ReceiveMode.ReceiveAndDelete)
self._received_timestamp_utc = utc_now()
self._is_deferred_message = kwargs.get("is_deferred_message", False)
self.auto_renew_error = None # type: Optional[Exception]
Expand All @@ -765,7 +765,7 @@ def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs):
except KeyError:
raise TypeError("ReceivedMessage requires a receiver to be initialized. This class should never be" + \
"initialized by a user; the Message class should be utilized instead.")
self._expiry = None
self._expiry = None # type: Optional[datetime.datetime]

def _check_live(self, action):
# pylint: disable=no-member
Expand All @@ -784,7 +784,7 @@ def _check_live(self, action):
except AttributeError:
pass

def _settle_via_mgmt_link(self, settle_operation, dead_letter_reason=None, dead_letter_description=None):
def _settle_via_mgmt_link(self, settle_operation, dead_letter_reason=None, dead_letter_error_description=None):
# type: (str, Optional[str], Optional[str]) -> Callable
# pylint: disable=protected-access

Expand All @@ -807,7 +807,7 @@ def _settle_via_mgmt_link(self, settle_operation, dead_letter_reason=None, dead_
[self.lock_token],
dead_letter_details={
MGMT_REQUEST_DEAD_LETTER_REASON: dead_letter_reason or "",
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: dead_letter_description or ""
MGMT_REQUEST_DEAD_LETTER_ERROR_DESCRIPTION: dead_letter_error_description or ""
}
)
if settle_operation == MESSAGE_DEFER:
Expand All @@ -818,7 +818,7 @@ def _settle_via_mgmt_link(self, settle_operation, dead_letter_reason=None, dead_
)
raise ValueError("Unsupported settle operation type: {}".format(settle_operation))

def _settle_via_receiver_link(self, settle_operation, dead_letter_reason=None, dead_letter_description=None):
def _settle_via_receiver_link(self, settle_operation, dead_letter_reason=None, dead_letter_error_description=None):
# type: (str, Optional[str], Optional[str]) -> Callable
if settle_operation == MESSAGE_COMPLETE:
return functools.partial(self.message.accept)
Expand All @@ -828,10 +828,10 @@ def _settle_via_receiver_link(self, settle_operation, dead_letter_reason=None, d
return functools.partial(
self.message.reject,
condition=DEADLETTERNAME,
description=dead_letter_description,
description=dead_letter_error_description,
info={
RECEIVER_LINK_DEAD_LETTER_REASON: dead_letter_reason,
RECEIVER_LINK_DEAD_LETTER_DESCRIPTION: dead_letter_description
RECEIVER_LINK_DEAD_LETTER_ERROR_DESCRIPTION: dead_letter_error_description
}
)
if settle_operation == MESSAGE_DEFER:
Expand Down Expand Up @@ -905,15 +905,15 @@ def _settle_message(
self,
settle_operation,
dead_letter_reason=None,
dead_letter_description=None,
dead_letter_error_description=None,
):
# type: (str, Optional[str], Optional[str]) -> None
try:
if not self._is_deferred_message:
try:
self._settle_via_receiver_link(settle_operation,
dead_letter_reason=dead_letter_reason,
dead_letter_description=dead_letter_description)()
dead_letter_error_description=dead_letter_error_description)()
return
except RuntimeError as exception:
_LOGGER.info(
Expand All @@ -924,7 +924,7 @@ def _settle_message(
)
self._settle_via_mgmt_link(settle_operation,
dead_letter_reason=dead_letter_reason,
dead_letter_description=dead_letter_description)()
dead_letter_error_description=dead_letter_error_description)()
except Exception as e:
raise MessageSettleFailed(settle_operation, e)

Expand Down Expand Up @@ -955,7 +955,7 @@ def complete(self):
self._settle_message(MESSAGE_COMPLETE)
self._settled = True

def dead_letter(self, reason=None, description=None):
def dead_letter(self, reason=None, error_description=None):
# type: (Optional[str], Optional[str]) -> None
"""Move the message to the Dead Letter queue.
Expand All @@ -964,7 +964,7 @@ def dead_letter(self, reason=None, description=None):
or processing. The queue can also be configured to send expired messages to the Dead Letter queue.
:param str reason: The reason for dead-lettering the message.
:param str description: The detailed description for dead-lettering the message.
:param str error_description: The detailed error description for dead-lettering the message.
:rtype: None
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
Expand All @@ -983,7 +983,9 @@ def dead_letter(self, reason=None, description=None):
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_DEAD_LETTER)
self._settle_message(MESSAGE_DEAD_LETTER, dead_letter_reason=reason, dead_letter_description=description)
self._settle_message(MESSAGE_DEAD_LETTER,
dead_letter_reason=reason,
dead_letter_error_description=error_description)
self._settled = True

def abandon(self):
Expand Down Expand Up @@ -1041,7 +1043,7 @@ def defer(self):
self._settled = True

def renew_lock(self):
# type: () -> None
# type: () -> datetime.datetime
# pylint: disable=protected-access,no-member
"""Renew the message lock.
Expand All @@ -1057,7 +1059,8 @@ def renew_lock(self):
Lock renewal can be performed as a background task by registering the message with an
`azure.servicebus.AutoLockRenew` instance.
:rtype: None
:returns: The utc datetime the lock is set to expire at.
:rtype: datetime.datetime
:raises: TypeError if the message is sessionful.
:raises: ~azure.servicebus.exceptions.MessageLockExpired is message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled.
Expand All @@ -1073,7 +1076,9 @@ def renew_lock(self):
raise ValueError("Unable to renew lock - no lock token found.")

expiry = self._receiver._renew_locks(token) # type: ignore
self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0)
self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) # type: datetime.datetime

return self._expiry


class AMQPMessage(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

import uamqp

from .message import PeekMessage, ReceivedMessage
from .message import PeekedMessage, ReceivedMessage
from ..exceptions import ServiceBusError, MessageLockExpired
from .constants import ReceiveSettleMode
from .constants import ReceiveMode


def default(status_code, message, description):
Expand All @@ -34,7 +34,7 @@ def peek_op(status_code, message, description):
parsed = []
for m in message.get_data()[b'messages']:
wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message']))
parsed.append(PeekMessage(wrapped))
parsed.append(PeekedMessage(wrapped))
return parsed
if status_code in [202, 204]:
return []
Expand Down Expand Up @@ -63,14 +63,14 @@ def deferred_message_op(
message,
description,
receiver,
mode=ReceiveSettleMode.PeekLock,
receive_mode=ReceiveMode.PeekLock,
message_type=ReceivedMessage
):
if status_code == 200:
parsed = []
for m in message.get_data()[b'messages']:
wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message']))
parsed.append(message_type(wrapped, mode, is_deferred_message=True, receiver=receiver))
parsed.append(message_type(wrapped, receive_mode, is_deferred_message=True, receiver=receiver))
return parsed
if status_code in [202, 204]:
return []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
SESSION_LOCKED_UNTIL,
DATETIMEOFFSET_EPOCH,
MGMT_REQUEST_SESSION_ID,
ReceiveSettleMode
ReceiveMode
)
from ..exceptions import (
_ServiceBusErrorPolicy,
Expand All @@ -32,26 +32,26 @@ def _populate_attributes(self, **kwargs):

self._auth_uri = "sb://{}/{}".format(self.fully_qualified_namespace, self.entity_path)
self._entity_uri = "amqps://{}/{}".format(self.fully_qualified_namespace, self.entity_path)
self._mode = kwargs.get("mode", ReceiveSettleMode.PeekLock)
self._receive_mode = kwargs.get("receive_mode", ReceiveMode.PeekLock)
self._error_policy = _ServiceBusErrorPolicy(
max_retries=self._config.retry_total
)
self._name = "SBReceiver-{}".format(uuid.uuid4())
self._last_received_sequenced_number = None
self._message_iter = None
self._connection = kwargs.get("connection")
prefetch = kwargs.get("prefetch", 0)
if int(prefetch) < 0 or int(prefetch) > 50000:
raise ValueError("Prefetch must be an integer between 0 and 50000 inclusive.")
self._prefetch = prefetch + 1
prefetch_count = kwargs.get("prefetch_count", 0)
if int(prefetch_count) < 0 or int(prefetch_count) > 50000:
raise ValueError("prefetch_count must be an integer between 0 and 50000 inclusive.")
self._prefetch_count = prefetch_count + 1
# The relationship between the amount can be received and the time interval is linear: amount ~= perf * interval
# In large max_batch_size case, like 5000, the pull receive would always return hundreds of messages limited by
# the perf and time.
# In large max_message_count case, like 5000, the pull receive would always return hundreds of messages limited
# by the perf and time.
self._further_pull_receive_timeout_ms = 200
self._max_wait_time = kwargs.get("max_wait_time", None)

def _build_message(self, received, message_type=ReceivedMessage):
message = message_type(message=received, mode=self._mode, receiver=self)
message = message_type(message=received, receive_mode=self._receive_mode, receiver=self)
self._last_received_sequenced_number = message.sequence_number
return message

Expand Down

0 comments on commit 6a5c123

Please sign in to comment.