Skip to content

Commit

Permalink
Remove async/await where not needed
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmarbach committed Mar 19, 2021
1 parent 54874f0 commit 0420017
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 131 deletions.
148 changes: 62 additions & 86 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Expand Up @@ -237,24 +237,22 @@ private static void CloseLink(RequestResponseAmqpLink link)
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>List of messages received. Returns an empty list if no message is found.</returns>
public override async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsync(
public override Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsync(
int maxMessages,
TimeSpan? maxWaitTime,
CancellationToken cancellationToken)
{
return await _retryPolicy.RunOperation(static async (value, timeout, token) =>
CancellationToken cancellationToken) =>
_retryPolicy.RunOperation(static (value, timeout, token) =>
{
var (receiver, maxmsgs, maxwait) = value;
return await receiver.ReceiveMessagesAsyncInternal(
return receiver.ReceiveMessagesAsyncInternal(
maxmsgs,
maxwait,
timeout,
token).ConfigureAwait(false);
token);
},
(this, maxMessages, maxWaitTime),
_connectionScope,
cancellationToken).ConfigureAwait(false);
}
cancellationToken);

/// <summary>
/// Receives a list of <see cref="ServiceBusMessage" /> from the Service Bus entity.
Expand Down Expand Up @@ -345,20 +343,18 @@ private static void CloseLink(RequestResponseAmqpLink link)
/// </remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task CompleteAsync(
public override Task CompleteAsync(
string lockToken,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
static async (value, timeout, _) =>
_retryPolicy.RunOperation(
static (value, timeout, _) =>
{
var (receiver, lckToken) = value;
await receiver.CompleteInternalAsync(
lckToken,
timeout).ConfigureAwait(false);
return receiver.CompleteInternalAsync(lckToken, timeout);
},
(this, lockToken),
_connectionScope,
cancellationToken).ConfigureAwait(false);
cancellationToken);

/// <summary>
/// Completes a series of <see cref="ServiceBusMessage"/> using a list of lock tokens. This will delete the message from the service.
Expand Down Expand Up @@ -489,22 +485,19 @@ private void ThrowLockLostException()
/// </remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task DeferAsync(
public override Task DeferAsync(
string lockToken,
IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
static async (value, timeout, _) =>
_retryPolicy.RunOperation(
static (value, timeout, _) =>
{
var (receiver, lckToken, properties) = value;
await receiver.DeferInternalAsync(
lckToken,
timeout,
properties).ConfigureAwait(false);
return receiver.DeferInternalAsync(lckToken, timeout, properties);
},
(this, lockToken, propertiesToModify),
_connectionScope,
cancellationToken).ConfigureAwait(false);
cancellationToken);

/// <summary>Indicates that the receiver wants to defer the processing for the message.</summary>
///
Expand Down Expand Up @@ -546,22 +539,19 @@ private void ThrowLockLostException()
/// </remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task AbandonAsync(
public override Task AbandonAsync(
string lockToken,
IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
static async (value, timeout, _) =>
_retryPolicy.RunOperation(
static (value, timeout, _) =>
{
var (receiver, lckToken, properties) = value;
await receiver.AbandonInternalAsync(
lckToken,
timeout,
properties).ConfigureAwait(false);
return receiver.AbandonInternalAsync(lckToken, timeout, properties);
},
(this, lockToken, propertiesToModify),
_connectionScope,
cancellationToken).ConfigureAwait(false);
cancellationToken);

/// <summary>
/// Abandons a <see cref="ServiceBusMessage"/> using a lock token. This will make the message available again for processing.
Expand Down Expand Up @@ -608,26 +598,21 @@ private void ThrowLockLostException()
/// </remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task DeadLetterAsync(
public override Task DeadLetterAsync(
string lockToken,
string deadLetterReason,
string deadLetterErrorDescription = default,
IDictionary<string, object> propertiesToModify = default,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
static async (value, timeout, _) =>
_retryPolicy.RunOperation(
static (value, timeout, _) =>
{
var (receiver, lckToken, properties, reason, description) = value;
await receiver.DeadLetterInternalAsync(
lckToken,
timeout,
properties,
reason,
description).ConfigureAwait(false);
return receiver.DeadLetterInternalAsync(lckToken, timeout, properties, reason, description);
},
(this, lockToken, propertiesToModify, deadLetterReason, deadLetterErrorDescription),
_connectionScope,
cancellationToken).ConfigureAwait(false);
cancellationToken);

