Skip to content

Commit

Permalink
ServiceBusRetryPolicy generic overloads to avoid closure capturing (#…
Browse files Browse the repository at this point in the history
…19522)

* ServiceBusRetryPolicy generic overloads to avoid closure capturing

* Avoid closure on sending and retrying batched messages

* Do conversion once

* Address leftover RunOperation in AmqpSender

* Static lambdas

* Switch to value tuple approach

* Remove async/await where not needed

* Cleanup VoidResult usage

* Revert "Remove async/await where not needed"

This reverts commit 0420017.

* Switch to ValueTask

* Use more intention revealing names in static lambdas

* Remove unnecessary null check

* Return expected public type earlier
  • Loading branch information
danielmarbach committed Mar 22, 2021
1 parent fb6adfe commit e3eca79
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 136 deletions.
168 changes: 92 additions & 76 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Expand Up @@ -242,19 +242,18 @@ private static void CloseLink(RequestResponseAmqpLink link)
TimeSpan? maxWaitTime,
CancellationToken cancellationToken)
{
IReadOnlyList<ServiceBusReceivedMessage> messages = null;
await _retryPolicy.RunOperation(async (timeout) =>
{
messages = await ReceiveMessagesAsyncInternal(
maxMessages,
maxWaitTime,
timeout,
cancellationToken).ConfigureAwait(false);
},
_connectionScope,
cancellationToken).ConfigureAwait(false);

return messages;
return await _retryPolicy.RunOperation(static async (value, timeout, token) =>
{
var (receiver, maxMessages, maxWaitTime) = value;
return await receiver.ReceiveMessagesAsyncInternal(
maxMessages,
maxWaitTime,
timeout,
token).ConfigureAwait(false);
},
(this, maxMessages, maxWaitTime),
_connectionScope,
cancellationToken).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -350,10 +349,14 @@ private static void CloseLink(RequestResponseAmqpLink link)
string lockToken,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) =>
await CompleteInternalAsync(
lockToken,
timeout).ConfigureAwait(false),
static async (value, timeout, _) =>
{
var (receiver, lockToken) = value;
await receiver.CompleteInternalAsync(
lockToken,
timeout).ConfigureAwait(false);
},
(this, lockToken),
_connectionScope,
cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -491,10 +494,15 @@ private void ThrowLockLostException()
IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) => await DeferInternalAsync(
lockToken,
timeout,
propertiesToModify).ConfigureAwait(false),
static async (value, timeout, _) =>
{
var (receiver, lockToken, properties) = value;
await receiver.DeferInternalAsync(
lockToken,
timeout,
properties).ConfigureAwait(false);
},
(this, lockToken, propertiesToModify),
_connectionScope,
cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -543,10 +551,15 @@ private void ThrowLockLostException()
IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) => await AbandonInternalAsync(
lockToken,
timeout,
propertiesToModify).ConfigureAwait(false),
static async (value, timeout, _) =>
{
var (receiver, lockToken, properties) = value;
await receiver.AbandonInternalAsync(
lockToken,
timeout,
properties).ConfigureAwait(false);
},
(this, lockToken, propertiesToModify),
_connectionScope,
cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -602,12 +615,17 @@ private void ThrowLockLostException()
IDictionary<string, object> propertiesToModify = default,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) => await DeadLetterInternalAsync(
lockToken,
timeout,
propertiesToModify,
deadLetterReason,
deadLetterErrorDescription).ConfigureAwait(false),
static async (value, timeout, _) =>
{
var (receiver, lockToken, propertiesToModify, deadLetterReason, deadLetterErrorDescription) = value;
await receiver.DeadLetterInternalAsync(
lockToken,
timeout,
propertiesToModify,
deadLetterReason,
deadLetterErrorDescription).ConfigureAwait(false);
},
(this, lockToken, propertiesToModify, deadLetterReason, deadLetterErrorDescription),
_connectionScope,
cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -828,20 +846,20 @@ private static List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumera
CancellationToken cancellationToken = default)
{
long seqNumber = sequenceNumber ?? LastPeekedSequenceNumber + 1;
IReadOnlyList<ServiceBusReceivedMessage> messages = null;

await _retryPolicy.RunOperation(
async (timeout) =>
messages = await PeekMessagesInternalAsync(
seqNumber,
messageCount,
timeout,
cancellationToken)
.ConfigureAwait(false),
return await _retryPolicy.RunOperation(
static async (value, timeout, token) =>
{
var (receiver, seqNumber, messageCount) = value;
return await receiver.PeekMessagesInternalAsync(
seqNumber,
messageCount,
timeout,
token)
.ConfigureAwait(false);
},
(this, seqNumber, messageCount),
_connectionScope,
cancellationToken).ConfigureAwait(false);

return messages;
}

/// <summary>
Expand Down Expand Up @@ -933,17 +951,17 @@ private static List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumera
string lockToken,
CancellationToken cancellationToken)
{
DateTimeOffset lockedUntil = DateTimeOffset.MinValue;
await _retryPolicy.RunOperation(
async (timeout) =>
return await _retryPolicy.RunOperation(
static async (value, timeout, _) =>
{
lockedUntil = await RenewMessageLockInternalAsync(
var (receiver, lockToken) = value;
return await receiver.RenewMessageLockInternalAsync(
lockToken,
timeout).ConfigureAwait(false);
},
(this, lockToken),
_connectionScope,
cancellationToken).ConfigureAwait(false);
return lockedUntil;
}

/// <summary>
Expand Down Expand Up @@ -1005,13 +1023,10 @@ private async Task<AmqpResponseMessage> ExecuteRequest(TimeSpan timeout, AmqpReq
/// </summary>
public override async Task RenewSessionLockAsync(CancellationToken cancellationToken = default)
{
DateTimeOffset lockedUntil;
await _retryPolicy.RunOperation(
async (timeout) =>
{
lockedUntil = await RenewSessionLockInternal(
timeout).ConfigureAwait(false);
},
var lockedUntil = await _retryPolicy.RunOperation(
static async (receiver, timeout, _) => await receiver.RenewSessionLockInternal(
timeout).ConfigureAwait(false),
this,
_connectionScope,
cancellationToken).ConfigureAwait(false);
SessionLockedUntil = lockedUntil;
Expand Down Expand Up @@ -1055,15 +1070,11 @@ internal async Task<DateTimeOffset> RenewSessionLockInternal(TimeSpan timeout)
/// <returns>The session state as <see cref="BinaryData"/>.</returns>
public override async Task<BinaryData> GetStateAsync(CancellationToken cancellationToken = default)
{
BinaryData sessionState = default;
await _retryPolicy.RunOperation(
async (timeout) =>
{
sessionState = await GetStateInternal(timeout).ConfigureAwait(false);
},
return await _retryPolicy.RunOperation(
static async (receiver, timeout, _) => await receiver.GetStateInternal(timeout).ConfigureAwait(false),
this,
_connectionScope,
cancellationToken).ConfigureAwait(false);
return sessionState;
}

internal async Task<BinaryData> GetStateInternal(TimeSpan timeout)
Expand Down Expand Up @@ -1110,12 +1121,14 @@ internal async Task<BinaryData> GetStateInternal(TimeSpan timeout)
CancellationToken cancellationToken)
{
await _retryPolicy.RunOperation(
async (timeout) =>
static async (value, timeout, _) =>
{
await SetStateInternal(
var (receiver, sessionState) = value;
await receiver.SetStateInternal(
sessionState,
timeout).ConfigureAwait(false);
},
(this, sessionState),
_connectionScope,
cancellationToken).ConfigureAwait(false);
}
Expand Down Expand Up @@ -1162,14 +1175,17 @@ internal async Task<BinaryData> GetStateInternal(TimeSpan timeout)
long[] sequenceNumbers,
CancellationToken cancellationToken = default)
{
IReadOnlyList<ServiceBusReceivedMessage> messages = null;
await _retryPolicy.RunOperation(
async (timeout) => messages = await ReceiveDeferredMessagesAsyncInternal(
sequenceNumbers,
timeout).ConfigureAwait(false),
return await _retryPolicy.RunOperation(
static async (value, timeout, _) =>
{
var (receiver, sequenceNumbers) = value;
return await receiver.ReceiveDeferredMessagesAsyncInternal(
sequenceNumbers,
timeout).ConfigureAwait(false);
},
(this, sequenceNumbers),
_connectionScope,
cancellationToken).ConfigureAwait(false);
return messages;
}

internal virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsyncInternal(
Expand Down Expand Up @@ -1308,12 +1324,12 @@ private void OnReceiverLinkClosed(object receiver, EventArgs e)
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task OpenLinkAsync(CancellationToken cancellationToken)
{
ReceivingAmqpLink link = null;
await _retryPolicy.RunOperation(
async (timeout) =>
link = await _receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false),
_connectionScope,
cancellationToken).ConfigureAwait(false);
_ = await _retryPolicy.RunOperation(
static async (receiveLink, timeout, _) =>
await receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false),
_receiveLink,
_connectionScope,
cancellationToken).ConfigureAwait(false);
}

private bool HasLinkCommunicationError(ReceivingAmqpLink link) =>
Expand Down

0 comments on commit e3eca79

Please sign in to comment.