Skip to content

Commit

Permalink
More resilient ASBS lock renewal sample. (#5677)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramonsmits committed May 16, 2022
1 parent 7851bc5 commit 966ddf6
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 69 deletions.
Expand Up @@ -8,9 +8,11 @@

class LockRenewalBehavior : Behavior<ITransportReceiveContext>
{
public LockRenewalBehavior(TimeSpan renewLockTokenIn)
public LockRenewalBehavior(TimeSpan lockDuration, TimeSpan renewLockTokenIn, string queueName)
{
this.lockDuration = lockDuration;
this.renewLockTokenIn = renewLockTokenIn;
this.queueName = queueName;
}

public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
Expand All @@ -28,60 +30,107 @@ public override async Task Invoke(ITransportReceiveContext context, Func<Task> n

#endregion

var messageReceiver = serviceBusClient.CreateReceiver("Samples.ASB.SendReply.LockRenewal");
var messageReceiver = serviceBusClient.CreateReceiver(queueName);

var cts = new CancellationTokenSource();
var token = cts.Token;
try
{
var now = DateTimeOffset.UtcNow;
var remaining = message.LockedUntil - now;

Log.Info($"Incoming message ID: {message.MessageId}");
if (remaining < renewLockTokenIn)
{
throw new Exception($"Not processing message because remaining lock duration is below configured renewal interval.")
{
Data = { { "ServiceBusReceivedMessage.MessageId", message.MessageId } }
};
}

_ = RenewLockToken(token);
var elapsed = lockDuration - remaining;

#region processing-and-cancellation
if (elapsed > renewLockTokenIn)
{
Log.Warn($"{message.MessageId}: Incoming message is locked untill {message.LockedUntil:s}Z but already passed configured renewal interval, renewing lock first. Consider lowering the prefetch count.");
await messageReceiver.RenewMessageLockAsync(message).ConfigureAwait(false);
}

try
{
await next().ConfigureAwait(false);
using (var cts = new CancellationTokenSource())
{
var token = cts.Token;

_ = RenewLockToken(token);

#region processing-and-cancellation

try
{
await next().ConfigureAwait(false);
}
finally
{
remaining = message.LockedUntil - DateTimeOffset.UtcNow;

if (remaining < renewLockTokenIn)
{
Log.Warn($"{message.MessageId}: Processing completed but LockedUntil {message.LockedUntil:s}Z less than {renewLockTokenIn}. This could indicate issues during lock renewal.");
}

cts.Cancel();
}

#endregion
}
}
finally
{
Log.Info($"Cancelling renewal task for incoming message ID: {message.MessageId}");
cts.Cancel();
cts.Dispose();
await messageReceiver.DisposeAsync(); // Cannot use "await using" because of lang version 7.3
}

#endregion

#region renewal-background-task

async Task RenewLockToken(CancellationToken cancellationToken)
{
try
{
int attempts = 0;

while (!cancellationToken.IsCancellationRequested)
{
Log.Info($"Lock will be renewed in {renewLockTokenIn}");

await Task.Delay(renewLockTokenIn, cancellationToken).ConfigureAwait(false);

await messageReceiver.RenewMessageLockAsync(message).ConfigureAwait(false);

Log.Info($"Lock renewed till {message.LockedUntil} UTC / {message.LockedUntil.ToLocalTime()} local");
try
{
await messageReceiver.RenewMessageLockAsync(message, cancellationToken).ConfigureAwait(false);
attempts = 0;
Log.Info($"{message.MessageId}: Lock renewed untill {message.LockedUntil:s}Z.");
}
catch (ServiceBusException e) when (e.Reason == ServiceBusFailureReason.MessageLockLost)
{
Log.Error($"{message.MessageId}: Lock lost.", e);
return;
}
catch (Exception e) when (!(e is OperationCanceledException))
{
++attempts;
Log.Warn($"{message.MessageId}: Failed to renew lock (#{attempts:N0}), if lock cannot be renewed within {message.LockedUntil:s}Z message will reappear.", e);
}
}
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
Log.Info($"Lock renewal task for incoming message ID: {message.MessageId} was cancelled.");
// Expected, no need to process
}
catch (Exception exception)
catch (Exception e)
{
Log.Error($"Failed to renew lock for incoming message ID: {message.MessageId}", exception);
Log.Fatal($"{message.MessageId}: RenewLockToken: " + e.Message, e);
}
}

#endregion
}

readonly TimeSpan lockDuration;
readonly TimeSpan renewLockTokenIn;
readonly string queueName;
static readonly ILog Log = LogManager.GetLogger<LockRenewalBehavior>();
}
Expand Up @@ -7,11 +7,8 @@ public static class LockRenewalConfiguration
public static void LockRenewal(this EndpointConfiguration endpointConfiguration, Action<LockRenewalOptions> action)
{
var options = new LockRenewalOptions();

action(options);

var settings = endpointConfiguration.GetSettings();

settings.Set<LockRenewalOptions>(options);
}
}
@@ -1,4 +1,5 @@
using System;
using NServiceBus;
using NServiceBus.Features;

public class LockRenewalFeature : Feature
Expand All @@ -15,7 +16,7 @@ internal LockRenewalFeature()
{
// NServiceBus.Transport.AzureServiceBus sets LockDuration to 5 minutes by default
LockDuration = TimeSpan.FromMinutes(5),
ExecuteRenewalBefore = TimeSpan.FromSeconds(10)
RenewalInterval = TimeSpan.FromMinutes(1)
});
});

