From 053199edd6d3045605a7010e6ea0454073840286 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 22 Feb 2023 04:14:05 +0700 Subject: [PATCH] ClusterSharding should clean its internal cache if region/proxy died (#6424) --- .../ShardRegionSpec.cs | 26 ++++++++++++++ .../Akka.Cluster.Sharding/ClusterSharding.cs | 3 +- .../ClusterShardingGuardian.cs | 36 ++++++++++++++++++- 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionSpec.cs index 66d337ae23b..924c56f0b98 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionSpec.cs @@ -18,6 +18,7 @@ using Xunit; using Xunit.Abstractions; using static Akka.Cluster.ClusterEvent; +using static FluentAssertions.FluentActions; namespace Akka.Cluster.Sharding.Tests { @@ -118,6 +119,31 @@ private IActorRef StartProxy(ActorSystem sys) return ClusterSharding.Get(sys).StartProxy(shardTypeName, null, extractEntityId, extractShardId); } + [Fact(DisplayName = "ClusterSharding must clean up its internal regions cache when ShardRegion actor died")] + public void ClusterShardingDeadShardRegionTest() + { + ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors(); + + var sharding = ClusterSharding.Get(sysA); + + IActorRef testRegion = null; + // Sanity check: the region should already be started and registered, so the lookup must not throw + Invoking(() => testRegion = sharding.ShardRegion(shardTypeName)) + .Should().NotThrow(); + testRegion.Should().Be(region1); + + sysA.Stop(region1); + // The shard type name should be unregistered once the region actor has terminated + AwaitCondition(() => !sharding.ShardTypeNames.Contains(shardTypeName)); + + testRegion = StartShard(sysA); + // Starting the shard again should register the shard type name again + AwaitCondition(() => sharding.ShardTypeNames.Contains(shardTypeName)); + + // The newly started region's actor ref should not be the same as the old one + 
testRegion.Should().NotBe(region1); + } + [Fact] public void ClusterSharding_must() { diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs index 15ecdfdc69c..d75262cdba9 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs @@ -302,7 +302,8 @@ public ClusterSharding(ExtendedActorSystem system) var guardianName = system.Settings.Config.GetString("akka.cluster.sharding.guardian-name"); var dispatcher = system.Settings.Config.GetString("akka.cluster.sharding.use-dispatcher"); if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.InternalDispatcherId; - return system.SystemActorOf(Props.Create(() => new ClusterShardingGuardian()).WithDispatcher(dispatcher), guardianName); + return system.SystemActorOf(Props.Create(() => new ClusterShardingGuardian(_regions, _proxies)) + .WithDispatcher(dispatcher), guardianName); }); } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs index 8822edf20fc..53dddb06ed0 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs @@ -6,10 +6,12 @@ //----------------------------------------------------------------------- using System; +using System.Collections.Concurrent; using System.Collections.Immutable; using Akka.Actor; using Akka.Cluster.Sharding.Internal; using Akka.Cluster.Tools.Singleton; +using Akka.Dispatch.SysMsg; using Akka.DistributedData; using Akka.Pattern; @@ -169,11 +171,20 @@ public sealed class StartProxy : INoSerializationVerificationNeeded private readonly int _majorityMinCap = Context.System.Settings.Config.GetInt("akka.cluster.sharding.distributed-data.majority-min-cap", 0); private ImmutableDictionary _replicatorsByRole = 
ImmutableDictionary.Empty; + private readonly ConcurrentDictionary _regions; + private readonly ConcurrentDictionary _proxies; + private readonly ConcurrentDictionary _typeLookup = new ConcurrentDictionary(); + /// /// TBD /// - public ClusterShardingGuardian() + public ClusterShardingGuardian( + ConcurrentDictionary regions, + ConcurrentDictionary proxies) { + _regions = regions; + _proxies = proxies; + Receive(start => { try @@ -244,6 +255,9 @@ public ClusterShardingGuardian() .WithDispatcher(Context.Props.Dispatcher), encName); }); + _regions.TryAdd(start.TypeName, shardRegion); + _typeLookup.TryAdd(shardRegion, start.TypeName); + Context.Watch(shardRegion); Sender.Tell(new Started(shardRegion)); } catch (Exception ex) @@ -271,6 +285,9 @@ public ClusterShardingGuardian() extractShardId: startProxy.ExtractShardId) .WithDispatcher(Context.Props.Dispatcher), encName)); + _proxies.TryAdd(startProxy.TypeName, shardRegion); + _typeLookup.TryAdd(shardRegion, startProxy.TypeName); + Context.Watch(shardRegion); Sender.Tell(new Started(shardRegion)); } catch (Exception ex) @@ -281,6 +298,23 @@ public ClusterShardingGuardian() Sender.Tell(new Status.Failure(ex)); } }); + + Receive(msg => + { + if (!_typeLookup.TryGetValue(msg.ActorRef, out var typeName)) + return; + + _typeLookup.TryRemove(msg.ActorRef, out _); + + if (_regions.TryGetValue(typeName, out var regionActor) && regionActor.Equals(msg.ActorRef)) + { + _regions.TryRemove(typeName, out _); + return; + } + + if(_proxies.TryGetValue(typeName, out var proxyActor) && proxyActor.Equals(msg.ActorRef)) + _proxies.TryRemove(typeName, out _); + }); } private ReplicatorSettings GetReplicatorSettings(ClusterShardingSettings shardingSettings)