Skip to content

Commit

Permalink
Cluster singleton should consider Member AppVersion during hand over. (
Browse files Browse the repository at this point in the history
…#6065)

* Cluster singleton should consider Member AppVersion during hand over.

* Add `akka.cluster.singleton.consider-app-version` HOCON setting as opt-in flag

* Update API Verify list

* Add MemberAgeOrdering spec

* Change insertion order

* Update API Verify list

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb committed Aug 18, 2022
1 parent d55f67f commit 9acaf00
Show file tree
Hide file tree
Showing 18 changed files with 406 additions and 47 deletions.
@@ -0,0 +1,135 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterSingletonRestart2Spec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tools.Tests.Singleton
{
public class ClusterSingletonRestart3Spec : AkkaSpec
{
private readonly ActorSystem _sys1;
private readonly ActorSystem _sys2;
private readonly ActorSystem _sys3;

public ClusterSingletonRestart3Spec(ITestOutputHelper output) : base(@"
akka.loglevel = DEBUG
akka.actor.provider = ""cluster""
akka.cluster.app-version = ""1.0.0""
akka.cluster.auto-down-unreachable-after = 2s
akka.cluster.singleton.min-number-of-hand-over-retries = 5
akka.cluster.singleton.consider-app-version = true
akka.remote {
dot-netty.tcp {
hostname = ""127.0.0.1""
port = 0
}
}", output)
{
_sys1 = Sys;
_sys2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
InitializeLogger(_sys2);
_sys3 = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString("akka.cluster.app-version = \"1.0.2\"")
.WithFallback(Sys.Settings.Config));
InitializeLogger(_sys3);
}

public void Join(ActorSystem from, ActorSystem to)
{
from.ActorOf(ClusterSingletonManager.Props(Props.Create(() => new Singleton()),
PoisonPill.Instance,
ClusterSingletonManagerSettings.Create(from)), "echo");


Within(TimeSpan.FromSeconds(45), () =>
{
AwaitAssert(() =>
{
Cluster.Get(from).Join(Cluster.Get(to).SelfAddress);
Cluster.Get(from).State.Members.Select(x => x.UniqueAddress).Should().Contain(Cluster.Get(from).SelfUniqueAddress);
Cluster.Get(from)
.State.Members.Select(x => x.Status)
.ToImmutableHashSet()
.Should()
.Equal(ImmutableHashSet<MemberStatus>.Empty.Add(MemberStatus.Up));
});
});
}

[Fact]
public void Singleton_should_consider_AppVersion_when_handing_over()
{
Join(_sys1, _sys1);
Join(_sys2, _sys1);

var proxy2 = _sys2.ActorOf(
ClusterSingletonProxy.Props("user/echo", ClusterSingletonProxySettings.Create(_sys2)), "proxy2");

Within(TimeSpan.FromSeconds(5), () =>
{
AwaitAssert(() =>
{
var probe = CreateTestProbe(_sys2);
proxy2.Tell("poke", probe.Ref);
var singleton = probe.ExpectMsg<Member>(TimeSpan.FromSeconds(1));
singleton.Should().Be(Cluster.Get(_sys1).SelfMember);
singleton.AppVersion.Version.Should().Be("1.0.0");
});
});

// A new node with higher AppVersion joins the cluster
Join(_sys3, _sys1);

// Old node with the singleton instance left the cluster
Cluster.Get(_sys1).Leave(Cluster.Get(_sys1).SelfAddress);

// let it stabilize
Task.Delay(TimeSpan.FromSeconds(5)).Wait();

Within(TimeSpan.FromSeconds(10), () =>
{
AwaitAssert(() =>
{
var probe = CreateTestProbe(_sys2);
proxy2.Tell("poke", probe.Ref);
// note that _sys3 has a higher app-version, so the singleton should start there
var singleton = probe.ExpectMsg<Member>(TimeSpan.FromSeconds(1));
singleton.Should().Be(Cluster.Get(_sys3).SelfMember);
singleton.AppVersion.Version.Should().Be("1.0.2");
});
});
}

protected override async Task AfterAllAsync()
{
await base.AfterAllAsync();
await ShutdownAsync(_sys2);
await ShutdownAsync(_sys3);
}

public class Singleton : ReceiveActor
{
public Singleton()
{
ReceiveAny(o =>
{
Sender.Tell(Cluster.Get(Context.System).SelfMember);
});
}
}
}
}
@@ -0,0 +1,89 @@
// -----------------------------------------------------------------------
// <copyright file="MemberAgeOrderingSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Util;
using FluentAssertions;
using Xunit;

namespace Akka.Cluster.Tools.Tests.Singleton
{
public class MemberAgeOrderingSpec
{
[Fact(DisplayName = "MemberAgeOrdering should sort based on UpNumber")]
public void SortByUpNumberTest()
{
var members = new SortedSet<Member>(MemberAgeOrdering.DescendingWithAppVersion)
{
Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3),
Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1),
Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9),
};

var seq = members.ToList();
seq.Count.Should().Be(3);
seq[0].Should().Be(Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1));
seq[1].Should().Be(Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3));
seq[2].Should().Be(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9));
}

