Skip to content

Commit

Permalink
Initial suggestion for Global locking for Timed Services
Browse files Browse the repository at this point in the history
Issue #3075

This is an example of providers to allow locking across resources,
included is

- In Memory Lock (for when we only want a lock in process)
- Azure Blob Lock
  • Loading branch information
preardon committed May 16, 2024
1 parent b3346eb commit 09c7da1
Show file tree
Hide file tree
Showing 13 changed files with 383 additions and 31 deletions.
14 changes: 14 additions & 0 deletions Brighter.sln
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.ServiceAc
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Sqlite.Dapper", "src\Paramore.Brighter.Sqlite.Dapper\Paramore.Brighter.Sqlite.Dapper.csproj", "{3384FBF0-5DCB-452D-8288-FAD1D0023089}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Locking.Azure", "Paramore.Brighter.Locking.Azure\Paramore.Brighter.Locking.Azure.csproj", "{021F3B51-A640-4C0D-9B47-FB4E32DF6715}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1875,6 +1877,18 @@ Global
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|x86.ActiveCfg = Release|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|x86.Build.0 = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Any CPU.Build.0 = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|x86.ActiveCfg = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|x86.Build.0 = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Any CPU.ActiveCfg = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Any CPU.Build.0 = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|x86.ActiveCfg = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
106 changes: 106 additions & 0 deletions Paramore.Brighter.Locking.Azure/AzureBlobLockingProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;
using Microsoft.Extensions.Logging;
using Paramore.Brighter.Logging;

namespace Paramore.Brighter.Locking.Azure;

public class AzureBlobLockingProvider(AzureBlobLockingProviderOptions options) : IDistributedLock
{
private readonly BlobContainerClient _containerClient = new BlobContainerClient(options.BlobContainerUri, options.TokenCredential);
private readonly ILogger _logger = ApplicationLogging.CreateLogger<AzureBlobLockingProviderOptions>();

private readonly Dictionary<string, string> _leases = new Dictionary<string, string>();

public async Task<bool> ObtainLockAsync(string resource, CancellationToken cancellationToken)
{
var client = GetBlobClient(resource);

// Write if does not exist
if (!await client.ExistsAsync(cancellationToken))
{
await using var emptyStream = new MemoryStream();
await using var writer = new StreamWriter(emptyStream);
await writer.WriteAsync(string.Empty);
await writer.FlushAsync(cancellationToken);
emptyStream.Position = 0;
await client.UploadAsync(emptyStream, cancellationToken: cancellationToken);
}

try
{
var response = await client.GetBlobLeaseClient().AcquireAsync(options.LeaseValidity, cancellationToken: cancellationToken);
_leases.Add(NormaliseResourceName(resource), response.Value.LeaseId);
return true;
}
catch (RequestFailedException e)
{
_logger.LogInformation("Could not Acquire Lease on Blob {LockResourceName}", resource);
return false;
}
}

public bool ObtainLock(string resource)
{
var client = GetBlobClient(resource);

// Write if does not exist
if (!client.Exists())
{
using var emptyStream = new MemoryStream();
using var writer = new StreamWriter(emptyStream);
writer.Write(string.Empty);
writer.Flush();
emptyStream.Position = 0;
client.Upload(emptyStream);
}

try
{
var response = client.GetBlobLeaseClient().Acquire(options.LeaseValidity);
_leases.Add(NormaliseResourceName(resource), response.Value.LeaseId);
return true;
}
catch (RequestFailedException e)
{
_logger.LogInformation("Could not Acquire Lease on Blob {LockResourceName}", resource);
return false;
}
}

public async Task ReleaseLockAsync(string resource, CancellationToken cancellationToken)
{
var client = GetBlobLeaseClientForResource(resource);
if(client == null)
return;
await client.ReleaseAsync(cancellationToken: cancellationToken);
_leases.Remove(NormaliseResourceName(resource));
}

public void ReleaseLock(string resource)
{
var client = GetBlobLeaseClientForResource(resource);
if(client == null)
return;
client.Release();
_leases.Remove(NormaliseResourceName(resource));
}

private BlobLeaseClient? GetBlobLeaseClientForResource(string resource)
{
if (_leases.ContainsKey(NormaliseResourceName(resource)))
return GetBlobClient(resource).GetBlobLeaseClient(_leases[NormaliseResourceName(resource)]);

_logger.LogInformation("No lock found for {LockResourceName}", resource);
return null;
}

private BlobClient GetBlobClient(string resource)
{
var storageLocation = options.StorageLocationFunc.Invoke(NormaliseResourceName(resource));
return _containerClient.GetBlobClient(storageLocation);
}

private static string NormaliseResourceName(string resourceName) => resourceName.ToLower();
}
29 changes: 29 additions & 0 deletions Paramore.Brighter.Locking.Azure/AzureBlobLockingProviderOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using Azure.Core;

