Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial suggestion for Global locking for Timed Services #3092

Merged
merged 1 commit into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Brighter.sln
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.ServiceAc
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Sqlite.Dapper", "src\Paramore.Brighter.Sqlite.Dapper\Paramore.Brighter.Sqlite.Dapper.csproj", "{3384FBF0-5DCB-452D-8288-FAD1D0023089}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Locking.Azure", "Paramore.Brighter.Locking.Azure\Paramore.Brighter.Locking.Azure.csproj", "{021F3B51-A640-4C0D-9B47-FB4E32DF6715}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1875,6 +1877,18 @@ Global
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|x86.ActiveCfg = Release|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|x86.Build.0 = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Any CPU.Build.0 = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|x86.ActiveCfg = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|x86.Build.0 = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Any CPU.ActiveCfg = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Any CPU.Build.0 = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|x86.ActiveCfg = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
106 changes: 106 additions & 0 deletions Paramore.Brighter.Locking.Azure/AzureBlobLockingProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;
using Microsoft.Extensions.Logging;
using Paramore.Brighter.Logging;

namespace Paramore.Brighter.Locking.Azure;

public class AzureBlobLockingProvider(AzureBlobLockingProviderOptions options) : IDistributedLock
{
private readonly BlobContainerClient _containerClient = new BlobContainerClient(options.BlobContainerUri, options.TokenCredential);
private readonly ILogger _logger = ApplicationLogging.CreateLogger<AzureBlobLockingProviderOptions>();

private readonly Dictionary<string, string> _leases = new Dictionary<string, string>();

public async Task<bool> ObtainLockAsync(string resource, CancellationToken cancellationToken)
{
var client = GetBlobClient(resource);

// Write if does not exist
if (!await client.ExistsAsync(cancellationToken))
{
await using var emptyStream = new MemoryStream();
await using var writer = new StreamWriter(emptyStream);
await writer.WriteAsync(string.Empty);
await writer.FlushAsync(cancellationToken);
emptyStream.Position = 0;
await client.UploadAsync(emptyStream, cancellationToken: cancellationToken);
}

try
{
var response = await client.GetBlobLeaseClient().AcquireAsync(options.LeaseValidity, cancellationToken: cancellationToken);
_leases.Add(NormaliseResourceName(resource), response.Value.LeaseId);
return true;
}
catch (RequestFailedException e)
{
_logger.LogInformation("Could not Acquire Lease on Blob {LockResourceName}", resource);
return false;
}
}

public bool ObtainLock(string resource)
{
var client = GetBlobClient(resource);

// Write if does not exist
if (!client.Exists())
{
using var emptyStream = new MemoryStream();
using var writer = new StreamWriter(emptyStream);
writer.Write(string.Empty);
writer.Flush();
emptyStream.Position = 0;
client.Upload(emptyStream);
}

try
{
var response = client.GetBlobLeaseClient().Acquire(options.LeaseValidity);
_leases.Add(NormaliseResourceName(resource), response.Value.LeaseId);
return true;
}
catch (RequestFailedException e)
{
_logger.LogInformation("Could not Acquire Lease on Blob {LockResourceName}", resource);
return false;
}
}

public async Task ReleaseLockAsync(string resource, CancellationToken cancellationToken)
{
var client = GetBlobLeaseClientForResource(resource);
if(client == null)
return;
await client.ReleaseAsync(cancellationToken: cancellationToken);
_leases.Remove(NormaliseResourceName(resource));
}

public void ReleaseLock(string resource)
{
var client = GetBlobLeaseClientForResource(resource);
if(client == null)
return;
client.Release();
_leases.Remove(NormaliseResourceName(resource));
}

private BlobLeaseClient? GetBlobLeaseClientForResource(string resource)
{
if (_leases.ContainsKey(NormaliseResourceName(resource)))
return GetBlobClient(resource).GetBlobLeaseClient(_leases[NormaliseResourceName(resource)]);

_logger.LogInformation("No lock found for {LockResourceName}", resource);
return null;
}

private BlobClient GetBlobClient(string resource)
{
var storageLocation = options.StorageLocationFunc.Invoke(NormaliseResourceName(resource));
return _containerClient.GetBlobClient(storageLocation);
}

private static string NormaliseResourceName(string resourceName) => resourceName.ToLower();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using Azure.Core;

namespace Paramore.Brighter.Locking.Azure;

public class AzureBlobLockingProviderOptions(
Uri blobContainerUri,
TokenCredential tokenCredential
)
{
/// <summary>
/// The URI of the blob container
/// </summary>
public Uri BlobContainerUri { get; init; } = blobContainerUri;

/// <summary>
/// The Credential to use when writing blobs
/// </summary>
public TokenCredential TokenCredential { get; init; } = tokenCredential;

/// <summary>
/// The amount of time before the lease automatically expires
/// </summary>
public TimeSpan LeaseValidity { get; init; } = TimeSpan.FromMinutes(1);

/// <summary>
/// The function to provide the location to store the locks inside of the Blob container
/// </summary>
public Func<string, string> StorageLocationFunc = (resource) => $"lock-{resource}";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<Title>This is the Azure Distributed Locking Provider.</Title>
<Authors>Paul Reardon</Authors>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\src\Paramore.Brighter\Paramore.Brighter.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Storage.Blobs" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ public static IBrighterBuilder UseExternalBus(
brighterBuilder.Services.Add(asyncOutboxdescriptor);
}
}

