Skip to content

Commit

Permalink
Fix serialize-messages for Akka.Cluster and Akka.Remote (#3725)
Browse files Browse the repository at this point in the history
close #3724 - provided JSON constructor for Member

disabled serialization verification for internal `PublishChanges`
  • Loading branch information
Aaronontheweb committed Jun 4, 2019
1 parent a3963cd commit 00b73df
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 43 deletions.
53 changes: 53 additions & 0 deletions src/core/Akka.Cluster.Tests/Serialization/BugFix3724Spec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//-----------------------------------------------------------------------
// <copyright file="BugFix3724Spec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2019 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2019 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using Akka.Actor;
using Akka.TestKit;
using Akka.Util.Internal;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests.Serialization
{
/// <summary>
/// https://github.com/akkadotnet/akka.net/issues/3724
/// Used to validate that `akka.actor.serialize-messages = on` works while
/// using Akka.Cluster
/// </summary>
public class BugFix3724Spec : AkkaSpec
{
public BugFix3724Spec(ITestOutputHelper helper)
: base(@"akka.actor.provider = cluster
akka.actor.serialize-messages = on", helper)
{
_cluster = Cluster.Get(Sys);
_selfAddress = Sys.AsInstanceOf<ExtendedActorSystem>().Provider.DefaultAddress;
}

private readonly Address _selfAddress;
private readonly Cluster _cluster;

[Fact(DisplayName = "Should be able to use 'akka.actor.serialize-messages' while running Akka.Cluster")]
public void Should_serialize_all_AkkaCluster_messages()
{
_cluster.Subscribe(TestActor, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents,
typeof(ClusterEvent.MemberUp));
Within(TimeSpan.FromSeconds(10), () =>
{
EventFilter.Exception<Exception>().Expect(0, () =>
{
// wait for a singleton cluster to fully form and publish a member up event
_cluster.Join(_selfAddress);
var up = ExpectMsg<ClusterEvent.MemberUp>();
up.Member.Address.Should().Be(_selfAddress);
});
});
}
}
}
91 changes: 48 additions & 43 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ internal sealed class InternalClusterAction
/// </summary>
internal sealed class Join : IClusterMessage
{
readonly UniqueAddress _node;
readonly ImmutableHashSet<string> _roles;
private readonly UniqueAddress _node;
private readonly ImmutableHashSet<string> _roles;

/// <summary>
/// TBD
Expand Down Expand Up @@ -192,8 +192,8 @@ public override string ToString()
/// </summary>
internal sealed class Welcome : IClusterMessage
{
readonly UniqueAddress _from;
readonly Gossip _gossip;
private readonly UniqueAddress _from;
private readonly Gossip _gossip;

/// <summary>
/// TBD
Expand Down Expand Up @@ -246,7 +246,7 @@ public override int GetHashCode()
/// </summary>
internal sealed class JoinSeedNodes : IDeadLetterSuppression
{
readonly ImmutableList<Address> _seedNodes;
private readonly ImmutableList<Address> _seedNodes;

/// <summary>
/// Creates a new instance of the command.
Expand Down Expand Up @@ -291,7 +291,7 @@ public override bool Equals(object obj)
/// <inheritdoc cref="JoinSeenNode"/>
internal sealed class InitJoinAck : IClusterMessage, IDeadLetterSuppression
{
readonly Address _address;
private readonly Address _address;

/// <summary>
/// TBD
Expand Down Expand Up @@ -334,7 +334,7 @@ public override int GetHashCode()
/// <inheritdoc cref="JoinSeenNode"/>
internal sealed class InitJoinNack : IClusterMessage, IDeadLetterSuppression
{
readonly Address _address;
private readonly Address _address;

/// <summary>
/// TBD
Expand Down Expand Up @@ -545,7 +545,7 @@ public static PublishStatsTick Instance
/// </summary>
internal sealed class SendGossipTo
{
readonly Address _address;
private readonly Address _address;

/// <summary>
/// TBD
Expand Down Expand Up @@ -649,9 +649,9 @@ public interface ISubscriptionMessage { }
/// </summary>
public sealed class Subscribe : ISubscriptionMessage
{
readonly IActorRef _subscriber;
readonly ClusterEvent.SubscriptionInitialStateMode _initialStateMode;
readonly ImmutableHashSet<Type> _to;
private readonly IActorRef _subscriber;
private readonly ClusterEvent.SubscriptionInitialStateMode _initialStateMode;
private readonly ImmutableHashSet<Type> _to;

/// <summary>
/// Creates a new subscription
Expand Down Expand Up @@ -697,8 +697,8 @@ public ImmutableHashSet<Type> To
/// </summary>
public sealed class Unsubscribe : ISubscriptionMessage, IDeadLetterSuppression
{
readonly IActorRef _subscriber;
readonly Type _to;
private readonly IActorRef _subscriber;
private readonly Type _to;

/// <summary>
/// TBD
Expand Down Expand Up @@ -733,7 +733,7 @@ public Type To
/// </summary>
public sealed class SendCurrentClusterState : ISubscriptionMessage
{
readonly IActorRef _receiver;
private readonly IActorRef _receiver;

/// <summary>
/// TBD
Expand All @@ -754,41 +754,46 @@ public SendCurrentClusterState(IActorRef receiver)
}

/// <summary>
/// TBD
/// INTERNAL API.
///
/// Marker interface for publication events from Akka.Cluster.
/// </summary>
interface IPublishMessage { }
/// <remarks>
/// <see cref="INoSerializationVerificationNeeded"/> is not explicitly used on the JVM,
/// but without it we run into serialization issues via https://github.com/akkadotnet/akka.net/issues/3724
/// </remarks>
private interface IPublishMessage : INoSerializationVerificationNeeded { }

/// <summary>
/// TBD
/// INTERNAL API.
///
/// Used to publish Gossip and Membership changes inside Akka.Cluster.
/// </summary>
internal sealed class PublishChanges : IPublishMessage
{
readonly Gossip _newGossip;

/// <summary>
/// TBD
/// Creates a new <see cref="PublishChanges"/> message with updated gossip.
/// </summary>
/// <param name="newGossip">TBD</param>
/// <param name="newGossip">The gossip to publish internally.</param>
internal PublishChanges(Gossip newGossip)
{
_newGossip = newGossip;
NewGossip = newGossip;
}

/// <summary>
/// TBD
/// The gossip being published.
/// </summary>
public Gossip NewGossip
{
get { return _newGossip; }
}
public Gossip NewGossip { get; }
}

/// <summary>
/// TBD
/// INTERNAL API.
///
/// Used to publish events out to the cluster.
/// </summary>
internal sealed class PublishEvent : IPublishMessage
{
readonly ClusterEvent.IClusterDomainEvent _event;
private readonly ClusterEvent.IClusterDomainEvent _event;

/// <summary>
/// TBD
Expand Down Expand Up @@ -991,15 +996,15 @@ internal static string VclockName(UniqueAddress node)

// note that self is not initially member,
// and the SendGossip is not versioned for this 'Node' yet
Gossip _latestGossip = Gossip.Empty;
private Gossip _latestGossip = Gossip.Empty;

readonly bool _statsEnabled;
private readonly bool _statsEnabled;
private GossipStats _gossipStats = new GossipStats();
private ImmutableList<Address> _seedNodes;
private IActorRef _seedNodeProcess;
private int _seedNodeProcessCounter = 0; //for unique names

readonly IActorRef _publisher;
private readonly IActorRef _publisher;
private int _leaderActionCounter = 0;
private int _selfDownCounter = 0;

Expand Down Expand Up @@ -1093,15 +1098,15 @@ private void AddCoordinatedLeave()
});
}

ActorSelection ClusterCore(Address address)
private ActorSelection ClusterCore(Address address)
{
return Context.ActorSelection(new RootActorPath(address) / "system" / "cluster" / "core" / "daemon");
}

readonly ICancelable _gossipTaskCancellable;
readonly ICancelable _failureDetectorReaperTaskCancellable;
readonly ICancelable _leaderActionsTaskCancellable;
readonly ICancelable _publishStatsTaskTaskCancellable;
private readonly ICancelable _gossipTaskCancellable;
private readonly ICancelable _failureDetectorReaperTaskCancellable;
private readonly ICancelable _leaderActionsTaskCancellable;
private readonly ICancelable _publishStatsTaskTaskCancellable;

/// <inheritdoc cref="ActorBase.PreStart"/>
protected override void PreStart()
Expand Down Expand Up @@ -2583,7 +2588,7 @@ public void PublishInternalStats()
_publisher.Tell(new ClusterEvent.CurrentInternalStats(_gossipStats, vclockStats));
}

readonly ILoggingAdapter _log = Context.GetLogger();
private readonly ILoggingAdapter _log = Context.GetLogger();
}

/// <summary>
Expand Down Expand Up @@ -2706,13 +2711,13 @@ private void Done(object message)
/// </summary>
internal sealed class FirstSeedNodeProcess : UntypedActor
{
readonly ILoggingAdapter _log = Context.GetLogger();
private readonly ILoggingAdapter _log = Context.GetLogger();

private ImmutableList<Address> _remainingSeeds;
readonly Address _selfAddress;
readonly Cluster _cluster;
readonly Deadline _timeout;
readonly ICancelable _retryTaskToken;
private readonly Address _selfAddress;
private readonly Cluster _cluster;
private readonly Deadline _timeout;
private readonly ICancelable _retryTaskToken;

/// <summary>
/// TBD
Expand Down
15 changes: 15 additions & 0 deletions src/core/Akka.Cluster/Member.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Linq;
using Akka.Actor;
using Akka.Util.Internal;
using Newtonsoft.Json;

namespace Akka.Cluster
{
Expand Down Expand Up @@ -92,6 +93,20 @@ internal Member(UniqueAddress uniqueAddress, int upNumber, MemberStatus status,
Roles = roles;
}

/// <summary>
/// Used when `akka.actor.serialize-messages = on`.
/// </summary>
/// <param name="uniqueAddress">The address of the member.</param>
/// <param name="upNumber">The upNumber of the member, as assigned by the leader at the time the node joined the cluster.</param>
/// <param name="status">The status of this member.</param>
/// <param name="roles">The roles for this member. Can be empty.</param>
[JsonConstructor]
internal Member(UniqueAddress uniqueAddress, int upNumber, MemberStatus status, IEnumerable<string> roles)
: this(uniqueAddress, upNumber, status, roles.ToImmutableHashSet())
{

}

/// <summary>
/// The <see cref="Address"/> for this member.
/// </summary>
Expand Down

0 comments on commit 00b73df

Please sign in to comment.