Skip to content

Commit

Permalink
Cluster aware routers for typed #26355
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Jul 15, 2019
1 parent d2f5d2d commit f0e42d2
Show file tree
Hide file tree
Showing 12 changed files with 447 additions and 57 deletions.
Expand Up @@ -69,16 +69,15 @@ object ReceptionistApiSpec {
// to cover as much of the API as possible
context.system.receptionist ! Receptionist.Register(key, context.self.narrow, context.self.narrow)

Behaviors.receive { (context, message) =>
message match {
case key.Listing(services) =>
services.foreach(_ ! "woho")
Behaviors.same
case key.Registered(service) => // ack on Register above
service ! "woho"
Behaviors.same
}
Behaviors.receiveMessage {
case key.Listing(services) =>
services.foreach(_ ! "woho")
Behaviors.same
case key.Registered(service) => // ack on Register above
service ! "woho"
Behaviors.same
}

}
}

Expand Down
Expand Up @@ -8,7 +8,10 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Dropped
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.internal.routing.GroupRouterImpl
import akka.actor.typed.internal.routing.RoutingLogics
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.adapter._
Expand Down Expand Up @@ -212,6 +215,37 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""

}

"not route to unreachable when there are reachable" in {
val serviceKey = ServiceKey[String]("group-routing-4")
val router = spawn(Behaviors.setup[String](context =>
new GroupRouterImpl(context, serviceKey, new RoutingLogics.RoundRobinLogic[String], true)))

val reachableProbe = createTestProbe[String]
val unreachableProbe = createTestProbe[String]
router
.unsafeUpcast[Any] ! Receptionist.Listing(serviceKey, Set(reachableProbe.ref), Set(unreachableProbe.ref), false)
router ! "one"
router ! "two"
reachableProbe.expectMessage("one")
reachableProbe.expectMessage("two")
}

"route to unreachable when there are no reachable" in {
val serviceKey = ServiceKey[String]("group-routing-4")
val router = spawn(Behaviors.setup[String](context =>
new GroupRouterImpl(context, serviceKey, new RoutingLogics.RoundRobinLogic[String], true)))

val unreachableProbe = createTestProbe[String]
router.unsafeUpcast[Any] ! Receptionist.Listing(
serviceKey,
Set.empty[ActorRef[String]],
Set(unreachableProbe.ref),
true)
router ! "one"
router ! "two"
unreachableProbe.expectMessage("one")
unreachableProbe.expectMessage("two")
}
}

}
Expand Up @@ -80,15 +80,21 @@ private[akka] object LocalReceptionist extends ReceptionistBehaviorProvider {

def notifySubscribersFor[T](key: AbstractServiceKey): Unit = {
val newListing = newRegistry.get(key)
subscriptions.get(key).foreach(_ ! ReceptionistMessages.Listing(key.asServiceKey, newListing))
subscriptions
.get(key)
.foreach(
_ ! ReceptionistMessages
.Listing(key.asServiceKey, newListing, newListing, servicesWereAddedOrRemoved = true))
}

changedKeysHint.foreach(notifySubscribersFor)
next(newRegistry = newRegistry)
}

def replyWithListing[T](key: ServiceKey[T], replyTo: ActorRef[Listing]): Unit =
replyTo ! ReceptionistMessages.Listing(key, serviceRegistry.get(key))
def replyWithListing[T](key: ServiceKey[T], replyTo: ActorRef[Listing]): Unit = {
val listing = serviceRegistry.get(key)
replyTo ! ReceptionistMessages.Listing(key, listing, listing, servicesWereAddedOrRemoved = true)
}

def onCommand(ctx: ActorContext[Any], cmd: Command): Behavior[Any] = cmd match {
case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) =>
Expand Down
Expand Up @@ -8,7 +8,6 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.receptionist.Receptionist.Command
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.annotation.InternalApi

import akka.util.ccompat.JavaConverters._