namespace Paramore.Brighter.Locking.Azure;

public class AzureBlobLockingProviderOptions(
Uri blobContainerUri,
TokenCredential tokenCredential
)
{
/// <summary>
/// The URI of the blob container
/// </summary>
public Uri BlobContainerUri { get; init; } = blobContainerUri;

/// <summary>
/// The Credential to use when writing blobs
/// </summary>
public TokenCredential TokenCredential { get; init; } = tokenCredential;

/// <summary>
/// The amount of time before the lease automatically expires
/// </summary>
public TimeSpan LeaseValidity { get; init; } = TimeSpan.FromMinutes(1);

/// <summary>
/// The function to provide the location to store the locks inside of the Blob container
/// </summary>
public Func<string, string> StorageLocationFunc = (resource) => $"lock-{resource}";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\src\Paramore.Brighter\Paramore.Brighter.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Storage.Blobs" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ public static IBrighterBuilder BrighterHandlerBuilder(IServiceCollection service
brighterBuilder.Services.Add(asyncOutboxdescriptor);
}
}

// If no distributed locking service is added, then add the in memory variant
var distributedLock = busConfiguration.DistributedLock ?? new InMemoryLock();
brighterBuilder.Services.AddSingleton(distributedLock);

if (busConfiguration.UseRpc)
{
Expand Down
10 changes: 6 additions & 4 deletions src/Paramore.Brighter.Extensions.Hosting/TimedOutboxArchiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@ namespace Paramore.Brighter.Extensions.Hosting

public class TimedOutboxArchiver(
IServiceScopeFactory serviceScopeFactory,
IDistributedLock distributedLock,
TimedOutboxArchiverOptions options)
: IHostedService, IDisposable
{
private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<TimedOutboxSweeper>();
private Timer _timer;

private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private const string LockingResourceName = "Archiver";

public Task StartAsync(CancellationToken cancellationToken)
{
s_logger.LogInformation("Outbox Archiver Service is starting");

_timer = new Timer(async (e) => await Archive(e, cancellationToken), null, TimeSpan.Zero, TimeSpan.FromSeconds(options.TimerInterval));
_timer = new Timer(async (e) => await Archive(e, cancellationToken), null, TimeSpan.Zero,
TimeSpan.FromSeconds(options.TimerInterval));

return Task.CompletedTask;
}
Expand All @@ -44,7 +46,7 @@ public void Dispose()

private async Task Archive(object state, CancellationToken cancellationToken)
{
if (await _semaphore.WaitAsync(TimeSpan.Zero, cancellationToken))
if (await distributedLock.ObtainLockAsync(LockingResourceName, cancellationToken))
{
var scope = serviceScopeFactory.CreateScope();
s_logger.LogInformation("Outbox Archiver looking for messages to Archive");
Expand All @@ -60,7 +62,7 @@ private async Task Archive(object state, CancellationToken cancellationToken)
}
finally
{
_semaphore.Release();
await distributedLock.ReleaseLockAsync(LockingResourceName, cancellationToken);
}

s_logger.LogInformation("Outbox Sweeper sleeping");
Expand Down
65 changes: 38 additions & 27 deletions src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ namespace Paramore.Brighter.Extensions.Hosting
public class TimedOutboxSweeper : IHostedService, IDisposable
{
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly IDistributedLock _distributedLock;
private readonly TimedOutboxSweeperOptions _options;
private static readonly ILogger s_logger= ApplicationLogging.CreateLogger<TimedOutboxSweeper>();

private Timer _timer;
//private Timer _timer;

public TimedOutboxSweeper (IServiceScopeFactory serviceScopeFactory, TimedOutboxSweeperOptions options)
private const string LockingResourceName = "OutboxSweeper";

public TimedOutboxSweeper(IServiceScopeFactory serviceScopeFactory, IDistributedLock distributedLock,
TimedOutboxSweeperOptions options)
{
_serviceScopeFactory = serviceScopeFactory;
_distributedLock = distributedLock;
_options = options;
}

Expand All @@ -42,36 +45,44 @@ public Task StartAsync(CancellationToken cancellationToken)

private void OnElapsed(object sender, ElapsedEventArgs elapsedEventArgs)
{
s_logger.LogInformation("Outbox Sweeper looking for unsent messages");

var scope = _serviceScopeFactory.CreateScope();
try
if (_distributedLock.ObtainLock(LockingResourceName))
{
IAmACommandProcessor commandProcessor = scope.ServiceProvider.GetService<IAmACommandProcessor>();
s_logger.LogInformation("Outbox Sweeper looking for unsent messages");

var outBoxSweeper = new OutboxSweeper(
millisecondsSinceSent: _options.MinimumMessageAge,
commandProcessor: commandProcessor,
_options.BatchSize,
_options.UseBulk,
_options.Args);
var scope = _serviceScopeFactory.CreateScope();
try
{
IAmACommandProcessor commandProcessor = scope.ServiceProvider.GetService<IAmACommandProcessor>();

if (_options.UseBulk)
outBoxSweeper.SweepAsyncOutbox();
else
outBoxSweeper.Sweep();
}
catch (Exception e)
{
s_logger.LogError(e, "Error while sweeping the outbox.");
throw;
var outBoxSweeper = new OutboxSweeper(
millisecondsSinceSent: _options.MinimumMessageAge,
commandProcessor: commandProcessor,
_options.BatchSize,
_options.UseBulk,
_options.Args);

if (_options.UseBulk)
outBoxSweeper.SweepAsyncOutbox();
else
outBoxSweeper.Sweep();
}
catch (Exception e)
{
s_logger.LogError(e, "Error while sweeping the outbox.");
throw;
}
finally
{
_distributedLock.ReleaseLock(LockingResourceName);
scope.Dispose();
((Timer)sender).Enabled = true;
}
}
finally
else
{
scope.Dispose();
((Timer)sender).Enabled = true;
s_logger.LogWarning("Outbox Sweeper is still running - abandoning attempt.");
}

s_logger.LogInformation("Outbox Sweeper sleeping");
}

Expand Down
5 changes: 5 additions & 0 deletions src/Paramore.Brighter/ExternalBusConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ public class ExternalBusConfiguration : IAmExternalBusConfiguration
/// The Outbox we wish to use for messaging
/// </summary>
public IAmAnOutbox Outbox { get; set; }

/// <summary>
/// The Distributed Locking Service
/// </summary>
public IDistributedLock DistributedLock { get; set; }

/// <summary>
/// The maximum amount of messages to deposit into the outbox in one transmissions.
Expand Down
37 changes: 37 additions & 0 deletions src/Paramore.Brighter/IDistributedLock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System.Threading;
using System.Threading.Tasks;

namespace Paramore.Brighter;

public interface IDistributedLock
{
/// <summary>
/// Attempt to obtain a lock on a resource
/// </summary>
/// <param name="resource">The name of the resource to Lock</param>
/// <param name="cancellationToken">The Cancellation Token</param>
/// <returns>True if the lock was obtained</returns>
Task<bool> ObtainLockAsync(string resource, CancellationToken cancellationToken);

/// <summary>
/// Attempt to obtain a lock on a resource
/// </summary>
/// <param name="resource">The name of the resource to Lock</param>
/// <returns>True if the lock was obtained</returns>
bool ObtainLock(string resource);

/// <summary>
/// Release a lock
/// </summary>
/// <param name="resource"></param>
/// <param name="cancellationToken"></param>
/// <returns>Awaitable Task</returns>
Task ReleaseLockAsync(string resource, CancellationToken cancellationToken);

/// <summary>
/// Release a lock
/// </summary>
/// <param name="resource"></param>
/// <returns>Awaitable Task</returns>
void ReleaseLock(string resource);
}
Loading

0 comments on commit 09c7da1

Please sign in to comment.