[Fact(DisplayName = "MemberAgeOrdering should sort based on Address if UpNumber is the same")]
public void SortByAddressTest()
{
var members = new SortedSet<Member>(MemberAgeOrdering.DescendingWithAppVersion)
{
Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 1),
Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1),
Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 1),
};

var seq = members.ToList();
seq.Count.Should().Be(3);
seq[0].Should().Be(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 1));
seq[1].Should().Be(Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 1));
seq[2].Should().Be(Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1));
}

[Fact(DisplayName = "MemberAgeOrdering should prefer AppVersion over UpNumber")]
public void SortByAppVersionTest()
{
var members = new SortedSet<Member>(MemberAgeOrdering.DescendingWithAppVersion)
{
Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3, appVersion: AppVersion.Create("1.0.0")),
Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1, appVersion: AppVersion.Create("1.0.0")),
Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 2, appVersion: AppVersion.Create("1.0.0")),
Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 7, appVersion: AppVersion.Create("1.0.2")),
Create(Address.Parse("akka://sys@darkstar:1115"), upNumber: 8, appVersion: AppVersion.Create("1.0.2")),
Create(Address.Parse("akka://sys@darkstar:1116"), upNumber: 6, appVersion: AppVersion.Create("1.0.2")),
};

var seq = members.ToList();
seq.Count.Should().Be(6);
seq[0].Should().Be(Create(Address.Parse("akka://sys@darkstar:1116"), upNumber: 6, appVersion: AppVersion.Create("1.0.2")));
seq[1].Should().Be(Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 7, appVersion: AppVersion.Create("1.0.2")));
seq[2].Should().Be(Create(Address.Parse("akka://sys@darkstar:1115"), upNumber: 8, appVersion: AppVersion.Create("1.0.2")));
seq[3].Should().Be(Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1, appVersion: AppVersion.Create("1.0.0")));
seq[4].Should().Be(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 2, appVersion: AppVersion.Create("1.0.0")));
seq[5].Should().Be(Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3, appVersion: AppVersion.Create("1.0.0")));
}

