Skip to content

Commit

Permalink
Fix Lease release/acquire operation logic (#1289)
Browse files Browse the repository at this point in the history
* Fix lease operation logic

* Lease release op when lease not acquired is a no-op
* Lease acquire when lease was not acquired returns idempotent operation task instead of timing out

* Change release failure log severity from debug to info

* Fix unit test
  • Loading branch information
Arkatufus committed Feb 8, 2023
1 parent 90ff040 commit dcf6d81
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,26 @@ public AzureLeaseSpec(ITestOutputHelper helper): base(Config(), nameof(AzureLeas
public void NonAcquiredReleaseTest()
{
var probe = CreateTestProbe();
var _ = _lease.Release().ContinueWith(r =>
var _ = _lease.Release().ContinueWith(task =>
{
r.IsFaulted.Should().BeTrue();
r.Exception.Should().NotBeNull();
var exception = r.Exception!.InnerException;
exception.Should().NotBeNull();
exception.Should().BeOfType<LeaseException>();
exception!.Message.Should().Be("Tried to release a lease that is not acquired");
probe.Tell(Done.Instance);
probe.Tell(task);
});

probe.ExpectMsg<Done>();
var task = probe.ExpectMsg<Task<bool>>();
task.IsFaulted.Should().BeFalse();
task.Exception.Should().BeNull();
task.Result.Should().BeTrue();
}

[Fact(DisplayName = "Acquire should be idempotent and returns the same task while acquire is in progress")]
public async Task MultipleAcquireTest()
{
var task1 = _lease.Acquire();
var task2 = _lease.Acquire();
var task3 = _lease.Acquire();

task1.Should().Be(task2).And.Be(task3);
(await task1).Should().BeTrue();
}

public async Task InitializeAsync()
Expand Down
85 changes: 65 additions & 20 deletions src/coordination/azure/Akka.Coordination.Azure/AzureLease.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
// -----------------------------------------------------------------------

using System;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
Expand Down Expand Up @@ -45,6 +48,8 @@ private static string MakeDns1039Compatible(string name)
private readonly TimeSpan _timeout;
private readonly string _leaseName;
private readonly IActorRef _leaseActor;
private readonly object _acquireLock = new ();
private Task<bool>? _acquireTask;

public AzureLease(LeaseSettings settings, ExtendedActorSystem system) :
this(system, new AtomicBoolean(), settings)
Expand Down Expand Up @@ -92,12 +97,18 @@ public override async Task<bool> Release()
if(_log.IsDebugEnabled)
_log.Debug("Releasing lease");
var result = await _leaseActor.Ask(LeaseActor.Release.Instance, _timeout);
return result switch
switch (result)
{
LeaseActor.LeaseReleased _ => true,
LeaseActor.InvalidRequest req => throw new LeaseException(req.Reason),
_ => throw new LeaseException($"Unexpected response type: {result.GetType()}")
};
case LeaseActor.LeaseReleased:
return true;
case LeaseActor.InvalidReleaseRequest:
_log.Info("Tried to release a lease that is not acquired");
return true;
case Status.Failure f:
throw new LeaseException($"Failure while releasing lease: {f.Cause.Message}", f.Cause);
default:
throw new LeaseException($"Unexpected response type: {result.GetType()}");
}
}
catch (AskTimeoutException)
{
Expand All @@ -109,25 +120,59 @@ public override async Task<bool> Release()
public override Task<bool> Acquire()
=> Acquire(null);

public override async Task<bool> Acquire(Action<Exception?>? leaseLostCallback)
public override Task<bool> Acquire(Action<Exception?>? leaseLostCallback)
{
try
lock (_acquireLock)
{
if (_acquireTask is not null)
{
if(_log.IsDebugEnabled)
_log.Debug("Lease is already being acquired");
return _acquireTask;
}

if(_log.IsDebugEnabled)
_log.Debug("Acquiring lease");
var result = await _leaseActor.Ask(new LeaseActor.Acquire(leaseLostCallback), _timeout);
return result switch
{
LeaseActor.LeaseAcquired _ => true,
LeaseActor.LeaseTaken _ => false,
LeaseActor.InvalidRequest req => throw new LeaseException(req.Reason),
_ => throw new LeaseException($"Unexpected response type: {result.GetType()}")
};
}
catch (AskTimeoutException)
{
throw new LeaseTimeoutException(
$"Timed out trying to acquire lease [{_leaseName}, {_settings.OwnerName}]. It may still be taken.");
_acquireTask = _leaseActor.Ask(new LeaseActor.Acquire(leaseLostCallback), _timeout)
.ContinueWith(t =>
{
if (t.IsFaulted)
{
if (t.Exception is { })
{
var flattened = t.Exception.Flatten();
if (flattened.InnerExceptions.Count > 0
&& flattened.InnerExceptions.Any(e => e is AskTimeoutException))
{
throw new LeaseTimeoutException(
$"Timed out trying to acquire lease [{_leaseName}, {_settings.OwnerName}]. It may still be taken.",
t.Exception);
}
}
throw new LeaseException(
$"Faulted trying to acquire lease [{_leaseName}, {_settings.OwnerName}]. It may still be taken.",
t.Exception);
}
// For completeness, we're not using cancellation token
if (t.IsCanceled)
{
throw new LeaseException(
$"Canceled while trying to acquire lease [{_leaseName}, {_settings.OwnerName}]. It may still be taken.",
t.Exception);
}
return t.Result switch
{
LeaseActor.LeaseAcquired => true,
LeaseActor.LeaseTaken => false,
Status.Failure f => throw new LeaseException($"Failure while acquiring lease: {f.Cause.Message}", f.Cause),
_ => throw new LeaseException($"Unexpected response type: {t.Result.GetType()}")
};
});

return _acquireTask;
}
}

Expand Down
14 changes: 5 additions & 9 deletions src/coordination/azure/Akka.Coordination.Azure/LeaseActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,11 @@ public sealed class LeaseReleased: IResponse, IDeadLetterSuppression
private LeaseReleased() {}
}

public sealed class InvalidRequest: IResponse, IDeadLetterSuppression
public sealed class InvalidReleaseRequest: IResponse, IDeadLetterSuppression
{
public InvalidRequest(string reason)
{
Reason = reason;
}

// ReSharper disable once MemberHidesStaticFromOuterClass
public string Reason { get; }
public static readonly InvalidReleaseRequest Instance = new ();
private InvalidReleaseRequest()
{ }
}

public static Props Props(IAzureApi client, LeaseSettings leaseSettings, string leaseName, AtomicBoolean granted)
Expand Down Expand Up @@ -481,7 +477,7 @@ public LeaseActor(IAzureApi client, LeaseSettings settings, string leaseName, At
_ownerName,
leaseName,
StateName);
Sender.Tell(new InvalidRequest("Tried to release a lease that is not acquired"));
Sender.Tell(InvalidReleaseRequest.Instance);
return Stay().Using(@event.StateData);
case Status.Failure f when @event.StateData is IReplyRequired replyRequired:
Expand Down

0 comments on commit dcf6d81

Please sign in to comment.