Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative: Remove byte array allocations from AmqpReceiver DisposeMessagesAsync #20427

Merged
merged 3 commits into from Apr 15, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
87 changes: 43 additions & 44 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Expand Up @@ -2,9 +2,11 @@
// Licensed under the MIT License.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
Expand Down Expand Up @@ -72,6 +74,8 @@ internal class AmqpReceiver : TransportReceiver
private readonly FaultTolerantAmqpObject<ReceivingAmqpLink> _receiveLink;
private readonly FaultTolerantAmqpObject<RequestResponseAmqpLink> _managementLink;

private const int SizeOfGuidInBytes = 16;

/// <summary>
/// Gets the sequence number of the last peeked message.
/// </summary>
Expand Down Expand Up @@ -407,7 +411,7 @@ private static void CloseLink(RequestResponseAmqpLink link)
cancellationToken).ConfigureAwait(false);

/// <summary>
/// Completes a series of <see cref="ServiceBusMessage"/> using a list of lock tokens. This will delete the message from the service.
/// Completes a <see cref="ServiceBusReceivedMessage"/> using a lock token. This will delete the message from the service.
/// </summary>
///
/// <param name="lockToken">The lockToken of the <see cref="ServiceBusReceivedMessage"/> to complete.</param>
Expand All @@ -417,34 +421,39 @@ private static void CloseLink(RequestResponseAmqpLink link)
TimeSpan timeout)
{
Guid lockTokenGuid = new Guid(lockToken);
var lockTokenGuids = new[] { lockTokenGuid };
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
{
await DisposeMessageRequestResponseAsync(
lockTokenGuids,
lockTokenGuid,
timeout,
DispositionStatus.Completed,
SessionId).ConfigureAwait(false);
return;
}
await DisposeMessagesAsync(lockTokenGuids, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false);
await DisposeMessageAsync(lockTokenGuid, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false);
}

/// <summary>
/// Completes a series of <see cref="ServiceBusMessage"/> using a list of lock tokens. This will delete the message from the service.
/// Settles a <see cref="ServiceBusReceivedMessage"/> using a lock token.
/// </summary>
///
/// <param name="lockTokens">An <see cref="IEnumerable{T}"/> containing the lock tokens of the corresponding messages to complete.</param>
/// <param name="lockToken">The lockToken of the <see cref="ServiceBusReceivedMessage"/> to complete.</param>
/// <param name="outcome"></param>
/// <param name="timeout"></param>
private async Task DisposeMessagesAsync(
IEnumerable<Guid> lockTokens,
private async Task DisposeMessageAsync(
Guid lockToken,
Outcome outcome,
TimeSpan timeout)
{
ThrowIfSessionLockLost();
List<ArraySegment<byte>> deliveryTags = ConvertLockTokensToDeliveryTags(lockTokens);

byte[] bufferForLockToken = ArrayPool<byte>.Shared.Rent(SizeOfGuidInBytes);
if (!MemoryMarshal.TryWrite(bufferForLockToken, ref lockToken))
{
lockToken.ToByteArray().AsSpan().CopyTo(bufferForLockToken);
}

ArraySegment<byte> deliveryTag = new ArraySegment<byte>(bufferForLockToken, 0, SizeOfGuidInBytes);
ReceivingAmqpLink receiveLink = null;
try
{
Expand All @@ -463,27 +472,21 @@ private static void CloseLink(RequestResponseAmqpLink link)
receiveLink = await _receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false);
}

var disposeMessageTasks = new Task<Outcome>[deliveryTags.Count];
var i = 0;
foreach (ArraySegment<byte> deliveryTag in deliveryTags)
{
disposeMessageTasks[i++] = receiveLink.DisposeMessageAsync(deliveryTag, transactionId, outcome, true, timeout);
}

Outcome[] outcomes = await Task.WhenAll(disposeMessageTasks).ConfigureAwait(false);
Outcome outcomeResult = await receiveLink
.DisposeMessageAsync(deliveryTag, transactionId, outcome, true, timeout).ConfigureAwait(false);
Error error = null;
foreach (Outcome item in outcomes)
Outcome disposedOutcome =
outcomeResult.DescriptorCode == Rejected.Code && ((error = ((Rejected)outcomeResult).Error) != null)
? outcomeResult
: null;
if (disposedOutcome != null)
{
Outcome disposedOutcome = item.DescriptorCode == Rejected.Code && ((error = ((Rejected)item).Error) != null) ? item : null;
if (disposedOutcome != null)
if (error.Condition.Equals(AmqpErrorCode.NotFound))
{
if (error.Condition.Equals(AmqpErrorCode.NotFound))
{
ThrowLockLostException();
}

throw error.ToMessagingContractException();
ThrowLockLostException();
}

throw error.ToMessagingContractException();
}
}
catch (Exception exception)
Expand All @@ -503,6 +506,10 @@ private static void CloseLink(RequestResponseAmqpLink link)