Expand All @@ -25,11 +26,11 @@ internal LockRenewalFeature()
protected override void Setup(FeatureConfigurationContext context)
{
var lockRenewalOptions = context.Settings.Get<LockRenewalOptions>();
var renewLockTokenIn = lockRenewalOptions.LockDuration - lockRenewalOptions.ExecuteRenewalBefore;
var queueName = context.Settings.LocalAddress();

context.Pipeline.Register(
stepId: "LockRenewal",
factoryMethod: builder => new LockRenewalBehavior(renewLockTokenIn),
factoryMethod: builder => new LockRenewalBehavior(lockRenewalOptions.LockDuration, lockRenewalOptions.RenewalInterval, queueName),
description: "Renew message lock token");
}
}
Expand Up @@ -4,5 +4,5 @@ public class LockRenewalOptions
{
public TimeSpan LockDuration { get; set; }

public TimeSpan ExecuteRenewalBefore { get; set; }
public TimeSpan RenewalInterval { get; set; }
}
Expand Up @@ -21,21 +21,24 @@ static async Task Main()
throw new Exception("Could not read the 'AzureServiceBus_ConnectionString' environment variable. Check the sample prerequisites.");
}

endpointConfiguration.UseTransport<AzureServiceBusTransport>().ConnectionString(connectionString);
var transport = endpointConfiguration.UseTransport<AzureServiceBusTransport>().ConnectionString(connectionString);

#region override-lock-renewal-configuration

var lockDuration = TimeSpan.FromSeconds(30);
var renewalInterval = TimeSpan.FromSeconds(5);

endpointConfiguration.LockRenewal(options =>
{
options.LockDuration = TimeSpan.FromSeconds(30);
options.ExecuteRenewalBefore = TimeSpan.FromSeconds(5);
options.LockDuration = lockDuration;
options.RenewalInterval = renewalInterval;
});

#endregion

var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);

await OverrideQueueLockDuration("Samples.ASB.SendReply.LockRenewal", connectionString).ConfigureAwait(false);
await OverrideQueueLockDuration("Samples.ASB.SendReply.LockRenewal", connectionString, lockDuration).ConfigureAwait(false);

await endpointInstance.SendLocal(new LongProcessingMessage { ProcessingDuration = TimeSpan.FromSeconds(45) });

Expand All @@ -45,11 +48,11 @@ static async Task Main()
await endpointInstance.Stop().ConfigureAwait(false);
}

private static async Task OverrideQueueLockDuration(string queuePath, string connectionString)
static async Task OverrideQueueLockDuration(string queuePath, string connectionString, TimeSpan lockDuration)
{
var managementClient = new ServiceBusAdministrationClient(connectionString);
var queueDescription = await managementClient.GetQueueAsync(queuePath).ConfigureAwait(false);
queueDescription.Value.LockDuration = TimeSpan.FromSeconds(30);
queueDescription.Value.LockDuration = lockDuration;

await managementClient.UpdateQueueAsync(queueDescription.Value).ConfigureAwait(false);
}
Expand Down
Expand Up @@ -8,9 +8,11 @@