public static Member Create(
Address address,
MemberStatus status = MemberStatus.Up,
ImmutableHashSet<string> roles = null,
int uid = 0,
int upNumber = 0,
AppVersion appVersion = null)
{
return Member.Create(new UniqueAddress(address, uid), upNumber, status, roles ?? ImmutableHashSet<string>.Empty, appVersion ?? AppVersion.Zero);
}
}
}
Expand Up @@ -880,7 +880,9 @@ private void InitializeFSM()
{
case StartOldestChangedBuffer _:
{
_oldestChangedBuffer = Context.ActorOf(Actor.Props.Create<OldestChangedBuffer>(_settings.Role).WithDispatcher(Context.Props.Dispatcher));
_oldestChangedBuffer = Context.ActorOf(
Actor.Props.Create(() => new OldestChangedBuffer(_settings.Role, _settings.ConsiderAppVersion))
.WithDispatcher(Context.Props.Dispatcher));
GetNextOldestChanged();
return Stay();
}
Expand Down
Expand Up @@ -57,7 +57,8 @@ public static ClusterSingletonManagerSettings Create(Config config)
role: RoleOption(config.GetString("role")),
removalMargin: TimeSpan.Zero, // defaults to ClusterSettings.DownRemovalMargin
handOverRetryInterval: config.GetTimeSpan("hand-over-retry-interval"),
leaseSettings: lease);
leaseSettings: lease,
considerAppVersion: config.GetBoolean("consider-app-version"));
}

private static string RoleOption(string role)
Expand Down Expand Up @@ -91,6 +92,14 @@ private static string RoleOption(string role)
/// LeaseSettings for acquiring before creating the singleton actor
/// </summary>
public LeaseUsageSettings LeaseSettings { get; }


/// <summary>
/// Should <see cref="Member.AppVersion"/> be considered when the cluster singleton instance is being moved to another node.
/// When set to false, singleton instance will always be created on oldest member.
/// When set to true, singleton instance will be created on the oldest member with the highest <see cref="Member.AppVersion"/> number.
/// </summary>
public bool ConsiderAppVersion { get; }

/// <summary>
/// Creates a new instance of the <see cref="ClusterSingletonManagerSettings"/>.
Expand All @@ -114,9 +123,19 @@ private static string RoleOption(string role)
/// over has started or the previous oldest member is removed from the cluster
/// (+ <paramref name="removalMargin"/>).
/// </param>
/// <param name="considerAppVersion">
/// Should <see cref="Member.AppVersion"/> be considered when the cluster singleton instance is being moved to another node.
/// When set to false, singleton instance will always be created on oldest member.
/// When set to true, singleton instance will be created on the oldest member with the highest <see cref="Member.AppVersion"/> number.
/// </param>
/// <exception cref="ArgumentException">TBD</exception>
public ClusterSingletonManagerSettings(string singletonName, string role, TimeSpan removalMargin, TimeSpan handOverRetryInterval)
: this(singletonName, role, removalMargin, handOverRetryInterval, null)
public ClusterSingletonManagerSettings(
string singletonName,
string role,
TimeSpan removalMargin,
TimeSpan handOverRetryInterval,
bool considerAppVersion)
: this(singletonName, role, removalMargin, handOverRetryInterval, null, considerAppVersion)
{
}

