Skip to content

Commit

Permalink
More resilient ASBS lock renewal sample.
Browse files Browse the repository at this point in the history
  • Loading branch information
ramonsmits committed May 16, 2022
1 parent 7851bc5 commit 42f4649
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 45 deletions.
@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="utf-8"?>

<configuration>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.8" />
</startup>
<system.transactions>
<defaultSettings timeout="00:09:00" />
</system.transactions>
</configuration>
Expand Up @@ -12,4 +12,4 @@
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="2.*" />
</ItemGroup>

</Project>
</Project>
Expand Up @@ -8,9 +8,11 @@

class LockRenewalBehavior : Behavior<ITransportReceiveContext>
{
public LockRenewalBehavior(TimeSpan renewLockTokenIn)
public LockRenewalBehavior(TimeSpan lockDuration, TimeSpan renewLockTokenIn, string queueName)
{
this.lockDuration = lockDuration;
this.renewLockTokenIn = renewLockTokenIn;
this.queueName = queueName;
}

public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
Expand All @@ -28,60 +30,114 @@ public override async Task Invoke(ITransportReceiveContext context, Func<Task> n

#endregion

var messageReceiver = serviceBusClient.CreateReceiver("Samples.ASB.SendReply.LockRenewal");
var messageReceiver = serviceBusClient.CreateReceiver(queueName);

var cts = new CancellationTokenSource();
var token = cts.Token;
try
{
var now = DateTimeOffset.UtcNow;
var remaining = message.LockedUntil - now;

Log.Info($"Incoming message ID: {message.MessageId}");
if (remaining < renewLockTokenIn)
{
throw new Exception($"Not processing message because remaining lock duration is below configured renewal interval.")
{
Data = { { "ServiceBusReceivedMessage.MessageId", message.MessageId } }
};
}

_ = RenewLockToken(token);
var elapsed = lockDuration - remaining;

#region processing-and-cancellation
if (elapsed > renewLockTokenIn)
{
Log.Warn($"{message.MessageId}: Incoming message is locked untill {message.LockedUntil:s}Z but already passed configured renewal interval, renewing lock first. Consider lowering the prefetch count.");
await messageReceiver.RenewMessageLockAsync(message).ConfigureAwait(false);
}

try
{
await next().ConfigureAwait(false);
using (var cts = new CancellationTokenSource())
{
var token = cts.Token;

_ = RenewLockToken(token);

#region processing-and-cancellation

try
{
await next().ConfigureAwait(false);
}
finally
{
remaining = message.LockedUntil - DateTimeOffset.UtcNow;

if (remaining < renewLockTokenIn)
{
Log.Warn($"{message.MessageId}: Processing completed but LockedUntil {message.LockedUntil:s}Z less than {renewLockTokenIn}. This could indicate issues during lock renewal.");
}

cts.Cancel();
}

#endregion
}
}
finally
{
Log.Info($"Cancelling renewal task for incoming message ID: {message.MessageId}");
cts.Cancel();
cts.Dispose();
await messageReceiver.DisposeAsync(); // Cannot use "await using" because of lang version 7.3
}

#endregion

#region renewal-background-task

async Task RenewLockToken(CancellationToken cancellationToken)
{
try
{
int attempts = 0;

while (!cancellationToken.IsCancellationRequested)
{
Log.Info($"Lock will be renewed in {renewLockTokenIn}");

await Task.Delay(renewLockTokenIn, cancellationToken).ConfigureAwait(false);

await messageReceiver.RenewMessageLockAsync(message).ConfigureAwait(false);

Log.Info($"Lock renewed till {message.LockedUntil} UTC / {message.LockedUntil.ToLocalTime()} local");
var now = DateTimeOffset.UtcNow;
var ts = now + renewLockTokenIn;
ts = ts.Round(renewLockTokenIn);

var diff = ts - now;

//await Task.Delay(renewLockTokenIn, cancellationToken).ConfigureAwait(false);
await Task.Delay(diff, cancellationToken).ConfigureAwait(false);

try
{
await messageReceiver.RenewMessageLockAsync(message, cancellationToken).ConfigureAwait(false);
attempts = 0;
Log.Info($"{message.MessageId}: Lock renewed untill {message.LockedUntil:s}Z.");
}
catch (ServiceBusException e) when (e.Reason == ServiceBusFailureReason.MessageLockLost)
{
Log.Error($"{message.MessageId}: Lock lost.", e);
return;
}
catch (Exception e) when (!(e is OperationCanceledException))
{
++attempts;
Log.Warn($"{message.MessageId}: Failed to renew lock (#{attempts:N0}), if lock cannot be renewed within {message.LockedUntil:s}Z message will reappear.", e);
}
}
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
Log.Info($"Lock renewal task for incoming message ID: {message.MessageId} was cancelled.");
// Expected, no need to process
}
catch (Exception exception)
catch (Exception e)
{
Log.Error($"Failed to renew lock for incoming message ID: {message.MessageId}", exception);
Log.Fatal($"{message.MessageId}: RenewLockToken: " + e.Message, e);
}
}

#endregion
}

readonly TimeSpan lockDuration;
readonly TimeSpan renewLockTokenIn;
readonly string queueName;
static readonly ILog Log = LogManager.GetLogger<LockRenewalBehavior>();
}
Expand Up @@ -7,11 +7,8 @@ public static class LockRenewalConfiguration
public static void LockRenewal(this EndpointConfiguration endpointConfiguration, Action<LockRenewalOptions> action)
{
var options = new LockRenewalOptions();

action(options);

var settings = endpointConfiguration.GetSettings();

settings.Set<LockRenewalOptions>(options);
}
}
@@ -1,4 +1,5 @@
using System;
using NServiceBus;
using NServiceBus.Features;