// If no distributed locking service is added, then add the in memory variant
var distributedLock = busConfiguration.DistributedLock ?? new InMemoryLock();
brighterBuilder.Services.AddSingleton(distributedLock);

if (busConfiguration.UseRpc)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@ namespace Paramore.Brighter.Extensions.Hosting

public class TimedOutboxArchiver(
IServiceScopeFactory serviceScopeFactory,
IDistributedLock distributedLock,
TimedOutboxArchiverOptions options)
: IHostedService, IDisposable
{
private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<TimedOutboxSweeper>();
private Timer _timer;

private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private const string LockingResourceName = "Archiver";

public Task StartAsync(CancellationToken cancellationToken)
{
s_logger.LogInformation("Outbox Archiver Service is starting");

_timer = new Timer(async (e) => await Archive(e, cancellationToken), null, TimeSpan.Zero, TimeSpan.FromSeconds(options.TimerInterval));
_timer = new Timer(async (e) => await Archive(e, cancellationToken), null, TimeSpan.Zero,
TimeSpan.FromSeconds(options.TimerInterval));

return Task.CompletedTask;
}
Expand All @@ -44,7 +46,7 @@ public void Dispose()

private async Task Archive(object state, CancellationToken cancellationToken)
{
if (await _semaphore.WaitAsync(TimeSpan.Zero, cancellationToken))
if (await distributedLock.ObtainLockAsync(LockingResourceName, cancellationToken))
{
var scope = serviceScopeFactory.CreateScope();
s_logger.LogInformation("Outbox Archiver looking for messages to Archive");
Expand All @@ -60,7 +62,7 @@ private async Task Archive(object state, CancellationToken cancellationToken)
}
finally
{
_semaphore.Release();
await distributedLock.ReleaseLockAsync(LockingResourceName, cancellationToken);
}

s_logger.LogInformation("Outbox Sweeper sleeping");
Expand Down
65 changes: 38 additions & 27 deletions src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ namespace Paramore.Brighter.Extensions.Hosting
public class TimedOutboxSweeper : IHostedService, IDisposable
{
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly IDistributedLock _distributedLock;
private readonly TimedOutboxSweeperOptions _options;
private static readonly ILogger s_logger= ApplicationLogging.CreateLogger<TimedOutboxSweeper>();

private Timer _timer;
//private Timer _timer;

public TimedOutboxSweeper (IServiceScopeFactory serviceScopeFactory, TimedOutboxSweeperOptions options)
private const string LockingResourceName = "OutboxSweeper";

public TimedOutboxSweeper(IServiceScopeFactory serviceScopeFactory, IDistributedLock distributedLock,
TimedOutboxSweeperOptions options)
{
_serviceScopeFactory = serviceScopeFactory;
_distributedLock = distributedLock;
_options = options;
}

Expand All @@ -42,36 +45,44 @@ public Task StartAsync(CancellationToken cancellationToken)

private void OnElapsed(object sender, ElapsedEventArgs elapsedEventArgs)
{
s_logger.LogInformation("Outbox Sweeper looking for unsent messages");

var scope = _serviceScopeFactory.CreateScope();
try
if (_distributedLock.ObtainLock(LockingResourceName))
{
IAmACommandProcessor commandProcessor = scope.ServiceProvider.GetService<IAmACommandProcessor>();
s_logger.LogInformation("Outbox Sweeper looking for unsent messages");

var outBoxSweeper = new OutboxSweeper(
millisecondsSinceSent: _options.MinimumMessageAge,
commandProcessor: commandProcessor,
_options.BatchSize,
_options.UseBulk,
_options.Args);
var scope = _serviceScopeFactory.CreateScope();
try
{
IAmACommandProcessor commandProcessor = scope.ServiceProvider.GetService<IAmACommandProcessor>();

if (_options.UseBulk)
outBoxSweeper.SweepAsyncOutbox();
else
outBoxSweeper.Sweep();
}
catch (Exception e)
{
s_logger.LogError(e, "Error while sweeping the outbox.");
throw;
var outBoxSweeper = new OutboxSweeper(
millisecondsSinceSent: _options.MinimumMessageAge,
commandProcessor: commandProcessor,
_options.BatchSize,
_options.UseBulk,
_options.Args);

if (_options.UseBulk)
outBoxSweeper.SweepAsyncOutbox();
else
outBoxSweeper.Sweep();
}
catch (Exception e)
{
s_logger.LogError(e, "Error while sweeping the outbox.");
throw;
}
finally
{
_distributedLock.ReleaseLock(LockingResourceName);
scope.Dispose();
((Timer)sender).Enabled = true;
}
}
finally
else
{
scope.Dispose();
((Timer)sender).Enabled = true;
s_logger.LogWarning("Outbox Sweeper is still running - abandoning attempt.");
}

s_logger.LogInformation("Outbox Sweeper sleeping");
}

Expand Down
5 changes: 5 additions & 0 deletions src/Paramore.Brighter/ExternalBusConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ public class ExternalBusConfiguration : IAmExternalBusConfiguration
/// The Outbox we wish to use for messaging
/// </summary>
public IAmAnOutbox Outbox { get; set; }

/// <summary>
/// The Distributed Locking Service
/// </summary>
public IDistributedLock DistributedLock { get; set; }

/// <summary>
/// The maximum amount of messages to deposit into the outbox in one transmissions.
Expand Down
37 changes: 37 additions & 0 deletions src/Paramore.Brighter/IDistributedLock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System.Threading;
using System.Threading.Tasks;

namespace Paramore.Brighter;

public interface IDistributedLock
{
/// <summary>
/// Attempt to obtain a lock on a resource
/// </summary>
/// <param name="resource">The name of the resource to Lock</param>
/// <param name="cancellationToken">The Cancellation Token</param>
/// <returns>True if the lock was obtained</returns>
Task<bool> ObtainLockAsync(string resource, CancellationToken cancellationToken);

/// <summary>
/// Attempt to obtain a lock on a resource
/// </summary>
/// <param name="resource">The name of the resource to Lock</param>
/// <returns>True if the lock was obtained</returns>
bool ObtainLock(string resource);

/// <summary>
/// Release a lock
/// </summary>
/// <param name="resource"></param>
/// <param name="cancellationToken"></param>
/// <returns>Awaitable Task</returns>
Task ReleaseLockAsync(string resource, CancellationToken cancellationToken);

/// <summary>
/// Release a lock
/// </summary>
/// <param name="resource"></param>
/// <returns>Awaitable Task</returns>
void ReleaseLock(string resource);
}
Loading
Loading