/// <summary>
/// Moves a message to the dead-letter subqueue.
Expand Down Expand Up @@ -840,26 +825,25 @@ private static List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumera
/// Also, unlike <see cref="ReceiveMessagesAsync(int, TimeSpan?, CancellationToken)"/>, this method will fetch even Deferred messages (but not Deadlettered message)
/// </remarks>
/// <returns></returns>
public override async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesAsync(
public override Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesAsync(
long? sequenceNumber,
int messageCount = 1,
CancellationToken cancellationToken = default)
{
long seqNumber = sequenceNumber ?? LastPeekedSequenceNumber + 1;
return await _retryPolicy.RunOperation(
static async (value, timeout, token) =>
return _retryPolicy.RunOperation(
static (value, timeout, token) =>
{
var (receiver, number, count) = value;
return await receiver.PeekMessagesInternalAsync(
return receiver.PeekMessagesInternalAsync(
number,
count,
timeout,
token)
.ConfigureAwait(false);
token);
},
(this, seqNumber, messageCount),
_connectionScope,
cancellationToken).ConfigureAwait(false);
cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -947,22 +931,20 @@ private static List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumera
///
/// <param name="lockToken">Lock token associated with the message.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
public override async Task<DateTimeOffset> RenewMessageLockAsync(
public override Task<DateTimeOffset> RenewMessageLockAsync(
string lockToken,
CancellationToken cancellationToken)
{
return await _retryPolicy.RunOperation(
static async (value, timeout, _) =>
CancellationToken cancellationToken) =>
_retryPolicy.RunOperation(
static (value, timeout, _) =>
{
var (receiver, lckToken) = value;
return await receiver.RenewMessageLockInternalAsync(
return receiver.RenewMessageLockInternalAsync(
lckToken,
timeout).ConfigureAwait(false);
timeout);
},
(this, lockToken),
_connectionScope,
cancellationToken).ConfigureAwait(false);
}
cancellationToken);

/// <summary>
/// Renews the lock on the message. The lock will be renewed based on the setting specified on the queue.
Expand Down Expand Up @@ -1024,8 +1006,7 @@ private async Task<AmqpResponseMessage> ExecuteRequest(TimeSpan timeout, AmqpReq
public override async Task RenewSessionLockAsync(CancellationToken cancellationToken = default)
{
var lockedUntil = await _retryPolicy.RunOperation(
static async (receiver, timeout, _) => await receiver.RenewSessionLockInternal(
timeout).ConfigureAwait(false),
static (receiver, timeout, _) => receiver.RenewSessionLockInternal(timeout),
this,
_connectionScope,
cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -1068,13 +1049,13 @@ internal async Task<DateTimeOffset> RenewSessionLockInternal(TimeSpan timeout)
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>The session state as <see cref="BinaryData"/>.</returns>
public override async Task<BinaryData> GetStateAsync(CancellationToken cancellationToken = default)
public override Task<BinaryData> GetStateAsync(CancellationToken cancellationToken = default)
{
return await _retryPolicy.RunOperation(
static async (receiver, timeout, _) => await receiver.GetStateInternal(timeout).ConfigureAwait(false),
return _retryPolicy.RunOperation(
static (receiver, timeout, _) => receiver.GetStateInternal(timeout),
this,
_connectionScope,
cancellationToken).ConfigureAwait(false);
cancellationToken);
}

internal async Task<BinaryData> GetStateInternal(TimeSpan timeout)
Expand Down Expand Up @@ -1116,22 +1097,20 @@ internal async Task<BinaryData> GetStateInternal(TimeSpan timeout)
/// <remarks>This state is stored on Service Bus forever unless you set an empty state on it.</remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task SetStateAsync(
public override Task SetStateAsync(
BinaryData sessionState,
CancellationToken cancellationToken)
{
await _retryPolicy.RunOperation(
static async (value, timeout, _) =>
CancellationToken cancellationToken) =>
_retryPolicy.RunOperation(
static (value, timeout, _) =>
{
var (receiver, state) = value;
await receiver.SetStateInternal(
return receiver.SetStateInternal(
state,
timeout).ConfigureAwait(false);
timeout);
},
(this, sessionState),
_connectionScope,
cancellationToken).ConfigureAwait(false);
}
cancellationToken);

internal async Task SetStateInternal(
BinaryData sessionState,
Expand Down Expand Up @@ -1171,22 +1150,20 @@ internal async Task<BinaryData> GetStateInternal(TimeSpan timeout)
/// <returns>Messages identified by sequence number are returned. Returns null if no messages are found.
/// Throws if the messages have not been deferred.</returns>
/// <seealso cref="DeferAsync"/>
public override async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsync(
public override Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsync(
long[] sequenceNumbers,
CancellationToken cancellationToken = default)
{
return await _retryPolicy.RunOperation(
static async (value, timeout, _) =>
CancellationToken cancellationToken = default) =>
_retryPolicy.RunOperation(
static (value, timeout, _) =>
{
var (receiver, sqn) = value;
return await receiver.ReceiveDeferredMessagesAsyncInternal(
return receiver.ReceiveDeferredMessagesAsyncInternal(
sqn,
timeout).ConfigureAwait(false);
timeout);
},
(this, sequenceNumbers),
_connectionScope,
cancellationToken).ConfigureAwait(false);
}
cancellationToken);

internal virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsyncInternal(
long[] sequenceNumbers,
Expand Down Expand Up @@ -1322,14 +1299,13 @@ private void OnReceiverLinkClosed(object receiver, EventArgs e)
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task OpenLinkAsync(CancellationToken cancellationToken)
public override Task OpenLinkAsync(CancellationToken cancellationToken)
{
_ = await _retryPolicy.RunOperation(
static async (link, timeout, _) =>
await link.GetOrCreateAsync(timeout).ConfigureAwait(false),
return _retryPolicy.RunOperation(
static (link, timeout, _) => link.GetOrCreateAsync(timeout),
_receiveLink,
_connectionScope,
cancellationToken).ConfigureAwait(false);
cancellationToken);
}

private bool HasLinkCommunicationError(ReceivingAmqpLink link) =>
Expand Down

0 comments on commit 0420017

Please sign in to comment.