public class LockRenewalFeature : Feature
Expand All @@ -15,7 +16,7 @@ internal LockRenewalFeature()
{
// NServiceBus.Transport.AzureServiceBus sets LockDuration to 5 minutes by default
LockDuration = TimeSpan.FromMinutes(5),
ExecuteRenewalBefore = TimeSpan.FromSeconds(10)
RenewalInterval = TimeSpan.FromMinutes(1)
});
});

Expand All @@ -25,11 +26,11 @@ internal LockRenewalFeature()
protected override void Setup(FeatureConfigurationContext context)
{
var lockRenewalOptions = context.Settings.Get<LockRenewalOptions>();
var renewLockTokenIn = lockRenewalOptions.LockDuration - lockRenewalOptions.ExecuteRenewalBefore;
var queueName = context.Settings.LocalAddress();

context.Pipeline.Register(
stepId: "LockRenewal",
factoryMethod: builder => new LockRenewalBehavior(renewLockTokenIn),
factoryMethod: builder => new LockRenewalBehavior(lockRenewalOptions.LockDuration, lockRenewalOptions.RenewalInterval, queueName),
description: "Renew message lock token");
}
}
Expand Up @@ -4,5 +4,5 @@ public class LockRenewalOptions
{
public TimeSpan LockDuration { get; set; }

public TimeSpan ExecuteRenewalBefore { get; set; }
public TimeSpan RenewalInterval { get; set; }
}
@@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System;
using System.Threading.Tasks;
using LockRenewal;
using NServiceBus;
using NServiceBus.Logging;
Expand All @@ -11,9 +12,11 @@ public class LongProcessingMessageHandler : IHandleMessages<LongProcessingMessag

public async Task Handle(LongProcessingMessage message, IMessageHandlerContext context)
{
log.Info($"--- Received a message with processing duration of {message.ProcessingDuration}");
//var duration = message.ProcessingDuration;
var duration = Program.ProcessingDuration;
log.Info($"--- Received a message with processing duration of {duration}, delay until {DateTime.Now+duration}");

await Task.Delay(message.ProcessingDuration).ConfigureAwait(false);
await Task.Delay(duration).ConfigureAwait(false);

log.Info("--- Processing completed");
}
Expand Down
@@ -1,5 +1,7 @@
using System;
using System.Reflection;
using System.Runtime;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using System.Transactions;
using Azure.Messaging.ServiceBus.Administration;
Expand All @@ -8,27 +10,52 @@

