Skip to content

Commit

Permalink
StartEntity for local
Browse files Browse the repository at this point in the history
  • Loading branch information
octonato committed Dec 9, 2021
1 parent 5ef020a commit 36d846a
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,26 @@ import akka.actor.InvalidMessageException
import akka.actor.WrappedMessage
import akka.util.unused

object EntityEnvelope {

final case class StartEntity(entityId: String)

/** Allows starting a specific Entity by its entity identifier */
object StartEntity {

/**
* Returns [[EntityEnvelope]] which can be sent in order to wake up the
* specified (by `entityId`) Entity, ''without'' delivering a real message to it.
*/
def apply[M](entityId: String): EntityEnvelope[M] = {
// StartEntity isn't really of type M, but erased and StartEntity is only handled internally, not delivered to the entity
new EntityEnvelope[M](entityId, new StartEntity(entityId).asInstanceOf[M])
}
}
}
final case class EntityEnvelope[M](entityId: String, message: M) extends WrappedMessage {
if (message == null) throw InvalidMessageException("[null] is not an allowed message")
}
object EntityMessageExtractor {

/**
Expand Down Expand Up @@ -67,7 +87,3 @@ abstract class NoEnvelopeMessageExtractor[M](val numberOfShards: Int) extends En

override def toString = s"NoEnvelopeMessageExtractor($numberOfShards)"
}

final case class EntityEnvelope[M](entityId: String, message: M) extends WrappedMessage {
if (message == null) throw InvalidMessageException("[null] is not an allowed message")
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@ import java.net.URLEncoder

import scala.concurrent.Future

import akka.actor.ActorPath
import akka.actor.ActorRefProvider
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Entity
import akka.actor.typed.Entity.EntityCommand
import akka.actor.typed.EntityContext
import akka.actor.typed.EntityEnvelope
import akka.actor.typed.EntityRef
import akka.actor.typed.EntityTypeKey
import akka.actor.typed.internal.InternalRecipientRef
import akka.actor.typed.internal.entity.EntityProvider
import akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor
import akka.cluster.sharding.ShardRegion.{ StartEntity => ClassicStartEntity }
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.ShardingMessageExtractor
import akka.cluster.sharding.typed.internal.{ EntityTypeKeyImpl => ShardedEntityTypeKeyImpl }
Expand Down Expand Up @@ -106,7 +107,12 @@ private object EntityAdapter {

override def typeKey: EntityTypeKey[M] = toEntityTypeKey(delegate.typeKey)

override def tell(msg: M): Unit = delegate.tell(msg)
override def tell(msg: M): Unit =
msg match {
case EntityEnvelope.StartEntity(entityId) =>
delegate.tell(ClassicStartEntity(entityId).asInstanceOf[M])
case _ => delegate.tell(msg)
}
override def ask[Res](f: ActorRef[Res] => M)(implicit timeout: Timeout): Future[Res] =
delegate.ask(f)(timeout)

Expand All @@ -121,5 +127,10 @@ private object EntityAdapter {

override def refPrefix: String =
URLEncoder.encode(s"${typeKey.name}-$entityId", ByteString.UTF_8)

override def toString: String = delegate.toString

override def hashCode(): Int = delegate.hashCode()
override def equals(obj: Any): Boolean = delegate.equals(obj)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Entity.EntityCommand
import akka.actor.typed.{ EntityRef => VirtualEntityRef }
import akka.actor.typed.{ EntityTypeKey => VirtualEntityTypeKey }
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.actor.typed.ExtensionSetup
Expand Down Expand Up @@ -388,7 +386,7 @@ object StartEntity {
*
* Not for user extension.
*/
@DoNotInherit trait EntityTypeKey[-T] extends VirtualEntityTypeKey[T] {
@DoNotInherit trait EntityTypeKey[-T] extends akka.actor.typed.EntityTypeKey[T] {

/**
* Name of the entity type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,23 +320,23 @@ class NewEntityWithClusterShardingSpec
*/
}

// "EntityRef - AskTimeoutException" in {
// val ignorantKey = EntityTypeKey[TestProtocol]("ignorant")
//
// system.initEntity(Entity(ignorantKey)(_ => Behaviors.ignore[TestProtocol]).withStopMessage(StopPlz()))
//
// val ref = system.entityRefFor(ignorantKey, "sloppy")
//
// val reply = ref.ask(WhoAreYou(_))(Timeout(10.millis))
// val exc = reply.failed.futureValue
// exc.getClass should ===(classOf[AskTimeoutException])
// exc.getMessage should startWith("Ask timed out on")
// exc.getMessage should include(ignorantKey.toString)
// exc.getMessage should include("sloppy") // the entity id
// exc.getMessage should include(ref.toString)
// exc.getMessage should include(s"[${classOf[WhoAreYou].getName}]") // message class
// exc.getMessage should include("[10 ms]") // timeout
// }
"EntityRef - AskTimeoutException" in {
val ignorantKey = EntityTypeKey[TestProtocol]("ignorant")

system.initEntity(Entity(ignorantKey)(_ => Behaviors.ignore[TestProtocol]).withStopMessage(StopPlz()))

val ref = system.entityRefFor(ignorantKey, "sloppy")

val reply = ref.ask(WhoAreYou(_))(Timeout(10.millis))
val exc = reply.failed.futureValue
exc.getClass should ===(classOf[AskTimeoutException])
exc.getMessage should startWith("Ask timed out on")
exc.getMessage should include(ignorantKey.toString)
exc.getMessage should include("sloppy") // the entity id
exc.getMessage should include(ref.toString)
exc.getMessage should include(s"[${classOf[WhoAreYou].getName}]") // message class
exc.getMessage should include("[10 ms]") // timeout
}

"handle classic StartEntity message" in {
// it is normally using envelopes, but the classic StartEntity message can be sent internally,
Expand All @@ -355,11 +355,10 @@ class NewEntityWithClusterShardingSpec
}
}

"handle typed StartEntity message" ignore {
"handle typed StartEntity message" in {
val totalCountBefore = totalEntityCount1()

// FIXME: this one is special, we need to have a local StartEntity message and transform to the sharded one
// shardingRefSystem1WithEnvelope ! StartEntity("startEntity-2")
shardingRefSystem1WithEnvelope ! EntityEnvelope.StartEntity("startEntity-2")

eventually {
val totalCountAfter = totalEntityCount1()
Expand Down

0 comments on commit 36d846a

Please sign in to comment.