Skip to content

Commit

Permalink
ClusterSharding should clean its internal cache if region/proxy died (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus committed Feb 21, 2023
1 parent 46d680f commit 053199e
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 2 deletions.
26 changes: 26 additions & 0 deletions src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionSpec.cs
Expand Up @@ -18,6 +18,7 @@
using Xunit;
using Xunit.Abstractions;
using static Akka.Cluster.ClusterEvent;
using static FluentAssertions.FluentActions;

namespace Akka.Cluster.Sharding.Tests
{
Expand Down Expand Up @@ -118,6 +119,31 @@ private IActorRef StartProxy(ActorSystem sys)
return ClusterSharding.Get(sys).StartProxy(shardTypeName, null, extractEntityId, extractShardId);
}

[Fact(DisplayName = "ClusterSharding must clean up its internal regions cache when ShardRegion actor died")]
public void ClusterShardingDeadShardRegionTest()
{
    // Re-use the cluster/shard setup performed by the allocation test.
    ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors();

    var sharding = ClusterSharding.Get(sysA);

    // Sanity check: the region is already registered and resolvable without throwing.
    IActorRef resolvedRegion = null;
    Invoking(() => resolvedRegion = sharding.ShardRegion(shardTypeName))
        .Should().NotThrow();
    resolvedRegion.Should().Be(region1);

    // Killing the region actor must evict its entry from the internal cache.
    sysA.Stop(region1);
    AwaitCondition(() => !sharding.ShardTypeNames.Contains(shardTypeName));

    // Restarting the shard registers the same type name again...
    resolvedRegion = StartShard(sysA);
    AwaitCondition(() => sharding.ShardTypeNames.Contains(shardTypeName));

    // ...but it must be backed by a brand-new actor, not the dead one.
    resolvedRegion.Should().NotBe(region1);
}

[Fact]
public void ClusterSharding_must()
{
Expand Down
3 changes: 2 additions & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs
Expand Up @@ -302,7 +302,8 @@ public ClusterSharding(ExtendedActorSystem system)
var guardianName = system.Settings.Config.GetString("akka.cluster.sharding.guardian-name");
var dispatcher = system.Settings.Config.GetString("akka.cluster.sharding.use-dispatcher");
if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.InternalDispatcherId;
return system.SystemActorOf(Props.Create(() => new ClusterShardingGuardian()).WithDispatcher(dispatcher), guardianName);
return system.SystemActorOf(Props.Create(() => new ClusterShardingGuardian(_regions, _proxies))
.WithDispatcher(dispatcher), guardianName);
});
}

Expand Down
Expand Up @@ -6,10 +6,12 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Sharding.Internal;
using Akka.Cluster.Tools.Singleton;
using Akka.Dispatch.SysMsg;
using Akka.DistributedData;
using Akka.Pattern;

Expand Down Expand Up @@ -169,11 +171,20 @@ public sealed class StartProxy : INoSerializationVerificationNeeded
private readonly int _majorityMinCap = Context.System.Settings.Config.GetInt("akka.cluster.sharding.distributed-data.majority-min-cap", 0);
private ImmutableDictionary<string, IActorRef> _replicatorsByRole = ImmutableDictionary<string, IActorRef>.Empty;

private readonly ConcurrentDictionary<string, IActorRef> _regions;
private readonly ConcurrentDictionary<string, IActorRef> _proxies;
private readonly ConcurrentDictionary<IActorRef, string> _typeLookup = new ConcurrentDictionary<IActorRef, string>();

/// <summary>
/// TBD
/// </summary>
public ClusterShardingGuardian()
public ClusterShardingGuardian(
ConcurrentDictionary<string, IActorRef> regions,
ConcurrentDictionary<string, IActorRef> proxies)
{
_regions = regions;
_proxies = proxies;

Receive<Start>(start =>
{
try
Expand Down Expand Up @@ -244,6 +255,9 @@ public ClusterShardingGuardian()
.WithDispatcher(Context.Props.Dispatcher), encName);
});
_regions.TryAdd(start.TypeName, shardRegion);
_typeLookup.TryAdd(shardRegion, start.TypeName);
Context.Watch(shardRegion);
Sender.Tell(new Started(shardRegion));
}
catch (Exception ex)
Expand Down Expand Up @@ -271,6 +285,9 @@ public ClusterShardingGuardian()
extractShardId: startProxy.ExtractShardId)
.WithDispatcher(Context.Props.Dispatcher), encName));
_proxies.TryAdd(startProxy.TypeName, shardRegion);
_typeLookup.TryAdd(shardRegion, startProxy.TypeName);
Context.Watch(shardRegion);
Sender.Tell(new Started(shardRegion));
}
catch (Exception ex)
Expand All @@ -281,6 +298,23 @@ public ClusterShardingGuardian()
Sender.Tell(new Status.Failure(ex));
}
});

// Clean up the shared caches when a watched region/proxy actor terminates so a
// later Start/StartProxy for the same type name can register a fresh actor.
Receive<Terminated>(msg =>
{
    // Atomic lookup-and-remove: avoids the TryGetValue + TryRemove double lookup
    // and the race window between the two calls.
    if (!_typeLookup.TryRemove(msg.ActorRef, out var typeName))
        return;

    // Only evict the cache entry if it still points at the terminated actor;
    // a newer actor may already have been registered under the same type name.
    if (_regions.TryGetValue(typeName, out var regionActor) && regionActor.Equals(msg.ActorRef))
    {
        _regions.TryRemove(typeName, out _);
        return;
    }

    if (_proxies.TryGetValue(typeName, out var proxyActor) && proxyActor.Equals(msg.ActorRef))
        _proxies.TryRemove(typeName, out _);
});
}

private ReplicatorSettings GetReplicatorSettings(ClusterShardingSettings shardingSettings)
Expand Down

0 comments on commit 053199e

Please sign in to comment.