Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ServiceBusRetryPolicy generic overloads to avoid closure capturing #19522

Merged
merged 13 commits into from Mar 22, 2021
150 changes: 75 additions & 75 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Expand Up @@ -242,19 +242,16 @@ private static void CloseLink(RequestResponseAmqpLink link)
TimeSpan? maxWaitTime,
CancellationToken cancellationToken)
{
IReadOnlyList<ServiceBusReceivedMessage> messages = null;
await _retryPolicy.RunOperation(async (timeout) =>
{
messages = await ReceiveMessagesAsyncInternal(
maxMessages,
maxWaitTime,
return await _retryPolicy.RunOperation(async (receiver, maxmsgs, maxwait, timeout, token) => await receiver.ReceiveMessagesAsyncInternal(
maxmsgs,
maxwait,
timeout,
cancellationToken).ConfigureAwait(false);
},
_connectionScope,
cancellationToken).ConfigureAwait(false);

return messages;
token).ConfigureAwait(false),
this,
maxMessages,
maxWaitTime,
_connectionScope,
cancellationToken).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -350,10 +347,12 @@ private static void CloseLink(RequestResponseAmqpLink link)
string lockToken,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) =>
await CompleteInternalAsync(
lockToken,
async (receiver, lckToken, timeout, _) =>
await receiver.CompleteInternalAsync(
lckToken,
timeout).ConfigureAwait(false),
this,
lockToken,
_connectionScope,
cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -491,10 +490,13 @@ private void ThrowLockLostException()
IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) => await DeferInternalAsync(
lockToken,
async (receiver, lckToken, properties, timeout, _) => await receiver.DeferInternalAsync(
lckToken,
timeout,
propertiesToModify).ConfigureAwait(false),
properties).ConfigureAwait(false),
this,
lockToken,
propertiesToModify,
_connectionScope,
cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -543,10 +545,13 @@ private void ThrowLockLostException()
IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) => await AbandonInternalAsync(
lockToken,
async (receiver, lckToken, properties, timeout, _) => await receiver.AbandonInternalAsync(
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved
lckToken,
timeout,
propertiesToModify).ConfigureAwait(false),
properties).ConfigureAwait(false),
this,
lockToken,
propertiesToModify,
_connectionScope,
cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -602,12 +607,17 @@ private void ThrowLockLostException()
IDictionary<string, object> propertiesToModify = default,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) => await DeadLetterInternalAsync(
lockToken,
async (receiver, lckToken, properties, reason, description, timeout, _) => await receiver.DeadLetterInternalAsync(
lckToken,
timeout,
propertiesToModify,
deadLetterReason,
deadLetterErrorDescription).ConfigureAwait(false),
properties,
reason,
description).ConfigureAwait(false),
this,
lockToken,
propertiesToModify,
deadLetterReason,
deadLetterErrorDescription,
_connectionScope,
cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -828,20 +838,18 @@ private static List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumera
CancellationToken cancellationToken = default)
{
long seqNumber = sequenceNumber ?? LastPeekedSequenceNumber + 1;
IReadOnlyList<ServiceBusReceivedMessage> messages = null;

await _retryPolicy.RunOperation(
async (timeout) =>
messages = await PeekMessagesInternalAsync(
seqNumber,
messageCount,
timeout,
cancellationToken)
.ConfigureAwait(false),
return await _retryPolicy.RunOperation(
async (receiver, number, count, timeout, token) => await receiver.PeekMessagesInternalAsync(
number,
count,
timeout,
token)
.ConfigureAwait(false),
this,
seqNumber,
messageCount,
_connectionScope,
cancellationToken).ConfigureAwait(false);

return messages;
}

/// <summary>
Expand Down Expand Up @@ -933,17 +941,14 @@ private static List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumera
string lockToken,
CancellationToken cancellationToken)
{
DateTimeOffset lockedUntil = DateTimeOffset.MinValue;
await _retryPolicy.RunOperation(
async (timeout) =>
{
lockedUntil = await RenewMessageLockInternalAsync(
lockToken,
timeout).ConfigureAwait(false);
},
return await _retryPolicy.RunOperation(
async (receiver, lckToken, timeout, _) => await receiver.RenewMessageLockInternalAsync(
lckToken,
timeout).ConfigureAwait(false),
this,
lockToken,
_connectionScope,
cancellationToken).ConfigureAwait(false);
return lockedUntil;
}

/// <summary>
Expand Down Expand Up @@ -1005,13 +1010,10 @@ private async Task<AmqpResponseMessage> ExecuteRequest(TimeSpan timeout, AmqpReq
/// </summary>
public override async Task RenewSessionLockAsync(CancellationToken cancellationToken = default)
{
DateTimeOffset lockedUntil;
await _retryPolicy.RunOperation(
async (timeout) =>
{
lockedUntil = await RenewSessionLockInternal(
timeout).ConfigureAwait(false);
},
var lockedUntil = await _retryPolicy.RunOperation(
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved
async (receiver, timeout, _) => await receiver.RenewSessionLockInternal(
timeout).ConfigureAwait(false),
this,
_connectionScope,
cancellationToken).ConfigureAwait(false);
SessionLockedUntil = lockedUntil;
Expand Down Expand Up @@ -1055,15 +1057,11 @@ internal async Task<DateTimeOffset> RenewSessionLockInternal(TimeSpan timeout)
/// <returns>The session state as <see cref="BinaryData"/>.</returns>
public override async Task<BinaryData> GetStateAsync(CancellationToken cancellationToken = default)
{
BinaryData sessionState = default;
await _retryPolicy.RunOperation(
async (timeout) =>
{
sessionState = await GetStateInternal(timeout).ConfigureAwait(false);
},
return await _retryPolicy.RunOperation(
async (receiver, timeout, _) => await receiver.GetStateInternal(timeout).ConfigureAwait(false),
this,
_connectionScope,
cancellationToken).ConfigureAwait(false);
return sessionState;
}

internal async Task<BinaryData> GetStateInternal(TimeSpan timeout)
Expand Down Expand Up @@ -1110,12 +1108,14 @@ internal async Task<BinaryData> GetStateInternal(TimeSpan timeout)
CancellationToken cancellationToken)
{
await _retryPolicy.RunOperation(
async (timeout) =>
async (receiver, state, timeout, _) =>
{
await SetStateInternal(
sessionState,
await receiver.SetStateInternal(
state,
timeout).ConfigureAwait(false);
},
this,
sessionState,
_connectionScope,
cancellationToken).ConfigureAwait(false);
}
Expand Down Expand Up @@ -1162,14 +1162,14 @@ internal async Task<BinaryData> GetStateInternal(TimeSpan timeout)
long[] sequenceNumbers,
CancellationToken cancellationToken = default)
{
IReadOnlyList<ServiceBusReceivedMessage> messages = null;
await _retryPolicy.RunOperation(
async (timeout) => messages = await ReceiveDeferredMessagesAsyncInternal(
sequenceNumbers,
return await _retryPolicy.RunOperation(
async (receiver, sqn, timeout, _) => await receiver.ReceiveDeferredMessagesAsyncInternal(
sqn,
timeout).ConfigureAwait(false),
this,
sequenceNumbers,
_connectionScope,
cancellationToken).ConfigureAwait(false);
return messages;
}

internal virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsyncInternal(
Expand Down Expand Up @@ -1308,12 +1308,12 @@ private void OnReceiverLinkClosed(object receiver, EventArgs e)
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task OpenLinkAsync(CancellationToken cancellationToken)
{
ReceivingAmqpLink link = null;
await _retryPolicy.RunOperation(
async (timeout) =>
link = await _receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false),
_connectionScope,
cancellationToken).ConfigureAwait(false);
_ = await _retryPolicy.RunOperation(
async (link, timeout, _) =>
await link.GetOrCreateAsync(timeout).ConfigureAwait(false),
_receiveLink,
_connectionScope,
cancellationToken).ConfigureAwait(false);
}

private bool HasLinkCommunicationError(ReceivingAmqpLink link) =>
Expand Down
71 changes: 34 additions & 37 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs
Expand Up @@ -161,17 +161,14 @@ internal class AmqpSender : TransportSender
CreateMessageBatchOptions options,
CancellationToken cancellationToken)
{
TransportMessageBatch messageBatch = null;
Task createBatchTask = _retryPolicy.RunOperation(async (timeout) =>
{
messageBatch = await CreateMessageBatchInternalAsync(
options,
timeout).ConfigureAwait(false);
},
_connectionScope,
cancellationToken);
await createBatchTask.ConfigureAwait(false);
return messageBatch;
Task<TransportMessageBatch> createBatchTask = _retryPolicy.RunOperation(async (sender, ops, timeout, _) => await sender.CreateMessageBatchInternalAsync(
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved
ops,
timeout).ConfigureAwait(false),
this,
options,
_connectionScope,
cancellationToken);
return await createBatchTask.ConfigureAwait(false);
}

internal async ValueTask<TransportMessageBatch> CreateMessageBatchInternalAsync(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Example

Before

          if (num1 != 0)
          {
            this.\u003C\u003E8__1 = new AmqpSender.\u003C\u003Ec__DisplayClass16_0();
            this.\u003C\u003E8__1.\u003C\u003E4__this = this.\u003C\u003E4__this;
            this.\u003C\u003E8__1.options = this.options;
            this.\u003C\u003E8__1.messageBatch = (TransportMessageBatch) null;
            // ISSUE: method pointer
            configuredTaskAwaiter = amqpSender._retryPolicy.RunOperation(new Func<TimeSpan, Task>((object) this.\u003C\u003E8__1, __methodptr(\u003CCreateMessageBatchAsync\u003Eb__0)), (TransportConnectionScope) amqpSender._connectionScope, this.cancellationToken).ConfigureAwait(false).GetAwaiter();
            if (!configuredTaskAwaiter.IsCompleted)
            {
              this.\u003C\u003E1__state = num2 = 0;
              this.\u003C\u003Eu__1 = configuredTaskAwaiter;
              // ISSUE: cast to a reference type
              // ISSUE: cast to a reference type
              ((AsyncValueTaskMethodBuilder<TransportMessageBatch>) ref this.\u003C\u003Et__builder).AwaitUnsafeOnCompleted<ConfiguredTaskAwaitable.ConfiguredTaskAwaiter, AmqpSender.\u003CCreateMessageBatchAsync\u003Ed__16>((M0&) ref configuredTaskAwaiter, (M1&) ref this);
              return;
            }
          }

After

          if (num1 != 0)
          {
            // ISSUE: method pointer
            configuredTaskAwaiter = t1._retryPolicy.RunOperation<AmqpSender, CreateMessageBatchOptions, TransportMessageBatch>(AmqpSender.\u003C\u003Ec.\u003C\u003E9__16_0 ?? (AmqpSender.\u003C\u003Ec.\u003C\u003E9__16_0 = new Func<AmqpSender, CreateMessageBatchOptions, TimeSpan, CancellationToken, Task<TransportMessageBatch>>((object) AmqpSender.\u003C\u003Ec.\u003C\u003E9, __methodptr(\u003CCreateMessageBatchAsync\u003Eb__16_0))), t1, this.options, (TransportConnectionScope) t1._connectionScope, this.cancellationToken).ConfigureAwait(false).GetAwaiter();
            if (!configuredTaskAwaiter.IsCompleted)
            {
              this.\u003C\u003E1__state = num2 = 0;
              this.\u003C\u003Eu__1 = configuredTaskAwaiter;
              // ISSUE: cast to a reference type
              // ISSUE: cast to a reference type
              ((AsyncValueTaskMethodBuilder<TransportMessageBatch>) ref this.\u003C\u003Et__builder).AwaitUnsafeOnCompleted<ConfiguredTaskAwaitable<TransportMessageBatch>.ConfiguredTaskAwaiter, AmqpSender.\u003CCreateMessageBatchAsync\u003Ed__16>((M0&) ref configuredTaskAwaiter, (M1&) ref this);
              return;
            }
          }

So we are saving two allocations now for every request and this on every callsite touched by this PR.

Expand Down Expand Up @@ -210,12 +207,13 @@ internal class AmqpSender : TransportSender
ServiceBusMessageBatch messageBatch,
CancellationToken cancellationToken)
{
AmqpMessage messageFactory() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messageBatch.AsEnumerable<ServiceBusMessage>());
await _retryPolicy.RunOperation(async (timeout) =>
await SendBatchInternalAsync(
messageFactory,
await _retryPolicy.RunOperation(async (sender, msgs, timeout, token) =>
await sender.SendBatchInternalAsync(
msgs,
timeout,
cancellationToken).ConfigureAwait(false),
token).ConfigureAwait(false),
this,
messageBatch.AsEnumerable<ServiceBusMessage>(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was fairly certain this is an OK change since the batch has a list internally. I couldn't see a reason why it would be required to cast to IEnumerable over and over again during retries. I preserved though the conversion to the AmqpMessage which seems to be the crucial part

_connectionScope,
cancellationToken).ConfigureAwait(false);
}
Expand All @@ -224,12 +222,12 @@ internal class AmqpSender : TransportSender
/// Sends a set of messages to the associated Queue/Topic using a batched approach.
/// </summary>
///
/// <param name="messageFactory"></param>
/// <param name="messages"></param>
/// <param name="timeout"></param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
internal virtual async Task SendBatchInternalAsync(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is internal virtual, so I wasn't sure if I'm allowed to change it or not. Please advise

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes feel free to change.

Func<AmqpMessage> messageFactory,
IEnumerable<ServiceBusMessage> messages,
TimeSpan timeout,
CancellationToken cancellationToken)
{
Expand All @@ -238,7 +236,7 @@ internal class AmqpSender : TransportSender

try
{
using (AmqpMessage batchMessage = messageFactory())
using (AmqpMessage batchMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messages))
{
string messageHash = batchMessage.GetHashCode().ToString(CultureInfo.InvariantCulture);

Expand Down Expand Up @@ -305,12 +303,13 @@ internal class AmqpSender : TransportSender
IReadOnlyList<ServiceBusMessage> messages,
CancellationToken cancellationToken)
{
AmqpMessage messageFactory() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messages);
await _retryPolicy.RunOperation(async (timeout) =>
await SendBatchInternalAsync(
messageFactory,
await _retryPolicy.RunOperation(async (sender, msgs, timeout, token) =>
await sender.SendBatchInternalAsync(
msgs,
timeout,
cancellationToken).ConfigureAwait(false),
token).ConfigureAwait(false),
this,
messages,
_connectionScope,
cancellationToken).ConfigureAwait(false);
}
Expand Down Expand Up @@ -376,17 +375,15 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
IReadOnlyList<ServiceBusMessage> messages,
CancellationToken cancellationToken = default)
{
long[] seqNumbers = null;
await _retryPolicy.RunOperation(async (timeout) =>
{
seqNumbers = await ScheduleMessageInternalAsync(
messages,
timeout,
cancellationToken).ConfigureAwait(false);
},
_connectionScope,
cancellationToken).ConfigureAwait(false);
return seqNumbers ?? Array.Empty<long>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this ever be null? I tried to do in the ScheduleMessageInternalAsync return sequenceNumbers ?? Array.Empty<long>() and flow analysis claimed it cannot be null so I wonder if this is really needed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it can be null.

return await _retryPolicy.RunOperation(async (sender, msgs, timeout, token) => await sender
.ScheduleMessageInternalAsync(
msgs,
timeout,
token).ConfigureAwait(false),
this,
messages,
_connectionScope,
cancellationToken).ConfigureAwait(false) ?? Array.Empty<long>();
}

/// <summary>
Expand Down Expand Up @@ -491,12 +488,12 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
long[] sequenceNumbers,
CancellationToken cancellationToken = default)
{
Task cancelMessageTask = _retryPolicy.RunOperation(async (timeout) =>
Task cancelMessageTask = _retryPolicy.RunOperation(async (timeout, token) =>
{
await CancelScheduledMessageInternalAsync(
sequenceNumbers,
timeout,
cancellationToken).ConfigureAwait(false);
token).ConfigureAwait(false);
},
_connectionScope,
cancellationToken);
Expand Down