class Program
{

static readonly TimeSpan LockDuration = TimeSpan.FromMinutes(5);
static readonly TimeSpan RenewalInterval = TimeSpan.FromMinutes(1.5);
public static readonly TimeSpan ProcessingDuration = TimeSpan.FromMinutes(11);

static async Task Main()
{
Console.Title = "Samples.ASB.LockRenewal";

// if(RuntimeInformation.FrameworkDescription.StartsWith(".NET Framework"))
// ConfigureTransactionTimeoutNetFramework(TimeSpan.FromHours(1));
// else
// ConfigureTransactionTimeoutCore(TimeSpan.FromHours(1));

Console.WriteLine("Version = {0}", Environment.Version);
Console.WriteLine("OSVersion = {0}", Environment.OSVersion);
Console.WriteLine("FrameworkDescription = {0}", RuntimeInformation.FrameworkDescription);
Console.WriteLine("OSDescription = {0}", RuntimeInformation.OSDescription);
Console.WriteLine("LatencyMode = {0}", GCSettings.LatencyMode);
Console.WriteLine("IsServerGC = {0}", GCSettings.IsServerGC);
Console.WriteLine("LargeObjectHeapCompactionMode = {0}", GCSettings.LargeObjectHeapCompactionMode);
Console.WriteLine("DefaultTimeout = {0}", TransactionManager.DefaultTimeout);
Console.WriteLine("MaximumTimeout = {0}", TransactionManager.MaximumTimeout);

var endpointConfiguration = new EndpointConfiguration("Samples.ASB.SendReply.LockRenewal");
endpointConfiguration.EnableInstallers();

endpointConfiguration.LimitMessageProcessingConcurrencyTo(1);

var connectionString = Environment.GetEnvironmentVariable("AzureServiceBus_ConnectionString");
if (string.IsNullOrWhiteSpace(connectionString))
{
throw new Exception("Could not read the 'AzureServiceBus_ConnectionString' environment variable. Check the sample prerequisites.");
throw new Exception(
"Could not read the 'AzureServiceBus_ConnectionString' environment variable. Check the sample prerequisites.");
}

endpointConfiguration.UseTransport<AzureServiceBusTransport>().ConnectionString(connectionString);
var transport = endpointConfiguration.UseTransport<AzureServiceBusTransport>();
transport.ConnectionString(connectionString);
transport.PrefetchCount(0);

#region override-lock-renewal-configuration

endpointConfiguration.LockRenewal(options =>
{
options.LockDuration = TimeSpan.FromSeconds(30);
options.ExecuteRenewalBefore = TimeSpan.FromSeconds(5);
options.LockDuration = LockDuration;
options.RenewalInterval = RenewalInterval;
});

#endregion
Expand All @@ -37,19 +64,30 @@ static async Task Main()

await OverrideQueueLockDuration("Samples.ASB.SendReply.LockRenewal", connectionString).ConfigureAwait(false);

await endpointInstance.SendLocal(new LongProcessingMessage { ProcessingDuration = TimeSpan.FromSeconds(45) });
if (ProcessingDuration > TransactionManager.MaximumTimeout)
{
throw new Exception("MaximumTimeout");
}

Console.WriteLine("Press any key to exit");
Console.ReadKey();
for (int i = 0; i < 100; i++)
{
await endpointInstance.SendLocal(new LongProcessingMessage { ProcessingDuration = ProcessingDuration });
}

do
{
Console.WriteLine("Press ESCy key to exit");
}
while (Console.ReadKey().Key != ConsoleKey.Escape);

await endpointInstance.Stop().ConfigureAwait(false);
}

private static async Task OverrideQueueLockDuration(string queuePath, string connectionString)
static async Task OverrideQueueLockDuration(string queuePath, string connectionString)
{
var managementClient = new ServiceBusAdministrationClient(connectionString);
var queueDescription = await managementClient.GetQueueAsync(queuePath).ConfigureAwait(false);
queueDescription.Value.LockDuration = TimeSpan.FromSeconds(30);
queueDescription.Value.LockDuration = LockDuration;

await managementClient.UpdateQueueAsync(queueDescription.Value).ConfigureAwait(false);
}
Expand Down Expand Up @@ -83,4 +121,50 @@ static void ConfigureTransactionTimeoutNetFramework(TimeSpan timeout)
}

#endregion
}


static class DateTimeRoundingExtensions
{
public static DateTimeOffset RoundUp(this DateTimeOffset instance, TimeSpan period)
{
return new DateTimeOffset((instance.Ticks + period.Ticks - 1) / period.Ticks * period.Ticks, instance.Offset);
}

public static DateTimeOffset RoundDown(this DateTimeOffset instance, TimeSpan period)
{
var delta = instance.Ticks % period.Ticks;
return new DateTimeOffset(instance.Ticks - delta, instance.Offset);
}

public static DateTimeOffset Round(this DateTimeOffset value, TimeSpan period, MidpointRounding style = default)
{
if (period <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(period), "value must be positive");

var units = (decimal)value.Ticks / period.Ticks; // Conversion to decimal not to loose precision
var roundedUnits = Math.Round(units, style);
var roundedTicks = (long)roundedUnits * period.Ticks;
var instance = new DateTimeOffset(roundedTicks,value.Offset);
return instance;
}

public static TimeSpan RoundDown(this TimeSpan instance, TimeSpan period)
{
var delta = instance.Ticks % period.Ticks;
return new TimeSpan(instance.Ticks - delta);
}

public static TimeSpan RoundUp(this TimeSpan instance, TimeSpan period)
{
return new TimeSpan((instance.Ticks + period.Ticks - 1) / period.Ticks * period.Ticks);
}

public static TimeSpan Round(this TimeSpan instance, TimeSpan period)
{
if (period == TimeSpan.Zero) return instance;

var rndTicks = period.Ticks;
var ansTicks = instance.Ticks + Math.Sign(instance.Ticks) * rndTicks / 2;
return TimeSpan.FromTicks(ansTicks - ansTicks % rndTicks);
}
}

0 comments on commit 42f4649

Please sign in to comment.