Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch APIs to use the lock token guid to avoid converting from and to string #20543

Merged
merged 1 commit into from
Apr 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 24 additions & 28 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task CompleteAsync(
string lockToken,
Guid lockToken,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
static async (value, timeout, _) =>
Expand All @@ -417,20 +417,19 @@ await receiver.CompleteInternalAsync(
/// <param name="lockToken">The lockToken of the <see cref="ServiceBusReceivedMessage"/> to complete.</param>
/// <param name="timeout"></param>
private async Task CompleteInternalAsync(
string lockToken,
Guid lockToken,
TimeSpan timeout)
{
Guid lockTokenGuid = new Guid(lockToken);
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
if (_requestResponseLockedMessages.Contains(lockToken))
{
await DisposeMessageRequestResponseAsync(
lockTokenGuid,
lockToken,
timeout,
DispositionStatus.Completed,
SessionId).ConfigureAwait(false);
return;
}
await DisposeMessageAsync(lockTokenGuid, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false);
await DisposeMessageAsync(lockToken, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -532,7 +531,7 @@ private void ThrowLockLostException()
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <remarks>
/// A lock token can be found in <see cref="ServiceBusReceivedMessage.LockToken"/>,
/// A lock token can be found in <see cref="ServiceBusReceivedMessage.LockTokenGuid"/>,
/// only when <see cref="ServiceBusReceiveMode"/> is set to <see cref="ServiceBusReceiveMode.PeekLock"/>.
/// In order to receive this message again in the future, you will need to save
/// the <see cref="ServiceBusReceivedMessage.SequenceNumber"/>
Expand All @@ -543,7 +542,7 @@ private void ThrowLockLostException()
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task DeferAsync(
string lockToken,
Guid lockToken,
IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
Expand All @@ -566,21 +565,20 @@ await receiver.DeferInternalAsync(
/// <param name="propertiesToModify">The properties of the message to modify while deferring the message.</param>
///
private Task DeferInternalAsync(
string lockToken,
Guid lockToken,
TimeSpan timeout,
IDictionary<string, object> propertiesToModify = null)
{
Guid lockTokenGuid = new Guid(lockToken);
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
if (_requestResponseLockedMessages.Contains(lockToken))
{
return DisposeMessageRequestResponseAsync(
lockTokenGuid,
lockToken,
timeout,
DispositionStatus.Defered,
SessionId,
propertiesToModify);
}
return DisposeMessageAsync(lockTokenGuid, GetDeferOutcome(propertiesToModify), timeout);
return DisposeMessageAsync(lockToken, GetDeferOutcome(propertiesToModify), timeout);
}

/// <summary>
Expand All @@ -599,7 +597,7 @@ private Task DeferInternalAsync(
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task AbandonAsync(
string lockToken,
Guid lockToken,
IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
Expand All @@ -623,21 +621,20 @@ await receiver.AbandonInternalAsync(
/// <param name="timeout"></param>
/// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</param>
private Task AbandonInternalAsync(
string lockToken,
Guid lockToken,
TimeSpan timeout,
IDictionary<string, object> propertiesToModify = null)
{
Guid lockTokenGuid = new Guid(lockToken);
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
if (_requestResponseLockedMessages.Contains(lockToken))
{
return DisposeMessageRequestResponseAsync(
lockTokenGuid,
lockToken,
timeout,
DispositionStatus.Abandoned,
SessionId,
propertiesToModify);
}
return DisposeMessageAsync(lockTokenGuid, GetAbandonOutcome(propertiesToModify), timeout);
return DisposeMessageAsync(lockToken, GetAbandonOutcome(propertiesToModify), timeout);
}

/// <summary>
Expand All @@ -660,7 +657,7 @@ private Task AbandonInternalAsync(
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task DeadLetterAsync(
string lockToken,
Guid lockToken,
string deadLetterReason,
string deadLetterErrorDescription = default,
IDictionary<string, object> propertiesToModify = default,
Expand Down Expand Up @@ -690,7 +687,7 @@ await receiver.DeadLetterInternalAsync(
/// <param name="deadLetterReason">The reason for dead-lettering the message.</param>
/// <param name="deadLetterErrorDescription">The error description for dead-lettering the message.</param>
internal virtual Task DeadLetterInternalAsync(
string lockToken,
Guid lockToken,
TimeSpan timeout,
IDictionary<string, object> propertiesToModify,
string deadLetterReason,
Expand All @@ -699,11 +696,10 @@ internal virtual Task DeadLetterInternalAsync(
Argument.AssertNotTooLong(deadLetterReason, Constants.MaxDeadLetterReasonLength, nameof(deadLetterReason));
Argument.AssertNotTooLong(deadLetterErrorDescription, Constants.MaxDeadLetterReasonLength, nameof(deadLetterErrorDescription));

Guid lockTokenGuid = new Guid(lockToken);
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
if (_requestResponseLockedMessages.Contains(lockToken))
{
return DisposeMessageRequestResponseAsync(
lockTokenGuid,
lockToken,
timeout,
DispositionStatus.Suspended,
SessionId,
Expand All @@ -713,7 +709,7 @@ internal virtual Task DeadLetterInternalAsync(
}

return DisposeMessageAsync(
lockTokenGuid,
lockToken,
GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLetterErrorDescription),
timeout);
}
Expand Down Expand Up @@ -993,7 +989,7 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesInterna
/// <param name="lockToken">Lock token associated with the message.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
public override async Task<DateTimeOffset> RenewMessageLockAsync(
string lockToken,
Guid lockToken,
CancellationToken cancellationToken)
{
return await _retryPolicy.RunOperation(
Expand All @@ -1018,7 +1014,7 @@ static async (value, timeout, _) =>
/// <param name="lockToken">Lock token associated with the message.</param>
/// <param name="timeout"></param>
private async Task<DateTimeOffset> RenewMessageLockInternalAsync(
string lockToken,
Guid lockToken,
TimeSpan timeout)
{
DateTimeOffset lockedUntil;
Expand All @@ -1033,7 +1029,7 @@ private async Task<DateTimeOffset> RenewMessageLockInternalAsync(
{
amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLink.Name;
}
amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = new[] { new Guid(lockToken) };
amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = new Guid[] { lockToken };

AmqpResponseMessage amqpResponseMessage = await ExecuteRequest(
timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public abstract Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAs
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public abstract Task CompleteAsync(
string lockToken,
Guid lockToken,
CancellationToken cancellationToken);

/// <summary> Indicates that the receiver wants to defer the processing for the message.</summary>
Expand All @@ -91,7 +91,7 @@ public abstract Task CompleteAsync(
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <remarks>
/// A lock token can be found in <see cref="ServiceBusReceivedMessage.LockToken"/>,
/// A lock token can be found in <see cref="ServiceBusReceivedMessage.LockTokenGuid"/>,
/// only when <see cref="ServiceBusReceiveMode"/> is set to <see cref="ServiceBusReceiveMode.PeekLock"/>.
/// In order to receive this message again in the future, you will need to save the <see cref="ServiceBusReceivedMessage.SequenceNumber"/>
/// and receive it using ReceiveDeferredMessageBatchAsync(IEnumerable, CancellationToken).
Expand All @@ -101,7 +101,7 @@ public abstract Task CompleteAsync(
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public abstract Task DeferAsync(
string lockToken,
Guid lockToken,
IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default);

Expand Down Expand Up @@ -141,7 +141,7 @@ public abstract Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesAsync
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public abstract Task AbandonAsync(
string lockToken,
Guid lockToken,
IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default);

Expand All @@ -165,7 +165,7 @@ public abstract Task AbandonAsync(
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public abstract Task DeadLetterAsync(
string lockToken,
Guid lockToken,
string deadLetterReason = default,
string deadLetterErrorDescription = default,
IDictionary<string, object> propertiesToModify = default,
Expand All @@ -191,7 +191,7 @@ public abstract Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMe
/// <param name="lockToken">Lock token associated with the message.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
public abstract Task<DateTimeOffset> RenewMessageLockAsync(
string lockToken,
Guid lockToken,
CancellationToken cancellationToken);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,18 @@ public virtual void CancelScheduledMessagesException(string identifier, string e
#endregion

#region Settlement

[NonEvent]
public virtual void CompleteMessageStart(string identifier, int messageCount, Guid lockToken)
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved
{
if (IsEnabled())
{
CompleteMessageStartCore(identifier, messageCount, lockToken.ToString());
}
}

[Event(CompleteMessageStartEvent, Level = EventLevel.Informational, Message = "{0}: CompleteAsync start. MessageCount = {1}, LockTokens = {2}")]
public virtual void CompleteMessageStart(string identifier, int messageCount, string lockTokens)
public virtual void CompleteMessageStartCore(string identifier, int messageCount, string lockTokens)
{
if (IsEnabled())
{
Expand All @@ -440,8 +450,17 @@ public virtual void CompleteMessageException(string identifier, string exception
}
}

[NonEvent]
public virtual void DeferMessageStart(string identifier, int messageCount, Guid lockToken)
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved
{
if (IsEnabled())
{
DeferMessageStartCore(identifier, messageCount, lockToken.ToString());
}
}

[Event(DeferMessageStartEvent, Level = EventLevel.Informational, Message = "{0}: DeferAsync start. MessageCount = {1}, LockToken = {2}")]
public virtual void DeferMessageStart(string identifier, int messageCount, string lockToken)
public virtual void DeferMessageStartCore(string identifier, int messageCount, string lockToken)
{
if (IsEnabled())
{
Expand All @@ -464,8 +483,17 @@ public virtual void DeferMessageException(string identifier, string exception)
WriteEvent(DeferMessageExceptionEvent, identifier, exception);
}

[NonEvent]
public virtual void AbandonMessageStart(string identifier, int messageCount, Guid lockToken)
{
if (IsEnabled())
{
AbandonMessageStartCore(identifier, messageCount, lockToken.ToString());
}
}

[Event(AbandonMessageStartEvent, Level = EventLevel.Informational, Message = "{0}: AbandonAsync start. MessageCount = {1}, LockToken = {2}")]
public virtual void AbandonMessageStart(string identifier, int messageCount, string lockToken)
public virtual void AbandonMessageStartCore(string identifier, int messageCount, string lockToken)
{
if (IsEnabled())
{
Expand All @@ -491,8 +519,17 @@ public virtual void AbandonMessageException(string identifier, string exception)
}
}

[NonEvent]
public virtual void DeadLetterMessageStart(string identifier, int messageCount, Guid lockToken)
{
if (IsEnabled())
{
DeadLetterMessageStartCore(identifier, messageCount, lockToken.ToString());
}
}

[Event(DeadLetterMessageStartEvent, Level = EventLevel.Informational, Message = "{0}: DeadLetterAsync start. MessageCount = {1}, LockToken = {2}")]
public virtual void DeadLetterMessageStart(string identifier, int messageCount, string lockToken)
public virtual void DeadLetterMessageStartCore(string identifier, int messageCount, string lockToken)
{
if (IsEnabled())
{
Expand Down Expand Up @@ -520,8 +557,18 @@ public virtual void DeadLetterMessageException(string identifier, string excepti
#endregion

#region Lock renewal

[NonEvent]
public virtual void RenewMessageLockStart(string identifier, int messageCount, Guid lockToken)
{
if (IsEnabled())
{
RenewMessageLockStartCore(identifier, messageCount, lockToken.ToString());
}
}

[Event(RenewMessageLockStartEvent, Level = EventLevel.Informational, Message = "{0}: RenewLockAsync start. MessageCount = {1}, LockToken = {2}")]
public virtual void RenewMessageLockStart(string identifier, int messageCount, string lockToken)
public virtual void RenewMessageLockStartCore(string identifier, int messageCount, string lockToken)
{
if (IsEnabled())
{
Expand Down Expand Up @@ -688,8 +735,17 @@ public virtual void StopProcessingException(string identifier, string exception)
}
}

[NonEvent]
public virtual void ProcessorRenewMessageLockStart(string identifier, int messageCount, Guid lockToken)
{
if (IsEnabled())
{
ProcessorRenewMessageLockStartCore(identifier, messageCount, lockToken.ToString());
}
}

[Event(ProcessorRenewMessageLockStartEvent, Level = EventLevel.Informational, Message = "{0}: Processor RenewMessageLock start. MessageCount = {1}, LockToken = {2}")]
public virtual void ProcessorRenewMessageLockStart(string identifier, int messageCount, string lockToken)
public virtual void ProcessorRenewMessageLockStartCore(string identifier, int messageCount, string lockToken)
{
if (IsEnabled())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private async Task ProcessOneMessage(
// as we want in flight auto-completion to be able
// to finish
await Receiver.CompleteMessageAsync(
message.LockToken,
message.LockTokenGuid,
CancellationToken.None)
.ConfigureAwait(false);
}
Expand Down Expand Up @@ -212,7 +212,7 @@ await RaiseExceptionReceived(
// as we want in flight abandon to be able
// to finish even if user stopped processing
await Receiver.AbandonMessageAsync(
message.LockToken,
message.LockTokenGuid,
cancellationToken: CancellationToken.None)
.ConfigureAwait(false);
}
Expand Down Expand Up @@ -256,7 +256,7 @@ private async Task RenewMessageLock(
{
try
{
ServiceBusEventSource.Log.ProcessorRenewMessageLockStart(Processor.Identifier, 1, message.LockToken);
ServiceBusEventSource.Log.ProcessorRenewMessageLockStart(Processor.Identifier, 1, message.LockTokenGuid);
TimeSpan delay = CalculateRenewDelay(message.LockedUntil);

// We're awaiting the task created by 'ContinueWith' to avoid awaiting the Delay task which may be canceled
Expand Down
Loading