Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Lease release/acquire operation logic #1289

Merged
merged 4 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,26 @@ public AzureLeaseSpec(ITestOutputHelper helper): base(Config(), nameof(AzureLeas
public void NonAcquiredReleaseTest()
{
var probe = CreateTestProbe();
var _ = _lease.Release().ContinueWith(r =>
var _ = _lease.Release().ContinueWith(task =>
{
r.IsFaulted.Should().BeTrue();
r.Exception.Should().NotBeNull();
var exception = r.Exception!.InnerException;
exception.Should().NotBeNull();
exception.Should().BeOfType<LeaseException>();
exception!.Message.Should().Be("Tried to release a lease that is not acquired");
probe.Tell(Done.Instance);
probe.Tell(task);
});

probe.ExpectMsg<Done>();
var task = probe.ExpectMsg<Task<bool>>();
task.IsFaulted.Should().BeFalse();
task.Exception.Should().BeNull();
task.Result.Should().BeTrue();
}

[Fact(DisplayName = "Acquire should be idempotent and returns the same task while acquire is in progress")]
public async Task MultipleAcquireTest()
{
    // Issue three acquisitions back-to-back, before any of them is awaited.
    var first = _lease.Acquire();
    var second = _lease.Acquire();
    var third = _lease.Acquire();

    // All three calls are expected to hand back the very same task object.
    first.Should().Be(second).And.Be(third);

    // The shared task is then expected to complete with a successful acquisition.
    var acquired = await first;
    acquired.Should().BeTrue();
}

public async Task InitializeAsync()
Expand Down
85 changes: 65 additions & 20 deletions src/coordination/azure/Akka.Coordination.Azure/AzureLease.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
// -----------------------------------------------------------------------

using System;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
Expand Down Expand Up @@ -45,6 +48,8 @@ private static string MakeDns1039Compatible(string name)
private readonly TimeSpan _timeout;
private readonly string _leaseName;
private readonly IActorRef _leaseActor;
private readonly object _acquireLock = new ();
private Task<bool>? _acquireTask;

public AzureLease(LeaseSettings settings, ExtendedActorSystem system) :
this(system, new AtomicBoolean(), settings)
Expand Down Expand Up @@ -92,12 +97,18 @@ public override async Task<bool> Release()
if(_log.IsDebugEnabled)
_log.Debug("Releasing lease");
var result = await _leaseActor.Ask(LeaseActor.Release.Instance, _timeout);
return result switch
switch (result)
{
LeaseActor.LeaseReleased _ => true,
LeaseActor.InvalidRequest req => throw new LeaseException(req.Reason),
_ => throw new LeaseException($"Unexpected response type: {result.GetType()}")
};
case LeaseActor.LeaseReleased:
return true;
case LeaseActor.InvalidReleaseRequest:
_log.Info("Tried to release a lease that is not acquired");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd mark this as DEBUG

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to be INFO because, if it is not, we cannot tell whether a lease was "released" because it was actually released or because we didn't have it in the first place.

return true;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logically, since we didn't have the lease, we should return true.
This will cause the plugins to report that the lease has been successfully released, which can be confusing.
This is why the log should be emitted at info level, so that we can track that the lease was "successfully released" because we didn't have it in the first place.

case Status.Failure f:
throw new LeaseException($"Failure while releasing lease: {f.Cause.Message}", f.Cause);
default:
throw new LeaseException($"Unexpected response type: {result.GetType()}");
}
}
catch (AskTimeoutException)
{
Expand All @@ -109,25 +120,59 @@ public override async Task<bool> Release()
/// <summary>
/// Acquires the lease without a lease-lost callback by forwarding to
/// the callback-accepting overload with <c>null</c>.
/// </summary>
/// <returns>The task returned by the callback-accepting overload.</returns>
public override Task<bool> Acquire()
{
    return Acquire(null);
}

public override async Task<bool> Acquire(Action<Exception?>? leaseLostCallback)
public override Task<bool> Acquire(Action<Exception?>? leaseLostCallback)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make this idempotent, we cannot use the async...await pattern

{
try
lock (_acquireLock)
{
if (_acquireTask is not null)
{
if(_log.IsDebugEnabled)
_log.Debug("Lease is already being acquired");
return _acquireTask;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}

if(_log.IsDebugEnabled)
_log.Debug("Acquiring lease");
var result = await _leaseActor.Ask(new LeaseActor.Acquire(leaseLostCallback), _timeout);
return result switch
{
LeaseActor.LeaseAcquired _ => true,
LeaseActor.LeaseTaken _ => false,
LeaseActor.InvalidRequest req => throw new LeaseException(req.Reason),
_ => throw new LeaseException($"Unexpected response type: {result.GetType()}")
};
}
catch (AskTimeoutException)
{
throw new LeaseTimeoutException(
$"Timed out trying to acquire lease [{_leaseName}, {_settings.OwnerName}]. It may still be taken.");
_acquireTask = _leaseActor.Ask(new LeaseActor.Acquire(leaseLostCallback), _timeout)
.ContinueWith(t =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

{
if (t.IsFaulted)
{
if (t.Exception is { })
{
var flattened = t.Exception.Flatten();
if (flattened.InnerExceptions.Count > 0
&& flattened.InnerExceptions.Any(e => e is AskTimeoutException))
{
throw new LeaseTimeoutException(
$"Timed out trying to acquire lease [{_leaseName}, {_settings.OwnerName}]. It may still be taken.",
t.Exception);
}
}

throw new LeaseException(
$"Faulted trying to acquire lease [{_leaseName}, {_settings.OwnerName}]. It may still be taken.",
t.Exception);
}

// For completeness, we're not using cancellation token
if (t.IsCanceled)
{
throw new LeaseException(
$"Canceled while trying to acquire lease [{_leaseName}, {_settings.OwnerName}]. It may still be taken.",
t.Exception);
}

return t.Result switch
{
LeaseActor.LeaseAcquired => true,
LeaseActor.LeaseTaken => false,
Status.Failure f => throw new LeaseException($"Failure while acquiring lease: {f.Cause.Message}", f.Cause),
_ => throw new LeaseException($"Unexpected response type: {t.Result.GetType()}")
};
});

return _acquireTask;
}
}

Expand Down
14 changes: 5 additions & 9 deletions src/coordination/azure/Akka.Coordination.Azure/LeaseActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,11 @@ public sealed class LeaseReleased: IResponse, IDeadLetterSuppression
private LeaseReleased() {}
}

public sealed class InvalidRequest: IResponse, IDeadLetterSuppression
public sealed class InvalidReleaseRequest: IResponse, IDeadLetterSuppression
{
public InvalidRequest(string reason)
{
Reason = reason;
}

// ReSharper disable once MemberHidesStaticFromOuterClass
public string Reason { get; }
public static readonly InvalidReleaseRequest Instance = new ();
private InvalidReleaseRequest()
{ }
}

public static Props Props(IAzureApi client, LeaseSettings leaseSettings, string leaseName, AtomicBoolean granted)
Expand Down Expand Up @@ -481,7 +477,7 @@ public LeaseActor(IAzureApi client, LeaseSettings settings, string leaseName, At
_ownerName,
leaseName,
StateName);
Sender.Tell(new InvalidRequest("Tried to release a lease that is not acquired"));
Sender.Tell(InvalidReleaseRequest.Instance);
return Stay().Using(@event.StateData);

case Status.Failure f when @event.StateData is IReplyRequired replyRequired:
Expand Down