Skip to content

Commit

Permalink
[ServiceBus] Replace get_*_deadletter_receiver functions with a sub_q…
Browse files Browse the repository at this point in the history
…ueue parameter (#13552)

* Remove get_*_deadletter_receiver functions and add a sub_queue parameter to get_*_receiver functions taking enum SubQueue to specify the target subqueue.
Adjusts tests, docs accordingly.

Co-authored-by: Adam Ling (MSFT) <adam_ling@outlook.com>
  • Loading branch information
KieranBrantnerMagee and yunhaoling committed Sep 9, 2020
1 parent f5d7cc4 commit 90190a5
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 308 deletions.
1 change: 1 addition & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* Remove `is_anonymous_accessible` from management entities.
* Remove `support_ordering` from `create_queue` and `QueueProperties`
* Remove `enable_subscription_partitioning` from `create_topic` and `TopicProperties`
* `get_dead_letter_[queue,subscription]_receiver()` has been removed. To connect to a dead letter queue, utilize the `sub_queue` parameter of `get_[queue,subscription]_receiver()` provided with a value from the `SubQueue` enum

## 7.0.0b5 (2020-08-10)

Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:

#### [DeadLetter][deadletter_reference]

Transfer the message from the primary queue into a special "dead-letter sub-queue" where it can be accessed using the `ServiceBusClient.get_<queue|subscription>_deadletter_receiver` function and consumed from like any other receiver. (see sample [here](./samples/sync_samples/receive_deadlettered_messages.py))
Transfer the message from the primary queue into a special "dead-letter sub-queue" where it can be accessed using the `ServiceBusClient.get_<queue|subscription>_receiver` function with parameter `sub_queue=SubQueue.DeadLetter` and consumed from like any other receiver. (see sample [here](./samples/sync_samples/receive_deadlettered_messages.py))

```Python
from azure.servicebus import ServiceBusClient
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from ._servicebus_session import ServiceBusSession
from ._base_handler import ServiceBusSharedKeyCredential
from ._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage
from ._common.constants import ReceiveSettleMode, NEXT_AVAILABLE
from ._common.constants import ReceiveSettleMode, SubQueue
from ._common.auto_lock_renewer import AutoLockRenew

TransportType = constants.TransportType
Expand All @@ -26,7 +26,7 @@
'PeekMessage',
'ReceivedMessage',
'ReceiveSettleMode',
'NEXT_AVAILABLE',
'SubQueue',
'ServiceBusClient',
'ServiceBusReceiver',
'ServiceBusSessionReceiver',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ class SessionFilter(Enum):
NextAvailable = 0


class SubQueue(Enum):
DeadLetter = 1
TransferDeadLetter = 2


ANNOTATION_SYMBOL_PARTITION_KEY = types.AMQPSymbol(_X_OPT_PARTITION_KEY)
ANNOTATION_SYMBOL_VIA_PARTITION_KEY = types.AMQPSymbol(_X_OPT_VIA_PARTITION_KEY)
ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME = types.AMQPSymbol(_X_OPT_SCHEDULED_ENQUEUE_TIME)
Expand Down
186 changes: 45 additions & 141 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ._servicebus_session_receiver import ServiceBusSessionReceiver
from ._common._configuration import Configuration
from ._common.utils import create_authentication, generate_dead_letter_entity_name
from ._common.constants import SubQueue

if TYPE_CHECKING:
from azure.core.credentials import TokenCredential
Expand Down Expand Up @@ -192,6 +193,9 @@ def get_queue_receiver(self, queue_name, **kwargs):
"""Get ServiceBusReceiver for the specific queue.
:param str queue_name: The path of specific Service Bus Queue the client connects to.
:keyword Optional[SubQueue] sub_queue: If specified, the subqueue this receiver will connect to.
This includes the DeadLetter and TransferDeadLetter queues, holds messages that can't be delivered to any
receiver or messages that can't be processed. The default is None, meaning connect to the primary queue.
:keyword mode: The mode with which messages will be retrieved from the entity. The two options
are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given
lock period before they will be removed from the queue. Messages received with ReceiveAndDelete
Expand Down Expand Up @@ -222,10 +226,16 @@ def get_queue_receiver(self, queue_name, **kwargs):
"""
sub_queue = kwargs.get('sub_queue', None)
if sub_queue and sub_queue in SubQueue:
queue_name = generate_dead_letter_entity_name(
queue_name=queue_name,
transfer_deadletter=(sub_queue == SubQueue.TransferDeadLetter)
)
# pylint: disable=protected-access
handler = ServiceBusReceiver(
fully_qualified_namespace=self.fully_qualified_namespace,
queue_name=queue_name,
entity_name=queue_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
Expand All @@ -237,69 +247,6 @@ def get_queue_receiver(self, queue_name, **kwargs):
self._handlers.append(handler)
return handler

def get_queue_deadletter_receiver(self, queue_name, **kwargs):
# type: (str, Any) -> ServiceBusReceiver
"""Get ServiceBusReceiver for the dead-letter queue which is the secondary subqueue provided by
the specific Queue, it holds messages that can't be delivered to any receiver or messages that can't
be processed.
:param str queue_name: The path of specific Service Bus Queue the client connects to.
:keyword mode: The mode with which messages will be retrieved from the entity. The two options
are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given
lock period before they will be removed from the queue. Messages received with ReceiveAndDelete
will be immediately removed from the queue, and cannot be subsequently rejected or re-received if
the client fails to process the message. The default mode is PeekLock.
:paramtype mode: ~azure.servicebus.ReceiveSettleMode
:keyword float max_wait_time: The timeout in seconds between received messages after which the receiver will
automatically stop receiving. The default value is 0, meaning no timeout.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs.
Default value is 3.
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:keyword bool transfer_deadletter: Whether to connect to the transfer dead-letter queue, or the standard
dead-letter queue. The transfer dead-letter queue holds messages that have failed to be transferred in
ForwardTo or SendVia scenarios. Default is False, using the standard dead-letter endpoint.
:keyword int prefetch: The maximum number of messages to cache with each request to the service.
This setting is only for advanced performance tuning. Increasing this value will improve message throughput
performance but increase the chance that messages will expire while they are cached if they're not
processed fast enough.
The default value is 0, meaning messages will be received from the service and processed one at a time.
In the case of prefetch being 0, `ServiceBusReceiver.receive` would try to cache `max_batch_size` (if provided)
within its request to the service.
:rtype: ~azure.servicebus.ServiceBusReceiver
.. admonition:: Example:
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START create_queue_deadletter_receiver_from_sb_client_sync]
:end-before: [END create_queue_deadletter_receiver_from_sb_client_sync]
:language: python
:dedent: 4
:caption: Create a new instance of the ServiceBusReceiver for Dead Letter Queue from ServiceBusClient.
"""
# pylint: disable=protected-access
entity_name = generate_dead_letter_entity_name(
queue_name=queue_name,
transfer_deadletter=kwargs.get('transfer_deadletter', False)
)
handler = ServiceBusReceiver(
fully_qualified_namespace=self.fully_qualified_namespace,
entity_name=entity_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
http_proxy=self._config.http_proxy,
connection=self._connection,
is_dead_letter_receiver=True,
user_agent=self._config.user_agent,
**kwargs
)
self._handlers.append(handler)
return handler

def get_topic_sender(self, topic_name, **kwargs):
# type: (str, Any) -> ServiceBusSender
"""Get ServiceBusSender for the specific topic.
Expand Down Expand Up @@ -343,6 +290,9 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
:param str topic_name: The name of specific Service Bus Topic the client connects to.
:param str subscription_name: The name of specific Service Bus Subscription
under the given Service Bus Topic.
:keyword Optional[SubQueue] sub_queue: If specified, the subqueue this receiver will connect to.
This includes the DeadLetter and TransferDeadLetter queues, holds messages that can't be delivered to any
receiver or messages that can't be processed. The default is None, meaning connect to the primary queue.
:keyword mode: The mode with which messages will be retrieved from the entity. The two options
are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given
lock period before they will be removed from the subscription. Messages received with ReceiveAndDelete
Expand Down Expand Up @@ -377,83 +327,37 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
"""
# pylint: disable=protected-access
handler = ServiceBusReceiver(
fully_qualified_namespace=self.fully_qualified_namespace,
topic_name=topic_name,
subscription_name=subscription_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
**kwargs
)
self._handlers.append(handler)
return handler

def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **kwargs):
# type: (str, str, Any) -> ServiceBusReceiver
"""Get ServiceBusReceiver for the dead-letter queue which is the secondary subqueue provided by
the specific topic subscription, it holds messages that can't be delivered to any receiver or messages that
can't be processed.
:param str topic_name: The name of specific Service Bus Topic the client connects to.
:param str subscription_name: The name of specific Service Bus Subscription
under the given Service Bus Topic.
:keyword mode: The mode with which messages will be retrieved from the entity. The two options
are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given
lock period before they will be removed from the subscription. Messages received with ReceiveAndDelete
will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if
the client fails to process the message. The default mode is PeekLock.
:paramtype mode: ~azure.servicebus.ReceiveSettleMode
:keyword float max_wait_time: The timeout in seconds between received messages after which the receiver will
automatically stop receiving. The default value is 0, meaning no timeout.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs.
Default value is 3.
:keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries.
Default value is 0.8.
:keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.
:keyword bool transfer_deadletter: Whether to connect to the transfer dead-letter queue, or the standard
dead-letter queue. The transfer dead letter queue holds messages that have failed to be transferred in
ForwardTo or SendVia scenarios. Default is False, using the standard dead-letter endpoint.
:keyword int prefetch: The maximum number of messages to cache with each request to the service.
This setting is only for advanced performance tuning. Increasing this value will improve message throughput
performance but increase the chance that messages will expire while they are cached if they're not
processed fast enough.
The default value is 0, meaning messages will be received from the service and processed one at a time.
In the case of prefetch being 0, `ServiceBusReceiver.receive` would try to cache `max_batch_size` (if provided)
within its request to the service.
:rtype: ~azure.servicebus.ServiceBusReceiver
.. admonition:: Example:
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START create_subscription_deadletter_receiver_from_sb_client_sync]
:end-before: [END create_subscription_deadletter_receiver_from_sb_client_sync]
:language: python
:dedent: 4
:caption: Create a new instance of the ServiceBusReceiver for Dead Letter Queue from ServiceBusClient.
"""
entity_name = generate_dead_letter_entity_name(
topic_name=topic_name,
subscription_name=subscription_name,
transfer_deadletter=kwargs.get('transfer_deadletter', False)
)
handler = ServiceBusReceiver(
fully_qualified_namespace=self.fully_qualified_namespace,
entity_name=entity_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
http_proxy=self._config.http_proxy,
connection=self._connection,
is_dead_letter_receiver=True,
user_agent=self._config.user_agent,
**kwargs
)
sub_queue = kwargs.get('sub_queue', None)
if sub_queue and sub_queue in SubQueue:
entity_name = generate_dead_letter_entity_name(
topic_name=topic_name,
subscription_name=subscription_name,
transfer_deadletter=(sub_queue == SubQueue.TransferDeadLetter)
)
handler = ServiceBusReceiver(
fully_qualified_namespace=self.fully_qualified_namespace,
entity_name=entity_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
**kwargs
)
else:
handler = ServiceBusReceiver(
fully_qualified_namespace=self.fully_qualified_namespace,
topic_name=topic_name,
subscription_name=subscription_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
http_proxy=self._config.http_proxy,
connection=self._connection,
user_agent=self._config.user_agent,
**kwargs
)
self._handlers.append(handler)
return handler

Expand Down

0 comments on commit 90190a5

Please sign in to comment.