Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 4 additions & 19 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,25 +83,10 @@ def _versionatom(s):
KNOWN_TCP_OPTS.remove("TCP_MAXSEG")
KNOWN_TCP_OPTS.remove("TCP_USER_TIMEOUT")

if sys.version_info < (2, 7, 7): # pragma: no cover
import functools

def _to_bytes_arg(fun):
@functools.wraps(fun)
def _inner(s, *args, **kwargs):
return fun(s.encode(), *args, **kwargs)

return _inner

pack = _to_bytes_arg(struct.pack)
pack_into = _to_bytes_arg(struct.pack_into)
unpack = _to_bytes_arg(struct.unpack)
unpack_from = _to_bytes_arg(struct.unpack_from)
else:
pack = struct.pack
pack_into = struct.pack_into
unpack = struct.unpack
unpack_from = struct.unpack_from
pack = struct.pack
pack_into = struct.pack_into
unpack = struct.unpack
unpack_from = struct.unpack_from

__all__ = [
"LINUX_VERSION",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import logging
from datetime import datetime
from uuid import uuid4
from typing import Any, Optional, Union

from ..utils import utc_now, utc_from_timestamp
Expand Down Expand Up @@ -72,7 +73,7 @@ async def _put_token(
) -> None:
message = Message( # type: ignore # TODO: missing positional args header, etc.
value=token,
properties=Properties(message_id=self._mgmt_link.next_message_id), # type: ignore
properties=Properties(message_id=uuid4()), # type: ignore
application_properties={
CBS_NAME: audience,
CBS_OPERATION: CBS_PUT_TOKEN,
Expand All @@ -87,7 +88,6 @@ async def _put_token(
operation=CBS_PUT_TOKEN,
type=token_type,
)
self._mgmt_link.next_message_id += 1

async def _on_amqp_management_open_complete(self, management_open_result: ManagementOpenResult) -> None:
if self.state in (CbsState.CLOSED, CbsState.ERROR):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Connection: # pylint:disable=too-many-instance-attributes
and 1 for transport type AmqpOverWebsocket.
"""

def __init__( # pylint:disable=too-many-locals
def __init__( # pylint:disable=too-many-locals
self,
endpoint: str,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import time
import logging
from uuid import uuid4
from functools import partial
from typing import Optional, Union

Expand Down Expand Up @@ -35,7 +36,6 @@ class ManagementLink(object): # pylint:disable=too-many-instance-attributes
"""

def __init__(self, session, endpoint, **kwargs):
self.next_message_id = 0
self.state = ManagementLinkState.IDLE
self._pending_operations = []
self._session = session
Expand Down Expand Up @@ -228,17 +228,16 @@ async def execute_operation(
message.application_properties["locales"] = locales
try:
# TODO: namedtuple is immutable, which may push us to re-think about the namedtuple approach for Message
new_properties = message.properties._replace(message_id=self.next_message_id)
new_properties = message.properties._replace(message_id=uuid4())
except AttributeError:
new_properties = Properties(message_id=self.next_message_id)
new_properties = Properties(message_id=uuid4())
message = message._replace(properties=new_properties)
expire_time = (time.time() + timeout) if timeout else None
message_delivery = _MessageDelivery(message, MessageDeliveryState.WaitingToBeSent, expire_time)

on_send_complete = partial(self._on_send_complete, message_delivery)

await self._request_link.send_transfer(message, on_send_complete=on_send_complete, timeout=timeout)
self.next_message_id += 1
self._pending_operations.append(PendingManagementOperation(message, on_execute_operation_complete))

async def close(self):
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/cbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# -------------------------------------------------------------------------

import logging
from uuid import uuid4
from datetime import datetime
from typing import Any, Optional, Tuple, Union

Expand Down Expand Up @@ -88,7 +89,7 @@ def __init__(
def _put_token(self, token: str, token_type: str, audience: str, expires_on: Optional[datetime] = None) -> None:
message = Message( # type: ignore # TODO: missing positional args header, etc.
value=token,
properties=Properties(message_id=self._mgmt_link.next_message_id), # type: ignore
properties=Properties(message_id=uuid4()), # type: ignore
application_properties={
CBS_NAME: audience,
CBS_OPERATION: CBS_PUT_TOKEN,
Expand All @@ -103,7 +104,6 @@ def _put_token(self, token: str, token_type: str, audience: str, expires_on: Opt
operation=CBS_PUT_TOKEN,
type=token_type,
)
self._mgmt_link.next_message_id += 1

def _on_amqp_management_open_complete(self, management_open_result: ManagementOpenResult) -> None:
if self.state in (CbsState.CLOSED, CbsState.ERROR):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# --------------------------------------------------------------------------
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import time
import logging
from uuid import uuid4
from functools import partial
from collections import namedtuple
from typing import Optional, Union
Expand Down Expand Up @@ -37,7 +38,6 @@ class ManagementLink(object): # pylint:disable=too-many-instance-attributes
"""

def __init__(self, session, endpoint, **kwargs):
self.next_message_id = 0
self.state = ManagementLinkState.IDLE
self._pending_operations = []
self._session = session
Expand Down Expand Up @@ -228,17 +228,16 @@ def execute_operation(
message.application_properties["locales"] = locales
try:
# TODO: namedtuple is immutable, which may push us to re-think about the namedtuple approach for Message
new_properties = message.properties._replace(message_id=self.next_message_id)
new_properties = message.properties._replace(message_id=uuid4())
except AttributeError:
new_properties = Properties(message_id=self.next_message_id)
new_properties = Properties(message_id=uuid4())
message = message._replace(properties=new_properties)
expire_time = (time.time() + timeout) if timeout else None
message_delivery = _MessageDelivery(message, MessageDeliveryState.WaitingToBeSent, expire_time)

on_send_complete = partial(self._on_send_complete, message_delivery)

self._request_link.send_transfer(message, on_send_complete=on_send_complete, timeout=timeout)
self.next_message_id += 1
self._pending_operations.append(PendingManagementOperation(message, on_execute_operation_complete))

def close(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import logging
from datetime import datetime
from uuid import uuid4
from typing import Any, Optional, Union

from ..utils import utc_now, utc_from_timestamp
Expand Down Expand Up @@ -72,7 +73,7 @@ async def _put_token(
) -> None:
message = Message( # type: ignore # TODO: missing positional args header, etc.
value=token,
properties=Properties(message_id=self._mgmt_link.next_message_id), # type: ignore
properties=Properties(message_id=uuid4()), # type: ignore
application_properties={
CBS_NAME: audience,
CBS_OPERATION: CBS_PUT_TOKEN,
Expand All @@ -87,7 +88,6 @@ async def _put_token(
operation=CBS_PUT_TOKEN,
type=token_type,
)
self._mgmt_link.next_message_id += 1

async def _on_amqp_management_open_complete(self, management_open_result: ManagementOpenResult) -> None:
if self.state in (CbsState.CLOSED, CbsState.ERROR):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import time
import logging
from uuid import uuid4
from functools import partial
from typing import Optional, Union

Expand Down Expand Up @@ -35,7 +36,6 @@ class ManagementLink(object): # pylint:disable=too-many-instance-attributes
"""

def __init__(self, session, endpoint, **kwargs):
self.next_message_id = 0
self.state = ManagementLinkState.IDLE
self._pending_operations = []
self._session = session
Expand Down Expand Up @@ -228,17 +228,16 @@ async def execute_operation(
message.application_properties["locales"] = locales
try:
# TODO: namedtuple is immutable, which may push us to re-think about the namedtuple approach for Message
new_properties = message.properties._replace(message_id=self.next_message_id)
new_properties = message.properties._replace(message_id=uuid4())
except AttributeError:
new_properties = Properties(message_id=self.next_message_id)
new_properties = Properties(message_id=uuid4())
message = message._replace(properties=new_properties)
expire_time = (time.time() + timeout) if timeout else None
message_delivery = _MessageDelivery(message, MessageDeliveryState.WaitingToBeSent, expire_time)

on_send_complete = partial(self._on_send_complete, message_delivery)

await self._request_link.send_transfer(message, on_send_complete=on_send_complete, timeout=timeout)
self.next_message_id += 1
self._pending_operations.append(PendingManagementOperation(message, on_execute_operation_complete))

async def close(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# -------------------------------------------------------------------------

import logging
from uuid import uuid4
from datetime import datetime
from typing import Any, Optional, Tuple, Union

Expand Down Expand Up @@ -88,7 +89,7 @@ def __init__(
def _put_token(self, token: str, token_type: str, audience: str, expires_on: Optional[datetime] = None) -> None:
message = Message( # type: ignore # TODO: missing positional args header, etc.
value=token,
properties=Properties(message_id=self._mgmt_link.next_message_id), # type: ignore
properties=Properties(message_id=uuid4()), # type: ignore
application_properties={
CBS_NAME: audience,
CBS_OPERATION: CBS_PUT_TOKEN,
Expand All @@ -103,7 +104,6 @@ def _put_token(self, token: str, token_type: str, audience: str, expires_on: Opt
operation=CBS_PUT_TOKEN,
type=token_type,
)
self._mgmt_link.next_message_id += 1

def _on_amqp_management_open_complete(self, management_open_result: ManagementOpenResult) -> None:
if self.state in (CbsState.CLOSED, CbsState.ERROR):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import time
import logging
from uuid import uuid4
from functools import partial
from collections import namedtuple
from typing import Optional, Union
Expand Down Expand Up @@ -37,7 +38,6 @@ class ManagementLink(object): # pylint:disable=too-many-instance-attributes
"""

def __init__(self, session, endpoint, **kwargs):
self.next_message_id = 0
self.state = ManagementLinkState.IDLE
self._pending_operations = []
self._session = session
Expand Down Expand Up @@ -228,17 +228,16 @@ def execute_operation(
message.application_properties["locales"] = locales
try:
# TODO: namedtuple is immutable, which may push us to re-think about the namedtuple approach for Message
new_properties = message.properties._replace(message_id=self.next_message_id)
new_properties = message.properties._replace(message_id=uuid4())
except AttributeError:
new_properties = Properties(message_id=self.next_message_id)
new_properties = Properties(message_id=uuid4())
message = message._replace(properties=new_properties)
expire_time = (time.time() + timeout) if timeout else None
message_delivery = _MessageDelivery(message, MessageDeliveryState.WaitingToBeSent, expire_time)

on_send_complete = partial(self._on_send_complete, message_delivery)

self._request_link.send_transfer(message, on_send_complete=on_send_complete, timeout=timeout)
self.next_message_id += 1
self._pending_operations.append(PendingManagementOperation(message, on_execute_operation_complete))

def close(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
AmqpMessageProperties,
)
from azure.servicebus._pyamqp.message import Message
from azure.servicebus._pyamqp._decode import decode_payload
from azure.servicebus._pyamqp import error, management_operation
from azure.servicebus._pyamqp.aio import AMQPClientAsync, ReceiveClientAsync, _management_operation_async
from azure.servicebus._common.constants import ServiceBusReceiveMode, ServiceBusSubQueue
Expand All @@ -65,7 +66,6 @@
ServiceBusQueuePreparer,
CachedServiceBusResourceGroupPreparer,
)
from tests.utilities import get_logger, print_message, sleep_until_expired
from mocks_async import MockReceivedMessage, MockReceiver
from tests.utilities import (
get_logger,
Expand Down Expand Up @@ -676,6 +676,64 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_receiveandde
peeked_msgs = await receiver.peek_messages(max_message_count=10, timeout=10)
assert len(peeked_msgs) == 0

@pytest.mark.asyncio
@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedServiceBusResourceGroupPreparer(name_prefix="servicebustest")
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
@ServiceBusQueuePreparer(
name_prefix="servicebustest", dead_lettering_on_message_expiration=True, lock_duration="PT10S"
)
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
@ArgPasserAsync()
async def test_async_queue_receive_handler_peek_check_uniq_corr_id(
self, uamqp_transport, *, servicebus_namespace=None, servicebus_queue=None, **kwargs
):
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
credential = get_credential(is_async=True)
async with ServiceBusClient(
fully_qualified_namespace, credential, logging_enable=False, uamqp_transport=uamqp_transport
) as sb_client:
# send 10 messages
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
for i in range(10):
message = ServiceBusMessage("Handler message no. {}".format(i))
await sender.send_messages(message)

def _hack_parse_received_message(message, message_type, **kwargs):
parsed = []
# Get correlation ID from mgmt msg wrapper. Individual msgs in "value" do not have correlation IDs.
correlation_ids.add(message.properties.correlation_id)
if message.value:
for m in message.value[b"messages"]:
wrapped = decode_payload(memoryview(m[b"message"]))
parsed.append(message_type(wrapped, **kwargs))
return parsed

async def peek_message(receiver, seq_num):
await receiver.peek_messages(max_message_count=1, sequence_number=seq_num)

# Peek 10 unique messages concurrently
async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=5)
while len(received_msgs) < 10:
received_msgs.extend(await receiver.receive_messages(max_message_count=1, max_wait_time=5))
assert len(received_msgs) == 10

# Grab the sequence numbers of the received messages
seq_nums = set()
for msg in received_msgs:
seq_nums.add(int(msg.sequence_number))

# Use the sequence numbers to peek messages concurrently
# Add the correlation IDs of the peeked msgs to a set
# If the correlation ID is not unique, the set will be smaller than 10
# Otherwise, the set will be 10
correlation_ids = set()
receiver._amqp_transport.parse_received_message = _hack_parse_received_message
await asyncio.gather(*(peek_message(receiver, seq_num) for seq_num in seq_nums))
assert len(correlation_ids) == 10

@pytest.mark.asyncio
@pytest.mark.liveTest
@pytest.mark.live_test_only
Expand Down
Loading