throw;
}
finally
{
ArrayPool<byte>.Shared.Return(bufferForLockToken);
}
}

private void ThrowLockLostException()
Expand Down Expand Up @@ -564,17 +571,16 @@ private void ThrowLockLostException()
IDictionary<string, object> propertiesToModify = null)
{
Guid lockTokenGuid = new Guid(lockToken);
var lockTokenGuids = new[] { lockTokenGuid };
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
{
return DisposeMessageRequestResponseAsync(
lockTokenGuids,
lockTokenGuid,
timeout,
DispositionStatus.Defered,
SessionId,
propertiesToModify);
}
return DisposeMessagesAsync(lockTokenGuids, GetDeferOutcome(propertiesToModify), timeout);
return DisposeMessageAsync(lockTokenGuid, GetDeferOutcome(propertiesToModify), timeout);
}

/// <summary>
Expand Down Expand Up @@ -622,17 +628,16 @@ private void ThrowLockLostException()
IDictionary<string, object> propertiesToModify = null)
{
Guid lockTokenGuid = new Guid(lockToken);
var lockTokenGuids = new[] { lockTokenGuid };
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
{
return DisposeMessageRequestResponseAsync(
lockTokenGuids,
lockTokenGuid,
timeout,
DispositionStatus.Abandoned,
SessionId,
propertiesToModify);
}
return DisposeMessagesAsync(lockTokenGuids, GetAbandonOutcome(propertiesToModify), timeout);
return DisposeMessageAsync(lockTokenGuid, GetAbandonOutcome(propertiesToModify), timeout);
}

/// <summary>
Expand Down Expand Up @@ -695,11 +700,10 @@ private void ThrowLockLostException()
Argument.AssertNotTooLong(deadLetterErrorDescription, Constants.MaxDeadLetterReasonLength, nameof(deadLetterErrorDescription));

Guid lockTokenGuid = new Guid(lockToken);
var lockTokenGuids = new[] { lockTokenGuid };
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
{
return DisposeMessageRequestResponseAsync(
lockTokenGuids,
lockTokenGuid,
timeout,
DispositionStatus.Suspended,
SessionId,
Expand All @@ -708,8 +712,8 @@ private void ThrowLockLostException()
deadLetterErrorDescription);
}

return DisposeMessagesAsync(
lockTokenGuids,
return DisposeMessageAsync(
lockTokenGuid,
GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLetterErrorDescription),
timeout);
}
Expand Down Expand Up @@ -756,15 +760,15 @@ private void ThrowLockLostException()
/// Updates the disposition status of deferred messages.
/// </summary>
///
/// <param name="lockTokens">Message lock tokens to update disposition status.</param>
/// <param name="lockToken">Message lock token to update disposition status.</param>
/// <param name="timeout"></param>
/// <param name="dispositionStatus"></param>
/// <param name="sessionId"></param>
/// <param name="propertiesToModify"></param>
/// <param name="deadLetterReason"></param>
/// <param name="deadLetterDescription"></param>
private async Task DisposeMessageRequestResponseAsync(
Guid[] lockTokens,
Guid lockToken,
TimeSpan timeout,
DispositionStatus dispositionStatus,
string sessionId = null,
Expand All @@ -780,7 +784,7 @@ private void ThrowLockLostException()
amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLink.Name;
}

amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = lockTokens;
amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = new Guid[]{ lockToken };
amqpRequestMessage.Map[ManagementConstants.Properties.DispositionStatus] = dispositionStatus.ToString().ToLowerInvariant();

if (deadLetterReason != null)
Expand Down Expand Up @@ -836,11 +840,6 @@ private void ThrowLockLostException()
private static Outcome GetDeferOutcome(IDictionary<string, object> propertiesToModify) =>
GetModifiedOutcome(propertiesToModify, true);

private static List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumerable<Guid> lockTokens)
{
return lockTokens.Select(lockToken => new ArraySegment<byte>(lockToken.ToByteArray())).ToList();
}

private static Outcome GetModifiedOutcome(
IDictionary<string, object> propertiesToModify,
bool undeliverableHere)
Expand Down