diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Akka.Cluster.Sharding.Tests.csproj b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Akka.Cluster.Sharding.Tests.csproj index 2976dd0c896..3dec67c8537 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Akka.Cluster.Sharding.Tests.csproj +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Akka.Cluster.Sharding.Tests.csproj @@ -5,6 +5,7 @@ Akka.Cluster.Sharding.Tests $(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion) + 9.0 diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesHashCodeSpecs.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesHashCodeSpecs.cs new file mode 100644 index 00000000000..aa83a9bd30c --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesHashCodeSpecs.cs @@ -0,0 +1,166 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Cluster.Tools.Singleton; +using Akka.Configuration; +using Akka.TestKit; +using Akka.TestKit.TestActors; +using Akka.Util; +using Xunit; +using Xunit.Abstractions; +using FluentAssertions; + +namespace Akka.Cluster.Sharding.Tests +{ + public class ShardRegionQueriesHashCodeSpecs : AkkaSpec + { + private class MessageExtractor: HashCodeMessageExtractor + { + public MessageExtractor() : base(10) + { } + + public override string EntityId(object message) + { + return message switch + { + int i => i.ToString(), + _ => throw new NotSupportedException() + }; + } + } + + private readonly Cluster _cluster; + private readonly ClusterSharding _clusterSharding; + private readonly IActorRef _shardRegion; + + private readonly ActorSystem _proxySys; + + public ShardRegionQueriesHashCodeSpecs(ITestOutputHelper outputHelper) : base(GetConfig(), outputHelper) + { + _clusterSharding = ClusterSharding.Get(Sys); + _cluster = Cluster.Get(Sys); + _shardRegion = _clusterSharding.Start("entity", s => EchoActor.Props(this, true), + ClusterShardingSettings.Create(Sys).WithRole("shard"), new MessageExtractor()); + + var proxySysConfig = ConfigurationFactory.ParseString("akka.cluster.roles = [proxy]") + .WithFallback(Sys.Settings.Config); + _proxySys = ActorSystem.Create(Sys.Name, proxySysConfig); + + _cluster.Join(_cluster.SelfAddress); + AwaitAssert(() => { _cluster.SelfMember.Status.ShouldBe(MemberStatus.Up); }); + + // form a 2-node cluster + var proxyCluster = Cluster.Get(_proxySys); + proxyCluster.Join(_cluster.SelfAddress); + AwaitAssert(() => { proxyCluster.SelfMember.Status.ShouldBe(MemberStatus.Up); }); + } + + protected override async Task AfterAllAsync() + { + await ShutdownAsync(_proxySys); + await base.AfterAllAsync(); + } + + private static Config GetConfig() + { + return ConfigurationFactory.ParseString(@" + akka.loglevel = WARNING + akka.actor.provider = cluster + akka.remote.dot-netty.tcp.port = 0 + akka.cluster.roles = [shard]") + .WithFallback(Sharding.ClusterSharding.DefaultConfig()) + .WithFallback(DistributedData.DistributedData.DefaultConfig()) + .WithFallback(ClusterSingletonManager.DefaultConfig()); + } + + [Fact] + public async Task ShardRegion_GetEntityLocation_test() + { + // creates an entity with entityId="1" + await _shardRegion.Ask(1, TimeSpan.FromSeconds(3)); + + // determine where entity with "entityId=1" is located in cluster + var q1 = await _shardRegion.Ask(new GetEntityLocation("1", TimeSpan.FromSeconds(1))); + + q1.EntityId.Should().Be("1"); + + // have a valid ShardId + q1.ShardId.Should().NotBeEmpty(); + + // have valid address for node that will / would host entity + q1.ShardRegion.Should().NotBe(Address.AllSystems); // has real address + + // if entity actor is alive, will retrieve a reference to it + q1.EntityRef.HasValue.Should().BeTrue(); + } + + [Fact(DisplayName = "ShardRegion should support GetEntityLocation queries locally")] + public async Task ShardRegion_should_support_GetEntityLocation_query_locally() + { + // arrange + await _shardRegion.Ask(1, TimeSpan.FromSeconds(3)); + await _shardRegion.Ask(2, TimeSpan.FromSeconds(3)); + + // act + var q1 = await _shardRegion.Ask(new GetEntityLocation("1", TimeSpan.FromSeconds(1))); + var q2 = await _shardRegion.Ask(new GetEntityLocation("2", TimeSpan.FromSeconds(1))); + var q3 = await _shardRegion.Ask(new GetEntityLocation("3", TimeSpan.FromSeconds(1))); + + // assert + void AssertValidEntityLocation(EntityLocation e, string entityId) + { + e.EntityId.Should().Be(entityId); + e.EntityRef.Should().NotBe(Option.None); + e.ShardId.Should().NotBeNullOrEmpty(); + e.ShardRegion.Should().Be(_cluster.SelfAddress); + } + + AssertValidEntityLocation(q1, "1"); + AssertValidEntityLocation(q2, "2"); + + q3.EntityRef.Should().Be(Option.None); + q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard? + q3.ShardRegion.Should().Be(Address.AllSystems); + } + + [Fact(DisplayName = "ShardRegion should support GetEntityLocation queries remotely")] + public async Task ShardRegion_should_support_GetEntityLocation_query_remotely() + { + // arrange + var sharding2 = ClusterSharding.Get(_proxySys); + var shardRegionProxy = await sharding2.StartProxyAsync("entity", "shard", new MessageExtractor()); + + await shardRegionProxy.Ask(1, TimeSpan.FromSeconds(3)); + await shardRegionProxy.Ask(2, TimeSpan.FromSeconds(3)); + + // act + var q1 = await shardRegionProxy.Ask(new GetEntityLocation("1", TimeSpan.FromSeconds(1))); + var q2 = await shardRegionProxy.Ask(new GetEntityLocation("2", TimeSpan.FromSeconds(1))); + var q3 = await shardRegionProxy.Ask(new GetEntityLocation("3", TimeSpan.FromSeconds(1))); + + // assert + void AssertValidEntityLocation(EntityLocation e, string entityId) + { + e.EntityId.Should().Be(entityId); + e.EntityRef.Should().NotBe(Option.None); + e.ShardId.Should().NotBeNullOrEmpty(); + e.ShardRegion.Should().Be(_cluster.SelfAddress); + } + + AssertValidEntityLocation(q1, "1"); + AssertValidEntityLocation(q2, "2"); + + q3.EntityRef.Should().Be(Option.None); + q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard? + q3.ShardRegion.Should().Be(Address.AllSystems); + } + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs index 89ad40adab0..17c1abf7c1b 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs @@ -70,11 +70,11 @@ private string ExtractShardId(object message) switch (message) { case int i: - return (i % 10).ToString(); + return (i % 10 + 1).ToString(); // must support ShardRegion.StartEntity in order for // GetEntityLocation to work properly case ShardRegion.StartEntity se: - return se.EntityId; + return (int.Parse(se.EntityId) % 10 + 1).ToString(); } throw new NotSupportedException(); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index 6b2bdea2d08..b10b88f75f9 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -948,6 +948,7 @@ Address GetNodeAddress(IActorRef shardOrRegionRef) try { + var entityId = getEntityLocation.EntityId; var shardId = _extractShardId(new StartEntity(getEntityLocation.EntityId)); if (string.IsNullOrEmpty(shardId)) { @@ -989,7 +990,7 @@ async Task ResolveEntityRef(Address destinationAddress, ActorPath entityPath) // NOTE: in the event that we're querying a shard's location from a ShardRegionProxy // the shard may not be technically "homed" inside the proxy, but it does exist. #pragma warning disable CS4014 - ResolveEntityRef(GetNodeAddress(shardRegionRef), shardRegionRef.Path / shardId / shardId); // needs to run as a detached task + ResolveEntityRef(GetNodeAddress(shardRegionRef), shardRegionRef.Path / shardId / entityId); // needs to run as a detached task #pragma warning restore CS4014 } @@ -997,7 +998,7 @@ async Task ResolveEntityRef(Address destinationAddress, ActorPath entityPath) } #pragma warning disable CS4014 - ResolveEntityRef(GetNodeAddress(shardActorRef), shardActorRef.Path / shardId); // needs to run as a detached task + ResolveEntityRef(GetNodeAddress(shardActorRef), shardActorRef.Path / entityId); // needs to run as a detached task #pragma warning restore CS4014 } catch (Exception ex)