Expand All @@ -143,8 +162,19 @@ public ClusterSingletonManagerSettings(string singletonName, string role, TimeSp
/// (+ <paramref name="removalMargin"/>).
/// </param>
/// <param name="leaseSettings">LeaseSettings for acquiring before creating the singleton actor</param>
/// <param name="considerAppVersion">
/// Should <see cref="Member.AppVersion"/> be considered when the cluster singleton instance is being moved to another node.
/// When set to false, singleton instance will always be created on oldest member.
/// When set to true, singleton instance will be created on the oldest member with the highest <see cref="Member.AppVersion"/> number.
/// </param>
/// <exception cref="ArgumentException">TBD</exception>
public ClusterSingletonManagerSettings(string singletonName, string role, TimeSpan removalMargin, TimeSpan handOverRetryInterval, LeaseUsageSettings leaseSettings)
public ClusterSingletonManagerSettings(
string singletonName,
string role,
TimeSpan removalMargin,
TimeSpan handOverRetryInterval,
LeaseUsageSettings leaseSettings,
bool considerAppVersion)
{
if (string.IsNullOrWhiteSpace(singletonName))
throw new ArgumentNullException(nameof(singletonName));
Expand All @@ -158,6 +188,7 @@ public ClusterSingletonManagerSettings(string singletonName, string role, TimeSp
RemovalMargin = removalMargin;
HandOverRetryInterval = handOverRetryInterval;
LeaseSettings = leaseSettings;
ConsiderAppVersion = considerAppVersion;
}

/// <summary>
Expand Down Expand Up @@ -210,15 +241,21 @@ public ClusterSingletonManagerSettings WithLeaseSettings(LeaseUsageSettings leas
return Copy(leaseSettings: leaseSettings);
}

private ClusterSingletonManagerSettings Copy(string singletonName = null, Option<string> role = default, TimeSpan? removalMargin = null,
TimeSpan? handOverRetryInterval = null, Option<LeaseUsageSettings> leaseSettings = default)
private ClusterSingletonManagerSettings Copy(
string singletonName = null,
Option<string> role = default,
TimeSpan? removalMargin = null,
TimeSpan? handOverRetryInterval = null,
Option<LeaseUsageSettings> leaseSettings = default,
bool? considerAppVersion = null)
{
return new ClusterSingletonManagerSettings(
singletonName: singletonName ?? SingletonName,
role: role.HasValue ? role.Value : Role,
removalMargin: removalMargin ?? RemovalMargin,
handOverRetryInterval: handOverRetryInterval ?? HandOverRetryInterval,
leaseSettings: leaseSettings.HasValue ? leaseSettings.Value : LeaseSettings
leaseSettings: leaseSettings.HasValue ? leaseSettings.Value : LeaseSettings,
considerAppVersion: considerAppVersion ?? ConsiderAppVersion
);
}
}
Expand Down
Expand Up @@ -76,6 +76,7 @@ public static Props Props(string singletonManagerPath, ClusterSingletonProxySett
.WithDeploy(Deploy.Local);
}

private readonly MemberAgeOrdering _memberAgeComparer;
private readonly ClusterSingletonProxySettings _settings;
private readonly Cluster _cluster = Cluster.Get(Context.System);
private readonly Queue<KeyValuePair<object, IActorRef>> _buffer = new Queue<KeyValuePair<object, IActorRef>>(); // queue seems to fit better
Expand All @@ -84,7 +85,7 @@ public static Props Props(string singletonManagerPath, ClusterSingletonProxySett
private string _identityId;
private IActorRef _singleton = null;
private ICancelable _identityTimer = null;
private ImmutableSortedSet<Member> _membersByAge = ImmutableSortedSet<Member>.Empty.WithComparer(MemberAgeOrdering.Descending);
private ImmutableSortedSet<Member> _membersByAge;
private ILoggingAdapter _log;

/// <summary>
Expand All @@ -98,6 +99,11 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS
_singletonPath = (singletonManagerPath + "/" + settings.SingletonName).Split('/');
_identityId = CreateIdentifyId(_identityCounter);

_memberAgeComparer = settings.ConsiderAppVersion
? MemberAgeOrdering.DescendingWithAppVersion
: MemberAgeOrdering.Descending;
_membersByAge = ImmutableSortedSet<Member>.Empty.WithComparer(_memberAgeComparer);

Receive<ClusterEvent.CurrentClusterState>(s => HandleInitial(s));
Receive<ClusterEvent.MemberUp>(m => Add(m.Member));
Receive<ClusterEvent.MemberExited>(m => Remove(m.Member));
Expand Down Expand Up @@ -197,7 +203,7 @@ private void HandleInitial(ClusterEvent.CurrentClusterState state)
TrackChanges(() =>
_membersByAge = state.Members
.Where(m => m.Status == MemberStatus.Up && MatchingRole(m))
.ToImmutableSortedSet(MemberAgeOrdering.Descending));
.ToImmutableSortedSet(_memberAgeComparer));
}

// Discard old singleton ActorRef and send a periodic message to self to identify the singleton.
Expand Down

0 comments on commit 9acaf00

Please sign in to comment.