Skip to content

Commit

Permalink
[Cluster] Enable HeartbeatResponse message type (#6063)
Browse files Browse the repository at this point in the history
* enable HeartbeatResponse message type

* Update API Verifier list

* Change default setting to opt-out, not opt-in

* Fix SerializerWithStringManifest bug

* Update API Verify list

* Fix F# serializer spec

* Revert previous manifest fix

* Revert F# serializer spec

* Fix ClusterMessageSerializerSpec

* Fix API Verify list

* Fix `Manifest()` method.

* Harden test to make sure that the new serializer works properly and legacy code still have the old behaviour

* Add documentation to the new legacy flag setting

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb committed Aug 16, 2022
1 parent 899c3a2 commit d55f67f
Show file tree
Hide file tree
Showing 13 changed files with 195 additions and 63 deletions.
Expand Up @@ -221,6 +221,7 @@ namespace Akka.Cluster
public System.Nullable<System.TimeSpan> ShutdownAfterUnsuccessfulJoinSeedNodes { get; }
public System.TimeSpan UnreachableNodesReaperInterval { get; }
public string UseDispatcher { get; }
public bool UseLegacyHeartbeatMessage { get; }
public bool VerboseGossipReceivedLogging { get; }
public bool VerboseHeartbeatLogging { get; }
public System.TimeSpan WeaklyUpAfter { get; }
Expand Down
Expand Up @@ -221,6 +221,7 @@ namespace Akka.Cluster
public System.Nullable<System.TimeSpan> ShutdownAfterUnsuccessfulJoinSeedNodes { get; }
public System.TimeSpan UnreachableNodesReaperInterval { get; }
public string UseDispatcher { get; }
public bool UseLegacyHeartbeatMessage { get; }
public bool VerboseGossipReceivedLogging { get; }
public bool VerboseHeartbeatLogging { get; }
public System.TimeSpan WeaklyUpAfter { get; }
Expand Down
Expand Up @@ -221,6 +221,7 @@ namespace Akka.Cluster
public System.Nullable<System.TimeSpan> ShutdownAfterUnsuccessfulJoinSeedNodes { get; }
public System.TimeSpan UnreachableNodesReaperInterval { get; }
public string UseDispatcher { get; }
public bool UseLegacyHeartbeatMessage { get; }
public bool VerboseGossipReceivedLogging { get; }
public bool VerboseHeartbeatLogging { get; }
public System.TimeSpan WeaklyUpAfter { get; }
Expand Down
20 changes: 18 additions & 2 deletions src/core/Akka.Cluster.Tests/ClusterHeartBeatSenderStateSpec.cs
Expand Up @@ -17,12 +17,28 @@
using Akka.Util.Internal;
using Xunit;
using FluentAssertions;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
public class ClusterHeartBeatSenderStateSpec : ClusterSpecBase
public class ClusterHeartBeatSenderStateSpec : ClusterHeartBeatSenderStateBase
{
public ClusterHeartBeatSenderStateSpec()
public ClusterHeartBeatSenderStateSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class ClusterHeartBeatSenderStateLegacySpec : ClusterHeartBeatSenderStateBase
{
public ClusterHeartBeatSenderStateLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class ClusterHeartBeatSenderStateBase : ClusterSpecBase
{
protected ClusterHeartBeatSenderStateBase(ITestOutputHelper output, bool useLegacyMessage)
: base(output, useLegacyMessage)
{
_emptyState = EmptyState(aa);
}
Expand Down
25 changes: 21 additions & 4 deletions src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs
Expand Up @@ -15,12 +15,29 @@

namespace Akka.Cluster.Tests
{
public class ClusterHeartbeatReceiverSpec : AkkaSpec
public class ClusterHeartbeatReceiverSpec : ClusterHeartbeatReceiverBase
{
public static Config Config = @"akka.actor.provider = cluster";
public ClusterHeartbeatReceiverSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class ClusterHeartbeatReceiverLegacySpec : ClusterHeartbeatReceiverBase
{
public ClusterHeartbeatReceiverLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class ClusterHeartbeatReceiverBase : AkkaSpec
{
private static Config Config(bool useLegacyHeartbeat) => $@"
akka.actor.provider = cluster
akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")}
";

public ClusterHeartbeatReceiverSpec(ITestOutputHelper output)
: base(Config, output)
protected ClusterHeartbeatReceiverBase(ITestOutputHelper output, bool useLegacyHeartbeat)
: base(Config(useLegacyHeartbeat), output)
{

}
Expand Down
23 changes: 19 additions & 4 deletions src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs
Expand Up @@ -18,7 +18,21 @@

namespace Akka.Cluster.Tests
{
public class ClusterHeartbeatSenderSpec : AkkaSpec
public class ClusterHeartbeatSenderSpec : ClusterHeartbeatSenderBase
{
public ClusterHeartbeatSenderSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class ClusterHeartbeatSenderLegacySpec : ClusterHeartbeatSenderBase
{
public ClusterHeartbeatSenderLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class ClusterHeartbeatSenderBase : AkkaSpec
{
class TestClusterHeartbeatSender : ClusterHeartbeatSender
{
Expand All @@ -40,14 +54,15 @@ protected override ActorSelection HeartbeatReceiver(Address address)
}
}

public static readonly Config Config = @"
private static Config Config(bool useLegacyHeartbeat) => $@"
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.cluster.failure-detector.heartbeat-interval = 0.2s
akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")}
";

public ClusterHeartbeatSenderSpec(ITestOutputHelper output)
: base(Config, output){ }
protected ClusterHeartbeatSenderBase(ITestOutputHelper output, bool useLegacyMessage)
: base(Config(useLegacyMessage), output){ }

[Fact]
public async Task ClusterHeartBeatSender_must_increment_heartbeat_SeqNo()
Expand Down
16 changes: 10 additions & 6 deletions src/core/Akka.Cluster.Tests/ClusterSpecBase.cs
Expand Up @@ -7,6 +7,7 @@

using Akka.Configuration;
using Akka.TestKit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
Expand All @@ -15,20 +16,23 @@ namespace Akka.Cluster.Tests
/// </summary>
public abstract class ClusterSpecBase : AkkaSpec
{
protected ClusterSpecBase(Config config) : base(config.WithFallback(BaseConfig))
protected ClusterSpecBase(Config config, ITestOutputHelper output, bool useLegacyHeartbeat)
: base(config.WithFallback(BaseConfig(useLegacyHeartbeat)), output)
{

}

protected ClusterSpecBase()
: base(BaseConfig)
protected ClusterSpecBase(ITestOutputHelper output, bool useLegacyHeartbeat)
: base(BaseConfig(useLegacyHeartbeat), output)
{

}

protected static readonly Config BaseConfig = ConfigurationFactory.ParseString(@"
akka.actor.serialize-messages = on
akka.actor.serialize-creators = on");
private static Config BaseConfig(bool useLegacyHeartbeat) =>
ConfigurationFactory.ParseString($@"
akka.actor.serialize-messages = on
akka.actor.serialize-creators = on
akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")}");
}
}

19 changes: 17 additions & 2 deletions src/core/Akka.Cluster.Tests/HeartbeatNodeRingSpec.cs
Expand Up @@ -9,12 +9,27 @@
using Akka.Actor;
using Xunit;
using FluentAssertions;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
public class HeartbeatNodeRingSpec : ClusterSpecBase
public class HeartbeatNodeRingSpec : HeartbeatNodeRingBase
{
public HeartbeatNodeRingSpec()
public HeartbeatNodeRingSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class HeartbeatNodeRingLegacySpec : HeartbeatNodeRingBase
{
public HeartbeatNodeRingLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class HeartbeatNodeRingBase : ClusterSpecBase
{
protected HeartbeatNodeRingBase(ITestOutputHelper output, bool useLegacyMessage) : base(output, useLegacyMessage)
{
_nodes = ImmutableHashSet.Create(aa, bb, cc, dd, ee, ff);
}
Expand Down
Expand Up @@ -15,16 +15,36 @@
using Xunit;
using FluentAssertions;
using Akka.Util;
using Akka.Util.Internal;
using Google.Protobuf;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests.Serialization
{
public class ClusterMessageSerializerSpec : AkkaSpec
public class ClusterMessageSerializerSpec: ClusterMessageSerializerBase
{
public ClusterMessageSerializerSpec()
: base(@"akka.actor.provider = cluster")
public ClusterMessageSerializerSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class ClusterMessageSerializerLegacySpec: ClusterMessageSerializerBase
{
public ClusterMessageSerializerLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class ClusterMessageSerializerBase : AkkaSpec
{
private readonly bool _useLegacyHeartbeat;
public ClusterMessageSerializerBase(ITestOutputHelper output, bool useLegacyHeartbeat)
: base($@"
akka.actor.provider = cluster
akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")}", output)
{
_useLegacyHeartbeat = useLegacyHeartbeat;
}

private static readonly Member a1 = TestMember.Create(new Address("akka.tcp", "sys", "a", 2552), MemberStatus.Joining, appVersion: AppVersion.Create("1.0.0"));
private static readonly Member b1 = TestMember.Create(new Address("akka.tcp", "sys", "b", 2552), MemberStatus.Up, ImmutableHashSet.Create("r1"), appVersion: AppVersion.Create("1.1.0"));
Expand All @@ -36,45 +56,23 @@ public ClusterMessageSerializerSpec()
public void Can_serialize_Heartbeat()
{
var address = new Address("akka.tcp", "system", "some.host.org", 4711);
var message = new ClusterHeartbeatSender.Heartbeat(address, -1, -1);
AssertEqual(message);
}

[Fact]
public void Can_serialize_Hearbeatv1419_later()
{
var hb = new Akka.Cluster.Serialization.Proto.Msg.Heartbeat()
{
From = Akka.Cluster.Serialization.ClusterMessageSerializer.AddressToProto(a1.Address),
CreationTime = 2,
SequenceNr = 1
}.ToByteArray();

var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat));
serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatManifest);
var legacyMessage = new ClusterHeartbeatSender.Heartbeat(address, -1, -1);
var message = new ClusterHeartbeatSender.Heartbeat(address, 10, 3);

// Legacy heartbeat serializer will replace the sequence number and creation date with -1 and -1 respectively
AssertEqual(message, _useLegacyHeartbeat ? legacyMessage : message);
}

[Fact]
public void Can_serialize_HeartbeatRsp()
{
var address = new Address("akka.tcp", "system", "some.host.org", 4711);
var uniqueAddress = new UniqueAddress(address, 17);
var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1);
AssertEqual(message);
}

[Fact]
public void Can_serialize_HearbeatRspv1419_later()
{
var hb = new Akka.Cluster.Serialization.Proto.Msg.HeartBeatResponse()
{
From = Akka.Cluster.Serialization.ClusterMessageSerializer.UniqueAddressToProto(a1.UniqueAddress),
CreationTime = 2,
SequenceNr = 1
}.ToByteArray();

var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat));
serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatRspManifest);
var legacyMessage = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1);
var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, 10, 3);

// Legacy heartbeat serializer will replace the sequence number and creation date with -1 and -1 respectively
AssertEqual(message, _useLegacyHeartbeat ? legacyMessage : message);
}

[Fact]
Expand Down Expand Up @@ -220,16 +218,18 @@ public void Can_serialize_ClusterRouterPoolWithEmptyRole()

private T AssertAndReturn<T>(T message)
{
var serializer = Sys.Serialization.FindSerializerFor(message);
var serialized = serializer.ToBinary(message);
var serializer = (SerializerWithStringManifest) Sys.Serialization.FindSerializerFor(message);
serializer.Should().BeOfType<ClusterMessageSerializer>();
return serializer.FromBinary<T>(serialized);

var serialized = serializer.ToBinary(message);
var manifest = serializer.Manifest(message);
return (T) serializer.FromBinary(serialized, manifest);
}

private void AssertEqual<T>(T message)
private void AssertEqual<T>(T message, T newMessage = null) where T : class
{
var deserialized = AssertAndReturn(message);
Assert.Equal(message, deserialized);
Assert.Equal(newMessage ?? message, deserialized);
}
}
}
22 changes: 21 additions & 1 deletion src/core/Akka.Cluster.Tests/SerializationChecksSpec.cs
Expand Up @@ -5,13 +5,33 @@
// </copyright>
//-----------------------------------------------------------------------

using Akka.Configuration;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
public class SerializationChecksSpec : ClusterSpecBase
public class SerializationChecksSpec : SerializationChecksBase
{
public SerializationChecksSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class SerializationChecksLegacySpec : SerializationChecksBase
{
public SerializationChecksLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class SerializationChecksBase : ClusterSpecBase
{
protected SerializationChecksBase(ITestOutputHelper output, bool useLegacyHeartbeat) : base(output, useLegacyHeartbeat)
{
}

[Fact]
public void Settings_serializemessages_and_serializecreators_must_be_on_for_tests()
{
Expand Down
8 changes: 8 additions & 0 deletions src/core/Akka.Cluster/ClusterSettings.cs
Expand Up @@ -118,6 +118,7 @@ TimeSpan GetWeaklyUpDuration()

WeaklyUpAfter = GetWeaklyUpDuration();

UseLegacyHeartbeatMessage = clusterConfig.GetBoolean("use-legacy-heartbeat-message", false);
}

/// <summary>
Expand Down Expand Up @@ -300,6 +301,13 @@ TimeSpan GetWeaklyUpDuration()
/// The leader will move <see cref="MemberStatus.WeaklyUp"/> members to <see cref="MemberStatus.Up"/> status once convergence has been reached.
/// </summary>
public TimeSpan WeaklyUpAfter { get; }

/// <summary>
/// Enable/disable legacy pre-1.4.19 <see cref="ClusterHeartbeatSender.Heartbeat"/> and
/// <see cref="ClusterHeartbeatSender.HeartbeatRsp"/> wire format serialization support.
/// Set this to true if you're doing a rolling update from Akka.NET version older than 1.4.19.
/// </summary>
public bool UseLegacyHeartbeatMessage { get; }
}
}

4 changes: 4 additions & 0 deletions src/core/Akka.Cluster/Configuration/Cluster.conf
Expand Up @@ -169,6 +169,10 @@ akka {
# greater than this value.
reduce-gossip-different-view-probability = 400

# Enable/disable legacy pre-1.4.19 heartbeat and heartbeat response wire format serialization support
# Set this flag to true if you're doing a rolling update from Akka.NET version older than 1.4.19.
use-legacy-heartbeat-message = false

# Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf
# [Hayashibara et al]) used by the cluster subsystem to detect unreachable
# members.
Expand Down

0 comments on commit d55f67f

Please sign in to comment.