/**
Expand Down Expand Up @@ -43,7 +42,11 @@ private[akka] object ReceptionistMessages {

final case class Find[T] private[akka] (key: ServiceKey[T], replyTo: ActorRef[Receptionist.Listing]) extends Command

final case class Listing[T] private[akka] (key: ServiceKey[T], _serviceInstances: Set[ActorRef[T]])
final case class Listing[T] private[akka] (
key: ServiceKey[T],
_serviceInstances: Set[ActorRef[T]],
_allServiceInstances: Set[ActorRef[T]],
servicesWereAddedOrRemoved: Boolean)
extends Receptionist.Listing {

def isForKey(key: ServiceKey[_]): Boolean = key == this.key
Expand All @@ -56,6 +59,15 @@ private[akka] object ReceptionistMessages {

def getServiceInstances[M](key: ServiceKey[M]): java.util.Set[ActorRef[M]] =
serviceInstances(key).asJava

override def allServiceInstances[M](key: ServiceKey[M]): Set[ActorRef[M]] = {
if (key != this.key)
throw new IllegalArgumentException(s"Wrong key [$key] used, must use listing key [${this.key}]")
_allServiceInstances.asInstanceOf[Set[ActorRef[M]]]
}

override def getAllServiceInstances[M](key: ServiceKey[M]): java.util.Set[ActorRef[M]] =
allServiceInstances(key).asJava
}

final case class Subscribe[T] private[akka] (key: ServiceKey[T], subscriber: ActorRef[Receptionist.Listing])
Expand Down
Expand Up @@ -73,23 +73,26 @@ private final class InitialGroupRouterImpl[T](
* INTERNAL API
*/
@InternalApi
private final class GroupRouterImpl[T](
private[akka] final class GroupRouterImpl[T](
ctx: ActorContext[T],
serviceKey: ServiceKey[T],
routingLogic: RoutingLogic[T],
routeesInitiallyEmpty: Boolean)
extends AbstractBehavior[T] {

// casting trix to avoid having to wrap incoming messages - note that this will cause problems if intercepting
// messages to a router
ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self.unsafeUpcast[Any].narrow[Receptionist.Listing])
private var routeesEmpty = routeesInitiallyEmpty

def onMessage(msg: T): Behavior[T] = msg match {
case serviceKey.Listing(update) =>
// we don't need to watch, because receptionist already does that
routingLogic.routeesUpdated(update)
routeesEmpty = update.isEmpty
case l @ serviceKey.Listing(update) =>
ctx.log.debug("Update from receptionist: [{}]", l)
val routees =
if (update.nonEmpty) update
else
// empty listing in a cluster context can mean all nodes with registered services
// are unreachable, in that case trying the unreachable ones is better than dropping messages
l.allServiceInstances(serviceKey)
routeesEmpty = routees.isEmpty
routingLogic.routeesUpdated(routees)
this
case msg: T @unchecked =>
import akka.actor.typed.scaladsl.adapter._
Expand Down
Expand Up @@ -54,7 +54,19 @@ abstract class ServiceKey[T] extends AbstractServiceKey { key =>
def asServiceKey: ServiceKey[T] = this

/**
* Scala API: Provides a type safe pattern match for listings
* Scala API: Provides a type safe pattern match for listings.
*
* Using it for pattern match like this will return the reachable service instances:
*
* ```
* case MyServiceKey.Listing(reachable) =>
* ```
*
* In a non-clustered `ActorSystem` this will always be all registered instances
* for a service key. For a clustered environment services on nodes that have
* been observed unreachable are not among these (note that they could have
* become unreachable between this message being sent and the receiving actor
* processing it).
*/
object Listing {
def unapply(l: Receptionist.Listing): Option[Set[ActorRef[T]]] =
Expand Down Expand Up @@ -258,15 +270,68 @@ object Receptionist extends ExtensionId[Receptionist] {
def isForKey(key: ServiceKey[_]): Boolean

/**
* Scala API
* Scala API: Return the reachable service instances.
*
* In a non-clustered `ActorSystem` this will always be all registered instances
* for a service key.
*
* For a clustered `ActorSystem` it only contain services on nodes that
* are not seen as unreachable (note that they could have still have become
* unreachable between this message being sent and the receiving actor processing it).
*
* For a list including both reachable and unreachable instances see [[#allServiceInstances]]
*
* Also, see [[ServiceKey.Listing]] for more convenient pattern matching
*/
def serviceInstances[T](key: ServiceKey[T]): Set[ActorRef[T]]

/** Java API */
/**
* Java API: Return the reachable service instances.
*
* In a non-clustered `ActorSystem` this will always be all registered instances
* for a service key.
*
* For a clustered `ActorSystem` it only contain services on nodes that has
* are not seen as unreachable (note that they could have still have become
* unreachable between this message being sent and the receiving actor processing it).
*
* For a list including both reachable and unreachable instances see [[#getAllServiceInstances]]
*/
def getServiceInstances[T](key: ServiceKey[T]): java.util.Set[ActorRef[T]]

/**
* Scala API: Return both the reachable and the unreachable service instances.
*
* In a non-clustered `ActorSystem` this will always be the same as [[#serviceInstances]].
*
* For a clustered `ActorSystem` this include both services on nodes that are reachable
* and nodes that are unreachable.
*/
def allServiceInstances[T](key: ServiceKey[T]): Set[ActorRef[T]]

/**
* Java API: Return both the reachable and the unreachable service instances.
*
* In a non-clustered `ActorSystem` this will always be the same as [[#getServiceInstances]].
*
* For a clustered `ActorSystem` this include both services on nodes that are reachable
* and nodes that are unreachable.
*/
def getAllServiceInstances[T](key: ServiceKey[T]): java.util.Set[ActorRef[T]]

/**
* Returns `true` only if this `Listing` was sent triggered by new actors added or removed to the receptionist.
* When `false` the event is only about reachability changes - meaning that the full set of actors
* ([[#allServiceInstances]] or [[#getAllServiceInstances]]) is the same as the previous `Listing`.
*
* knowing this is useful for subscribers only concerned with [[#allServiceInstances]] or [[#getAllServiceInstances]]
* that can then ignore `Listing`s related to reachability.
*
* In a non-clustered `ActorSystem` this will be `true` for all listings.
* For `Find` queries and the initial listing for a `Subscribe` this will always be `true`.
*/
def servicesWereAddedOrRemoved: Boolean

}

/**
Expand All @@ -276,15 +341,32 @@ object Receptionist extends ExtensionId[Receptionist] {

/** Scala API: */
def apply[T](key: ServiceKey[T], serviceInstances: Set[ActorRef[T]]): Listing =
new ReceptionistMessages.Listing[T](key, serviceInstances)
apply(key, serviceInstances, serviceInstances, servicesWereAddedOrRemoved = true)

/** Scala API: */
def apply[T](
key: ServiceKey[T],
serviceInstances: Set[ActorRef[T]],
allServiceInstances: Set[ActorRef[T]],
servicesWereAddedOrRemoved: Boolean): Listing =
new ReceptionistMessages.Listing[T](key, serviceInstances, allServiceInstances, servicesWereAddedOrRemoved)
}

/**
* Java API: Sent by the receptionist, available here for easier testing
*/
def listing[T](key: ServiceKey[T], serviceInstances: java.util.Set[ActorRef[T]]): Listing =
Listing(key, Set[ActorRef[T]](serviceInstances.asScala.toSeq: _*))
Listing(key, serviceInstances.asScala.toSet)

/**
* Java API: Sent by the receptionist, available here for easier testing
*/
def listing[T](
key: ServiceKey[T],
serviceInstances: java.util.Set[ActorRef[T]],
allServiceInstances: java.util.Set[ActorRef[T]],
servicesWereAddedOrRemoved: Boolean): Listing =
Listing(key, serviceInstances.asScala.toSet, allServiceInstances.asScala.toSet, servicesWereAddedOrRemoved)

}

Expand Down

0 comments on commit f0e42d2

Please sign in to comment.