class LockRenewalBehavior : Behavior<ITransportReceiveContext>
{
public LockRenewalBehavior(TimeSpan renewLockTokenIn)
public LockRenewalBehavior(TimeSpan lockDuration, TimeSpan renewLockTokenIn, string queueName)
{
this.lockDuration = lockDuration;
this.renewLockTokenIn = renewLockTokenIn;
this.queueName = queueName;
}

public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
Expand All @@ -28,60 +30,108 @@ public override async Task Invoke(ITransportReceiveContext context, Func<Task> n

#endregion

var messageReceiver = serviceBusClient.CreateReceiver("Samples.ASB.SendReply.LockRenewal");
var messageReceiver = serviceBusClient.CreateReceiver(queueName);

var cts = new CancellationTokenSource();
var token = cts.Token;
try
{
var now = DateTimeOffset.UtcNow;
var remaining = message.LockedUntil - now;

Log.Info($"Incoming message ID: {message.MessageId}");
if (remaining < renewLockTokenIn)
{
throw new Exception($"Not processing message because remaining lock duration is below configured renewal interval.")
{
Data = { { "ServiceBusReceivedMessage.MessageId", message.MessageId } }
};
}

_ = RenewLockToken(token);
var elapsed = lockDuration - remaining;

#region processing-and-cancellation
if (elapsed > renewLockTokenIn)
{
Log.Warn($"{message.MessageId}: Incoming message is locked untill {message.LockedUntil:s}Z but already passed configured renewal interval, renewing lock first. Consider lowering the prefetch count.");
await messageReceiver.RenewMessageLockAsync(message, context.CancellationToken).ConfigureAwait(false);
}

try
{
await next().ConfigureAwait(false);
;
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken))
{
var token = cts.Token;

_ = RenewLockToken(token);

#region processing-and-cancellation

try
{
await next().ConfigureAwait(false);
}
finally
{
remaining = message.LockedUntil - DateTimeOffset.UtcNow;

if (remaining < renewLockTokenIn)
{
Log.Warn($"{message.MessageId}: Processing completed but LockedUntil {message.LockedUntil:s}Z less than {renewLockTokenIn}. This could indicate issues during lock renewal.");
}

cts.Cancel();
}

#endregion
}
}
finally
{
Log.Info($"Cancelling renewal task for incoming message ID: {message.MessageId}");
cts.Cancel();
cts.Dispose();
await messageReceiver.DisposeAsync(); // Cannot use "await using" because of lang version 7.3
}

#endregion

#region renewal-background-task

async Task RenewLockToken(CancellationToken cancellationToken)
{
try
{
int attempts = 0;

while (!cancellationToken.IsCancellationRequested)
{
Log.Info($"Lock will be renewed in {renewLockTokenIn}");

await Task.Delay(renewLockTokenIn, cancellationToken).ConfigureAwait(false);

await messageReceiver.RenewMessageLockAsync(message, cancellationToken).ConfigureAwait(false);

Log.Info($"Lock renewed till {message.LockedUntil} UTC / {message.LockedUntil.ToLocalTime()} local");
try
{
await messageReceiver.RenewMessageLockAsync(message, cancellationToken).ConfigureAwait(false);
attempts = 0;
Log.Info($"{message.MessageId}: Lock renewed untill {message.LockedUntil:s}Z.");
}
catch (ServiceBusException e) when (e.Reason == ServiceBusFailureReason.MessageLockLost)
{
Log.Error($"{message.MessageId}: Lock lost.", e);
return;
}
catch (Exception e) when (!(e is OperationCanceledException))
{
++attempts;
Log.Warn($"{message.MessageId}: Failed to renew lock (#{attempts:N0}), if lock cannot be renewed within {message.LockedUntil:s}Z message will reappear.", e);
}
}
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
Log.Info($"Lock renewal task for incoming message ID: {message.MessageId} was cancelled.");
// Expected, no need to process
}
catch (Exception exception)
catch (Exception e)
{
Log.Error($"Failed to renew lock for incoming message ID: {message.MessageId}", exception);
Log.Fatal($"{message.MessageId}: RenewLockToken: " + e.Message, e);
}
}

#endregion
}

readonly TimeSpan lockDuration;
readonly TimeSpan renewLockTokenIn;
readonly string queueName;
static readonly ILog Log = LogManager.GetLogger<LockRenewalBehavior>();
}

0 comments on commit 966ddf6

Please sign in to comment.