From 36d846a1f9872e3f473d71e16e3d509dd37b577f Mon Sep 17 00:00:00 2001 From: Renato Cavalcanti Date: Thu, 9 Dec 2021 21:31:55 +0100 Subject: [PATCH] StartEntity for local --- ...geExtractor.scala => EntityEnvelope.scala} | 24 ++++++++++-- .../ClusterShardingEntityProvider.scala | 15 ++++++- .../typed/scaladsl/ClusterSharding.scala | 4 +- .../NewEntityWithClusterShardingSpec.scala | 39 +++++++++---------- 4 files changed, 53 insertions(+), 29 deletions(-) rename akka-actor-typed/src/main/scala/akka/actor/typed/{EntityMessageExtractor.scala => EntityEnvelope.scala} (80%) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/EntityMessageExtractor.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/EntityEnvelope.scala similarity index 80% rename from akka-actor-typed/src/main/scala/akka/actor/typed/EntityMessageExtractor.scala rename to akka-actor-typed/src/main/scala/akka/actor/typed/EntityEnvelope.scala index 14070111a7c3..920a402c13e5 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/EntityMessageExtractor.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/EntityEnvelope.scala @@ -8,6 +8,26 @@ import akka.actor.InvalidMessageException import akka.actor.WrappedMessage import akka.util.unused +object EntityEnvelope { + + final case class StartEntity(entityId: String) + + /** Allows starting a specific Entity by its entity identifier */ + object StartEntity { + + /** + * Returns [[EntityEnvelope]] which can be sent in order to wake up the + * specified (by `entityId`) Entity, ''without'' delivering a real message to it. + */ + def apply[M](entityId: String): EntityEnvelope[M] = { + // StartEntity isn't really of type M, but erased and StartEntity is only handled internally, not delivered to the entity + new EntityEnvelope[M](entityId, new StartEntity(entityId).asInstanceOf[M]) + } + } +} +final case class EntityEnvelope[M](entityId: String, message: M) extends WrappedMessage { + if (message == null) throw InvalidMessageException("[null] is not an allowed message") +} object EntityMessageExtractor { /** @@ -67,7 +87,3 @@ abstract class NoEnvelopeMessageExtractor[M](val numberOfShards: Int) extends En override def toString = s"NoEnvelopeMessageExtractor($numberOfShards)" } - -final case class EntityEnvelope[M](entityId: String, message: M) extends WrappedMessage { - if (message == null) throw InvalidMessageException("[null] is not an allowed message") -} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingEntityProvider.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingEntityProvider.scala index 25bd64de79e4..17a50b82a254 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingEntityProvider.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingEntityProvider.scala @@ -8,18 +8,19 @@ import java.net.URLEncoder import scala.concurrent.Future -import akka.actor.ActorPath import akka.actor.ActorRefProvider import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Entity import akka.actor.typed.Entity.EntityCommand import akka.actor.typed.EntityContext +import akka.actor.typed.EntityEnvelope import akka.actor.typed.EntityRef import akka.actor.typed.EntityTypeKey import akka.actor.typed.internal.InternalRecipientRef import akka.actor.typed.internal.entity.EntityProvider import akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor +import akka.cluster.sharding.ShardRegion.{ StartEntity => ClassicStartEntity } import akka.cluster.sharding.typed.ClusterShardingSettings import akka.cluster.sharding.typed.ShardingMessageExtractor import akka.cluster.sharding.typed.internal.{ EntityTypeKeyImpl => ShardedEntityTypeKeyImpl } @@ -106,7 +107,12 @@ private object EntityAdapter { override def typeKey: EntityTypeKey[M] = toEntityTypeKey(delegate.typeKey) - override def tell(msg: M): Unit = delegate.tell(msg) + override def tell(msg: M): Unit = + msg match { + case EntityEnvelope.StartEntity(entityId) => + delegate.tell(ClassicStartEntity(entityId).asInstanceOf[M]) + case _ => delegate.tell(msg) + } override def ask[Res](f: ActorRef[Res] => M)(implicit timeout: Timeout): Future[Res] = delegate.ask(f)(timeout) @@ -121,5 +127,10 @@ private object EntityAdapter { override def refPrefix: String = URLEncoder.encode(s"${typeKey.name}-$entityId", ByteString.UTF_8) + + override def toString: String = delegate.toString + + override def hashCode(): Int = delegate.hashCode() + override def equals(obj: Any): Boolean = delegate.equals(obj) } } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index d569b98c7a02..0f844b438421 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -12,8 +12,6 @@ import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.actor.typed.Entity.EntityCommand -import akka.actor.typed.{ EntityRef => VirtualEntityRef } -import akka.actor.typed.{ EntityTypeKey => VirtualEntityTypeKey } import akka.actor.typed.Extension import akka.actor.typed.ExtensionId import akka.actor.typed.ExtensionSetup @@ -388,7 +386,7 @@ object StartEntity { * * Not for user extension. */ -@DoNotInherit trait EntityTypeKey[-T] extends VirtualEntityTypeKey[T] { +@DoNotInherit trait EntityTypeKey[-T] extends akka.actor.typed.EntityTypeKey[T] { /** * Name of the entity type. diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/NewEntityWithClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/NewEntityWithClusterShardingSpec.scala index a802a57d17fb..4b59385af045 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/NewEntityWithClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/NewEntityWithClusterShardingSpec.scala @@ -320,23 +320,23 @@ class NewEntityWithClusterShardingSpec */ } -// "EntityRef - AskTimeoutException" in { -// val ignorantKey = EntityTypeKey[TestProtocol]("ignorant") -// -// system.initEntity(Entity(ignorantKey)(_ => Behaviors.ignore[TestProtocol]).withStopMessage(StopPlz())) -// -// val ref = system.entityRefFor(ignorantKey, "sloppy") -// -// val reply = ref.ask(WhoAreYou(_))(Timeout(10.millis)) -// val exc = reply.failed.futureValue -// exc.getClass should ===(classOf[AskTimeoutException]) -// exc.getMessage should startWith("Ask timed out on") -// exc.getMessage should include(ignorantKey.toString) -// exc.getMessage should include("sloppy") // the entity id -// exc.getMessage should include(ref.toString) -// exc.getMessage should include(s"[${classOf[WhoAreYou].getName}]") // message class -// exc.getMessage should include("[10 ms]") // timeout -// } + "EntityRef - AskTimeoutException" in { + val ignorantKey = EntityTypeKey[TestProtocol]("ignorant") + + system.initEntity(Entity(ignorantKey)(_ => Behaviors.ignore[TestProtocol]).withStopMessage(StopPlz())) + + val ref = system.entityRefFor(ignorantKey, "sloppy") + + val reply = ref.ask(WhoAreYou(_))(Timeout(10.millis)) + val exc = reply.failed.futureValue + exc.getClass should ===(classOf[AskTimeoutException]) + exc.getMessage should startWith("Ask timed out on") + exc.getMessage should include(ignorantKey.toString) + exc.getMessage should include("sloppy") // the entity id + exc.getMessage should include(ref.toString) + exc.getMessage should include(s"[${classOf[WhoAreYou].getName}]") // message class + exc.getMessage should include("[10 ms]") // timeout + } "handle classic StartEntity message" in { // it is normally using envelopes, but the classic StartEntity message can be sent internally, @@ -355,11 +355,10 @@ class NewEntityWithClusterShardingSpec } } - "handle typed StartEntity message" ignore { + "handle typed StartEntity message" in { val totalCountBefore = totalEntityCount1() - // FIXME: this one is special, we need to have a local StartEntity message and transform to the sharded one - // shardingRefSystem1WithEnvelope ! StartEntity("startEntity-2") + shardingRefSystem1WithEnvelope ! EntityEnvelope.StartEntity("startEntity-2") eventually { val totalCountAfter = totalEntityCount1()