From f0e42d2b9ce78bb71e1bff9c69b2a656eaa437f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 15 Jul 2019 15:25:00 +0200 Subject: [PATCH] Cluster aware routers for typed #26355 --- .../receptionist/ReceptionistApiSpec.scala | 17 +- .../actor/typed/scaladsl/RoutersSpec.scala | 34 ++++ .../receptionist/LocalReceptionist.scala | 12 +- .../receptionist/ReceptionistMessages.scala | 16 +- .../internal/routing/GroupRouterImpl.scala | 19 ++- .../typed/receptionist/Receptionist.scala | 92 ++++++++++- .../receptionist/ClusterReceptionist.scala | 71 ++++++-- .../internal/receptionist/Registry.scala | 47 +++++- ...lusterReceptionistUnreachabilitySpec.scala | 152 ++++++++++++++++++ .../ClusterReceptionistSpec.scala | 15 +- .../src/main/paradox/typed/actor-discovery.md | 13 +- akka-docs/src/main/paradox/typed/routers.md | 16 +- 12 files changed, 447 insertions(+), 57 deletions(-) create mode 100644 akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/internal/ClusterReceptionistUnreachabilitySpec.scala diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/ReceptionistApiSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/ReceptionistApiSpec.scala index fdcc2988f0d..e7b3d7e9899 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/ReceptionistApiSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/ReceptionistApiSpec.scala @@ -69,16 +69,15 @@ object ReceptionistApiSpec { // to cover as much of the API as possible context.system.receptionist ! Receptionist.Register(key, context.self.narrow, context.self.narrow) - Behaviors.receive { (context, message) => - message match { - case key.Listing(services) => - services.foreach(_ ! "woho") - Behaviors.same - case key.Registered(service) => // ack on Register above - service ! "woho" - Behaviors.same - } + Behaviors.receiveMessage { + case key.Listing(services) => + services.foreach(_ ! "woho") + Behaviors.same + case key.Registered(service) => // ack on Register above + service ! "woho" + Behaviors.same } + } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala index d406e673726..9d2111c567a 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala @@ -8,7 +8,10 @@ import java.util.concurrent.atomic.AtomicInteger import akka.actor.Dropped import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.ActorRef import akka.actor.typed.Behavior +import akka.actor.typed.internal.routing.GroupRouterImpl +import akka.actor.typed.internal.routing.RoutingLogics import akka.actor.typed.receptionist.Receptionist import akka.actor.typed.receptionist.ServiceKey import akka.actor.typed.scaladsl.adapter._ @@ -212,6 +215,37 @@ class RoutersSpec extends ScalaTestWithActorTestKit(""" } + "not route to unreachable when there are reachable" in { + val serviceKey = ServiceKey[String]("group-routing-4") + val router = spawn(Behaviors.setup[String](context => + new GroupRouterImpl(context, serviceKey, new RoutingLogics.RoundRobinLogic[String], true))) + + val reachableProbe = createTestProbe[String] + val unreachableProbe = createTestProbe[String] + router + .unsafeUpcast[Any] ! Receptionist.Listing(serviceKey, Set(reachableProbe.ref), Set(unreachableProbe.ref), false) + router ! "one" + router ! "two" + reachableProbe.expectMessage("one") + reachableProbe.expectMessage("two") + } + + "route to unreachable when there are no reachable" in { + val serviceKey = ServiceKey[String]("group-routing-4") + val router = spawn(Behaviors.setup[String](context => + new GroupRouterImpl(context, serviceKey, new RoutingLogics.RoundRobinLogic[String], true))) + + val unreachableProbe = createTestProbe[String] + router.unsafeUpcast[Any] ! Receptionist.Listing( + serviceKey, + Set.empty[ActorRef[String]], + Set(unreachableProbe.ref), + true) + router ! "one" + router ! "two" + unreachableProbe.expectMessage("one") + unreachableProbe.expectMessage("two") + } } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/LocalReceptionist.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/LocalReceptionist.scala index f865306b2f6..cd98cf90853 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/LocalReceptionist.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/LocalReceptionist.scala @@ -80,15 +80,21 @@ private[akka] object LocalReceptionist extends ReceptionistBehaviorProvider { def notifySubscribersFor[T](key: AbstractServiceKey): Unit = { val newListing = newRegistry.get(key) - subscriptions.get(key).foreach(_ ! ReceptionistMessages.Listing(key.asServiceKey, newListing)) + subscriptions + .get(key) + .foreach( + _ ! ReceptionistMessages + .Listing(key.asServiceKey, newListing, newListing, servicesWereAddedOrRemoved = true)) } changedKeysHint.foreach(notifySubscribersFor) next(newRegistry = newRegistry) } - def replyWithListing[T](key: ServiceKey[T], replyTo: ActorRef[Listing]): Unit = - replyTo ! ReceptionistMessages.Listing(key, serviceRegistry.get(key)) + def replyWithListing[T](key: ServiceKey[T], replyTo: ActorRef[Listing]): Unit = { + val listing = serviceRegistry.get(key) + replyTo ! ReceptionistMessages.Listing(key, listing, listing, servicesWereAddedOrRemoved = true) + } def onCommand(ctx: ActorContext[Any], cmd: Command): Behavior[Any] = cmd match { case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) => diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistMessages.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistMessages.scala index 9870889057b..608fe8b773b 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistMessages.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistMessages.scala @@ -8,7 +8,6 @@ import akka.actor.typed.ActorRef import akka.actor.typed.receptionist.Receptionist.Command import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } import akka.annotation.InternalApi - import akka.util.ccompat.JavaConverters._ /** @@ -43,7 +42,11 @@ private[akka] object ReceptionistMessages { final case class Find[T] private[akka] (key: ServiceKey[T], replyTo: ActorRef[Receptionist.Listing]) extends Command - final case class Listing[T] private[akka] (key: ServiceKey[T], _serviceInstances: Set[ActorRef[T]]) + final case class Listing[T] private[akka] ( + key: ServiceKey[T], + _serviceInstances: Set[ActorRef[T]], + _allServiceInstances: Set[ActorRef[T]], + servicesWereAddedOrRemoved: Boolean) extends Receptionist.Listing { def isForKey(key: ServiceKey[_]): Boolean = key == this.key @@ -56,6 +59,15 @@ private[akka] object ReceptionistMessages { def getServiceInstances[M](key: ServiceKey[M]): java.util.Set[ActorRef[M]] = serviceInstances(key).asJava + + override def allServiceInstances[M](key: ServiceKey[M]): Set[ActorRef[M]] = { + if (key != this.key) + throw new IllegalArgumentException(s"Wrong key [$key] used, must use listing key [${this.key}]") + _allServiceInstances.asInstanceOf[Set[ActorRef[M]]] + } + + override def getAllServiceInstances[M](key: ServiceKey[M]): java.util.Set[ActorRef[M]] = + allServiceInstances(key).asJava } final case class Subscribe[T] private[akka] (key: ServiceKey[T], subscriber: ActorRef[Receptionist.Listing]) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala index 655a31f9e2f..459a54070ec 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala @@ -73,23 +73,26 @@ private final class InitialGroupRouterImpl[T]( * INTERNAL API */ @InternalApi -private final class GroupRouterImpl[T]( +private[akka] final class GroupRouterImpl[T]( ctx: ActorContext[T], serviceKey: ServiceKey[T], routingLogic: RoutingLogic[T], routeesInitiallyEmpty: Boolean) extends AbstractBehavior[T] { - // casting trix to avoid having to wrap incoming messages - note that this will cause problems if intercepting - // messages to a router - ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self.unsafeUpcast[Any].narrow[Receptionist.Listing]) private var routeesEmpty = routeesInitiallyEmpty def onMessage(msg: T): Behavior[T] = msg match { - case serviceKey.Listing(update) => - // we don't need to watch, because receptionist already does that - routingLogic.routeesUpdated(update) - routeesEmpty = update.isEmpty + case l @ serviceKey.Listing(update) => + ctx.log.debug("Update from receptionist: [{}]", l) + val routees = + if (update.nonEmpty) update + else + // empty listing in a cluster context can mean all nodes with registered services + // are unreachable, in that case trying the unreachable ones is better than dropping messages + l.allServiceInstances(serviceKey) + routeesEmpty = routees.isEmpty + routingLogic.routeesUpdated(routees) this case msg: T @unchecked => import akka.actor.typed.scaladsl.adapter._ diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala index 8fda2308c7c..c8bbb65cd37 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala @@ -54,7 +54,19 @@ abstract class ServiceKey[T] extends AbstractServiceKey { key => def asServiceKey: ServiceKey[T] = this /** - * Scala API: Provides a type safe pattern match for listings + * Scala API: Provides a type safe pattern match for listings. + * + * Using it for pattern match like this will return the reachable service instances: + * + * ``` + * case MyServiceKey.Listing(reachable) => + * ``` + * + * In a non-clustered `ActorSystem` this will always be all registered instances + * for a service key. For a clustered environment services on nodes that have + * been observed unreachable are not among these (note that they could have + * become unreachable between this message being sent and the receiving actor + * processing it). */ object Listing { def unapply(l: Receptionist.Listing): Option[Set[ActorRef[T]]] = @@ -258,15 +270,68 @@ object Receptionist extends ExtensionId[Receptionist] { def isForKey(key: ServiceKey[_]): Boolean /** - * Scala API + * Scala API: Return the reachable service instances. + * + * In a non-clustered `ActorSystem` this will always be all registered instances + * for a service key. + * + * For a clustered `ActorSystem` it only contain services on nodes that + * are not seen as unreachable (note that they could have still have become + * unreachable between this message being sent and the receiving actor processing it). + * + * For a list including both reachable and unreachable instances see [[#allServiceInstances]] * * Also, see [[ServiceKey.Listing]] for more convenient pattern matching */ def serviceInstances[T](key: ServiceKey[T]): Set[ActorRef[T]] - /** Java API */ + /** + * Java API: Return the reachable service instances. + * + * In a non-clustered `ActorSystem` this will always be all registered instances + * for a service key. + * + * For a clustered `ActorSystem` it only contain services on nodes that has + * are not seen as unreachable (note that they could have still have become + * unreachable between this message being sent and the receiving actor processing it). + * + * For a list including both reachable and unreachable instances see [[#getAllServiceInstances]] + */ def getServiceInstances[T](key: ServiceKey[T]): java.util.Set[ActorRef[T]] + /** + * Scala API: Return both the reachable and the unreachable service instances. + * + * In a non-clustered `ActorSystem` this will always be the same as [[#serviceInstances]]. + * + * For a clustered `ActorSystem` this include both services on nodes that are reachable + * and nodes that are unreachable. + */ + def allServiceInstances[T](key: ServiceKey[T]): Set[ActorRef[T]] + + /** + * Java API: Return both the reachable and the unreachable service instances. + * + * In a non-clustered `ActorSystem` this will always be the same as [[#getServiceInstances]]. + * + * For a clustered `ActorSystem` this include both services on nodes that are reachable + * and nodes that are unreachable. + */ + def getAllServiceInstances[T](key: ServiceKey[T]): java.util.Set[ActorRef[T]] + + /** + * Returns `true` only if this `Listing` was sent triggered by new actors added or removed to the receptionist. + * When `false` the event is only about reachability changes - meaning that the full set of actors + * ([[#allServiceInstances]] or [[#getAllServiceInstances]]) is the same as the previous `Listing`. + * + * knowing this is useful for subscribers only concerned with [[#allServiceInstances]] or [[#getAllServiceInstances]] + * that can then ignore `Listing`s related to reachability. + * + * In a non-clustered `ActorSystem` this will be `true` for all listings. + * For `Find` queries and the initial listing for a `Subscribe` this will always be `true`. + */ + def servicesWereAddedOrRemoved: Boolean + } /** @@ -276,15 +341,32 @@ object Receptionist extends ExtensionId[Receptionist] { /** Scala API: */ def apply[T](key: ServiceKey[T], serviceInstances: Set[ActorRef[T]]): Listing = - new ReceptionistMessages.Listing[T](key, serviceInstances) + apply(key, serviceInstances, serviceInstances, servicesWereAddedOrRemoved = true) + /** Scala API: */ + def apply[T]( + key: ServiceKey[T], + serviceInstances: Set[ActorRef[T]], + allServiceInstances: Set[ActorRef[T]], + servicesWereAddedOrRemoved: Boolean): Listing = + new ReceptionistMessages.Listing[T](key, serviceInstances, allServiceInstances, servicesWereAddedOrRemoved) } /** * Java API: Sent by the receptionist, available here for easier testing */ def listing[T](key: ServiceKey[T], serviceInstances: java.util.Set[ActorRef[T]]): Listing = - Listing(key, Set[ActorRef[T]](serviceInstances.asScala.toSeq: _*)) + Listing(key, serviceInstances.asScala.toSet) + + /** + * Java API: Sent by the receptionist, available here for easier testing + */ + def listing[T]( + key: ServiceKey[T], + serviceInstances: java.util.Set[ActorRef[T]], + allServiceInstances: java.util.Set[ActorRef[T]], + servicesWereAddedOrRemoved: Boolean): Listing = + Listing(key, serviceInstances.asScala.toSet, allServiceInstances.asScala.toSet, servicesWereAddedOrRemoved) } diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala index f39c768205e..9f8e0a78081 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala @@ -16,14 +16,17 @@ import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey, Replicator } import akka.cluster.{ Cluster, ClusterEvent, UniqueAddress } import akka.remote.AddressUidExtension import akka.util.TypedMultiMap -import scala.concurrent.duration._ +import scala.concurrent.duration._ import akka.actor.Address import akka.cluster.ClusterEvent.ClusterDomainEvent import akka.cluster.ClusterEvent.ClusterShuttingDown import akka.cluster.ClusterEvent.MemberJoined import akka.cluster.ClusterEvent.MemberUp import akka.cluster.ClusterEvent.MemberWeaklyUp +import akka.cluster.ClusterEvent.ReachabilityEvent +import akka.cluster.ClusterEvent.ReachableMember +import akka.cluster.ClusterEvent.UnreachableMember import akka.cluster.ddata.SelfUniqueAddress // just to provide a log class @@ -60,6 +63,8 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { extends InternalCommand private final case class NodeAdded(addresses: UniqueAddress) extends InternalCommand private final case class NodeRemoved(addresses: UniqueAddress) extends InternalCommand + private final case class NodeUnreachable(addresses: UniqueAddress) extends InternalCommand + private final case class NodeReachable(addresses: UniqueAddress) extends InternalCommand private final case class ChangeFromReplicator(key: DDataKey, value: ORMultiMap[ServiceKey[_], Entry]) extends InternalCommand private case object RemoveTick extends InternalCommand @@ -109,11 +114,13 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { // remove entries when members are removed val clusterEventMessageAdapter: ActorRef[ClusterDomainEvent] = ctx.messageAdapter[ClusterDomainEvent] { - case MemberJoined(member) => NodeAdded(member.uniqueAddress) - case MemberWeaklyUp(member) => NodeAdded(member.uniqueAddress) - case MemberUp(member) => NodeAdded(member.uniqueAddress) - case MemberRemoved(member, _) => NodeRemoved(member.uniqueAddress) - case ClusterShuttingDown => NodeRemoved(setup.cluster.selfUniqueAddress) + case MemberJoined(member) => NodeAdded(member.uniqueAddress) + case MemberWeaklyUp(member) => NodeAdded(member.uniqueAddress) + case MemberUp(member) => NodeAdded(member.uniqueAddress) + case MemberRemoved(member, _) => NodeRemoved(member.uniqueAddress) + case UnreachableMember(member) => NodeUnreachable(member.uniqueAddress) + case ReachableMember(member) => NodeReachable(member.uniqueAddress) + case ClusterShuttingDown => NodeRemoved(setup.cluster.selfUniqueAddress) case other => throw new IllegalStateException(s"Unexpected ClusterDomainEvent $other. Please report bug.") } @@ -124,6 +131,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { classOf[MemberWeaklyUp], classOf[MemberUp], classOf[MemberRemoved], + classOf[ReachabilityEvent], ClusterShuttingDown.getClass) // also periodic cleanup in case removal from ORMultiMap is skipped due to concurrent update, @@ -211,6 +219,20 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { } } + def reachabilityChanged(keysForNode: Set[AbstractServiceKey], newRegistry: ShardedServiceRegistry): Unit = { + keysForNode.foreach { changedKey => + val serviceKey = changedKey.asServiceKey + + val subscribers = subscriptions.get(changedKey) + if (subscribers.nonEmpty) { + val (reachable, all) = newRegistry.activeActorRefsFor(serviceKey, selfUniqueAddress) + val listing = + ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved = false) + subscribers.foreach(_ ! listing) + } + } + } + def onCommand(cmd: Command): Behavior[Command] = cmd match { case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) => if (serviceInstance.path.address.hasLocalScope) { @@ -231,15 +253,18 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { Behaviors.same case ReceptionistMessages.Find(key, replyTo) => - replyTo ! ReceptionistMessages.Listing(key.asServiceKey, registry.activeActorRefsFor(key, selfUniqueAddress)) + val (reachable, all) = registry.activeActorRefsFor(key, selfUniqueAddress) + replyTo ! ReceptionistMessages.Listing(key.asServiceKey, reachable, all, servicesWereAddedOrRemoved = true) Behaviors.same case ReceptionistMessages.Subscribe(key, subscriber) => watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber)) // immediately reply with initial listings to the new subscriber - val listing = - ReceptionistMessages.Listing(key.asServiceKey, registry.activeActorRefsFor(key, selfUniqueAddress)) + val listing = { + val (reachable, all) = registry.activeActorRefsFor(key, selfUniqueAddress) + ReceptionistMessages.Listing(key.asServiceKey, reachable, all, servicesWereAddedOrRemoved = true) + } subscriber ! listing next(newSubscriptions = subscriptions.inserted(key)(subscriber)) @@ -287,9 +312,9 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { val subscribers = subscriptions.get(changedKey) if (subscribers.nonEmpty) { + val (reachable, all) = newRegistry.activeActorRefsFor(serviceKey, selfUniqueAddress) val listing = - ReceptionistMessages - .Listing(serviceKey, newRegistry.activeActorRefsFor(serviceKey, selfUniqueAddress)) + ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved = true) subscribers.foreach(_ ! listing) } @@ -340,6 +365,30 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { next(registry.removeNode(uniqueAddress)) } + case NodeUnreachable(uniqueAddress) => + val keysForNode = registry.keysFor(uniqueAddress) + val newRegistry = registry.addUnreachable(uniqueAddress) + if (keysForNode.nonEmpty) { + ctx.log.debug( + "ClusterReceptionist [{}] - Node with registered services unreachable [{}]", + cluster.selfAddress, + uniqueAddress) + reachabilityChanged(keysForNode, newRegistry) + } + next(newRegistry) + + case NodeReachable(uniqueAddress) => + val keysForNode = registry.keysFor(uniqueAddress) + val newRegistry = registry.removeUnreachable(uniqueAddress) + if (keysForNode.nonEmpty) { + ctx.log.debug( + "ClusterReceptionist [{}] - Node with registered services reachable again [{}]", + cluster.selfAddress, + uniqueAddress) + reachabilityChanged(keysForNode, newRegistry) + } + next(newRegistry) + case RemoveTick => // ok to update from several nodes but more efficient to try to do it from one node if (isLeader) { diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala index 497aef1df93..54065aabb26 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala @@ -23,7 +23,7 @@ import scala.concurrent.duration.Deadline val key = ORMultiMapKey[ServiceKey[_], Entry](s"ReceptionistKey_$n") key -> new ServiceRegistry(EmptyORMultiMap) }.toMap - new ShardedServiceRegistry(emptyRegistries, Map.empty, Set.empty) + new ShardedServiceRegistry(emptyRegistries, Map.empty, Set.empty, Set.empty) } } @@ -42,7 +42,8 @@ import scala.concurrent.duration.Deadline @InternalApi private[akka] final case class ShardedServiceRegistry( serviceRegistries: Map[DDataKey, ServiceRegistry], tombstones: Map[ActorRef[_], Deadline], - nodes: Set[UniqueAddress]) { + nodes: Set[UniqueAddress], + unreachable: Set[UniqueAddress]) { private val keys = serviceRegistries.keySet.toArray @@ -63,14 +64,34 @@ import scala.concurrent.duration.Deadline serviceRegistries(ddataKey).actorRefsFor(key) } - def activeActorRefsFor[T](key: ServiceKey[T], selfUniqueAddress: UniqueAddress): Set[ActorRef[T]] = { + /** + * @return keys that has a registered service instance on the given `address` + */ + def keysFor(address: UniqueAddress)(implicit node: SelfUniqueAddress): Set[AbstractServiceKey] = + serviceRegistries.valuesIterator.flatMap(_.keysFor(address)).toSet + + /** + * @return (reachable-nodes, all) + */ + def activeActorRefsFor[T]( + key: ServiceKey[T], + selfUniqueAddress: UniqueAddress): (Set[ActorRef[T]], Set[ActorRef[T]]) = { val ddataKey = ddataKeyFor(key) val entries = serviceRegistries(ddataKey).entriesFor(key) val selfAddress = selfUniqueAddress.address - entries.collect { - case entry if nodes.contains(entry.uniqueAddress(selfAddress)) && !hasTombstone(entry.ref) => - entry.ref.asInstanceOf[ActorRef[key.Protocol]] + val reachable = Set.newBuilder[ActorRef[T]] + val all = Set.newBuilder[ActorRef[T]] + entries.foreach { entry => + val entryAddress = entry.uniqueAddress(selfAddress) + if (nodes.contains(entryAddress) && !hasTombstone(entry.ref)) { + val ref = entry.ref.asInstanceOf[ActorRef[key.Protocol]] + all += ref + if (!unreachable.contains(entryAddress)) { + reachable += ref + } + } } + (reachable.result(), all.result()) } def withServiceRegistry(ddataKey: DDataKey, registry: ServiceRegistry): ShardedServiceRegistry = @@ -113,7 +134,13 @@ import scala.concurrent.duration.Deadline copy(nodes = nodes + node) def removeNode(node: UniqueAddress): ShardedServiceRegistry = - copy(nodes = nodes - node) + copy(nodes = nodes - node, unreachable = unreachable - node) + + def addUnreachable(uniqueAddress: UniqueAddress): ShardedServiceRegistry = + copy(unreachable = unreachable + uniqueAddress) + + def removeUnreachable(uniqueAddress: UniqueAddress): ShardedServiceRegistry = + copy(unreachable = unreachable - uniqueAddress) } @@ -129,6 +156,12 @@ import scala.concurrent.duration.Deadline def entriesFor(key: AbstractServiceKey): Set[Entry] = entries.getOrElse(key.asServiceKey, Set.empty[Entry]) + def keysFor(address: UniqueAddress)(implicit node: SelfUniqueAddress): Set[ServiceKey[_]] = + entries.entries.collect { + case (key, entries) if entries.exists(_.uniqueAddress(node.uniqueAddress.address) == address) => + key + }.toSet + def addBinding[T](key: ServiceKey[T], value: Entry)(implicit node: SelfUniqueAddress): ServiceRegistry = copy(entries = entries.addBinding(node, key, value)) diff --git a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/internal/ClusterReceptionistUnreachabilitySpec.scala b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/internal/ClusterReceptionistUnreachabilitySpec.scala new file mode 100644 index 00000000000..5da13395d5e --- /dev/null +++ b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/internal/ClusterReceptionistUnreachabilitySpec.scala @@ -0,0 +1,152 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.cluster.typed.internal + +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.SpawnProtocol +import akka.actor.typed.receptionist.Receptionist +import akka.actor.typed.receptionist.ServiceKey +import akka.actor.typed.scaladsl.AskPattern._ +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter._ +import akka.cluster.MultiNodeClusterSpec +import akka.cluster.typed.MultiDcClusterSingletonSpecConfig.first +import akka.cluster.typed.MultiDcClusterSingletonSpecConfig.second +import akka.cluster.typed.MultiDcClusterSingletonSpecConfig.third +import akka.cluster.typed.MultiNodeTypedClusterSpec +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.util.Timeout +import com.typesafe.config.ConfigFactory + +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.duration._ + +object ClusterReceptionistUnreachabilitySpecConfig extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + """).withFallback(MultiNodeClusterSpec.clusterConfig)) + + testTransport(on = true) +} + +object ClusterReceptionistUnreachabilitySpec { + val MyServiceKey = ServiceKey[String]("my-service") +} + +class ClusterReceptionistUnreachabilityMultiJvmNode1 extends ClusterReceptionistUnreachabilitySpec +class ClusterReceptionistUnreachabilityMultiJvmNode2 extends ClusterReceptionistUnreachabilitySpec +class ClusterReceptionistUnreachabilityMultiJvmNode3 extends ClusterReceptionistUnreachabilitySpec + +abstract class ClusterReceptionistUnreachabilitySpec + extends MultiNodeSpec(ClusterReceptionistUnreachabilitySpecConfig) + with MultiNodeTypedClusterSpec { + + import ClusterReceptionistUnreachabilitySpec._ + + val spawnActor = system.actorOf(PropsAdapter(SpawnProtocol.behavior)).toTyped[SpawnProtocol] + def spawn[T](behavior: Behavior[T], name: String): ActorRef[T] = { + implicit val timeout: Timeout = 3.seconds + implicit val scheduler = typedSystem.scheduler + val f: Future[ActorRef[T]] = spawnActor.ask(ref => SpawnProtocol.Spawn(behavior, name)(ref)) + + Await.result(f, 3.seconds) + } + + val probe = TestProbe[AnyRef]() + val receptionistProbe = TestProbe[AnyRef]() + + "The clustered receptionist" must { + + "subscribe to the receptionist" in { + typedSystem.receptionist ! Receptionist.Subscribe(MyServiceKey, receptionistProbe.ref) + val listing = receptionistProbe.expectMessageType[Receptionist.Listing] + listing.serviceInstances(MyServiceKey) should ===(Set.empty) + listing.allServiceInstances(MyServiceKey) should ===(Set.empty) + listing.servicesWereAddedOrRemoved should ===(true) + enterBarrier("all subscribed") + } + + "form a cluster" in { + formCluster(first, second, third) + enterBarrier("cluster started") + } + + "register a service" in { + val localServiceRef = spawn(Behaviors.receiveMessage[String] { + case msg => + probe.ref ! msg + Behaviors.same + }, "my-service") + typedSystem.receptionist ! Receptionist.Register(MyServiceKey, localServiceRef) + enterBarrier("all registered") + } + + "see registered services" in { + awaitAssert({ + val listing = receptionistProbe.expectMessageType[Receptionist.Listing] + listing.serviceInstances(MyServiceKey) should have size (3) + listing.allServiceInstances(MyServiceKey) should have size (3) + listing.servicesWereAddedOrRemoved should ===(true) + }, 20.seconds) + + enterBarrier("all seen registered") + } + + "remove unreachable from listing" in { + // make second unreachable + runOn(first) { + testConductor.blackhole(first, second, Direction.Both).await + testConductor.blackhole(third, second, Direction.Both).await + } + + runOn(first, third) { + // assert service on 2 is not in listing but in all and flag is false + awaitAssert({ + val listing = receptionistProbe.expectMessageType[Receptionist.Listing] + listing.serviceInstances(MyServiceKey) should have size (2) + listing.allServiceInstances(MyServiceKey) should have size (3) + listing.servicesWereAddedOrRemoved should ===(false) + }, 20.seconds) + } + runOn(second) { + // assert service on 1 and 3 is not in listing but in all and flag is false + awaitAssert({ + val listing = receptionistProbe.expectMessageType[Receptionist.Listing] + listing.serviceInstances(MyServiceKey) should have size (1) + listing.allServiceInstances(MyServiceKey) should have size (3) + listing.servicesWereAddedOrRemoved should ===(false) + }, 20.seconds) + } + enterBarrier("all seen unreachable") + } + + "add again-reachable to list again" in { + // make second unreachable + runOn(first) { + testConductor.passThrough(first, second, Direction.Both).await + testConductor.passThrough(third, second, Direction.Both).await + } + + awaitAssert({ + val listing = receptionistProbe.expectMessageType[Receptionist.Listing] + listing.serviceInstances(MyServiceKey) should have size (3) + listing.allServiceInstances(MyServiceKey) should have size (3) + listing.servicesWereAddedOrRemoved should ===(false) + }) + enterBarrier("all seen reachable-again") + } + + } + +} diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala index bb7c685e72d..88baffeb176 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala @@ -191,7 +191,10 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { clusterNode1.manager ! Leave(clusterNode2.selfMember.address) } - regProbe1.expectMessage(10.seconds, Listing(PingKey, Set(service1))) + regProbe1.awaitAssert({ + // we will also potentially get an update that the service was unreachable before the expected one + regProbe1.expectMessage(10.seconds, Listing(PingKey, Set(service1))) + }, 10.seconds) // register another after removal val service1b = testKit1.spawn(pingPongBehavior) @@ -243,7 +246,10 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { clusterNode2.manager ! Down(clusterNode1.selfMember.address) // service1 removed - regProbe2.expectMessage(10.seconds, Listing(PingKey, Set(service2))) + regProbe2.awaitAssert({ + // we will also potentially get an update that the service was unreachable before the expected one + regProbe2.expectMessage(10.seconds, Listing(PingKey, Set(service2))) + }, 10.seconds) } finally { testKit1.shutdownTestKit() testKit2.shutdownTestKit() @@ -298,8 +304,11 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { system2.terminate() Await.ready(system2.whenTerminated, 10.seconds) clusterNode1.manager ! Down(clusterNode2.selfMember.address) + regProbe1.awaitAssert({ - regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + // we will also potentially get an update that the service was unreachable before the expected one + regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + }, 10.seconds) } finally { testKit1.shutdownTestKit() testKit2.shutdownTestKit() diff --git a/akka-docs/src/main/paradox/typed/actor-discovery.md b/akka-docs/src/main/paradox/typed/actor-discovery.md index a70bebb44ff..58ccb5c3954 100644 --- a/akka-docs/src/main/paradox/typed/actor-discovery.md +++ b/akka-docs/src/main/paradox/typed/actor-discovery.md @@ -73,8 +73,15 @@ sends a `Ping` message and when receiving the `Pong` reply it stops. ## Cluster Receptionist -The `Receptionist` also works in a cluster, an actor registered to the receptionist will appear in the receptionist of the other nodes of the cluster. +The `Receptionist` also works in a cluster, an actor registered to the receptionist will appear in the receptionist +of the other nodes of the cluster. -The state for the receptionist is propagated via @ref:[distributed data](../distributed-data.md) which means that each node will eventually reach the same set of actors per `ServiceKey`. +The state for the receptionist is propagated via @ref:[distributed data](../distributed-data.md) which means that each node +will eventually reach the same set of actors per `ServiceKey`. -One important difference from a local only receptions is the serialisation concerns, all messages sent to and back from an actor on another node must be serializable, see @ref:[clustering](cluster.md#serialization). +`Subscription`s and `Find` queries to a clustered receptionist will keep track of cluster reachability and only list +registered actors that are reachable. The full set of actors, including unreachable ones, is available through +@scala[`Listing.allServiceInstances`]@java[`Listing.getAllServiceInstances`]. + +One important difference from local only receptions are the serialization concerns, all messages sent to and back from +an actor on another node must be serializable, see @ref:[clustering](cluster.md#serialization). diff --git a/akka-docs/src/main/paradox/typed/routers.md b/akka-docs/src/main/paradox/typed/routers.md index bdc4bd1a320..90c2ff7b1d2 100644 --- a/akka-docs/src/main/paradox/typed/routers.md +++ b/akka-docs/src/main/paradox/typed/routers.md @@ -43,12 +43,16 @@ Java The group router is created with a `ServiceKey` and uses the receptionist (see @ref:[Receptionist](actor-discovery.md#receptionist)) to discover available actors for that key and routes messages to one of the currently known registered actors for a key. -Since the receptionist is used this means the group router is cluster aware out of the box and will pick up routees -registered on any node in the cluster (there is currently no logic to avoid routing to unreachable nodes, see [#26355](https://github.com/akka/akka/issues/26355)). +Since the receptionist is used this means the group router is cluster aware out of the box. The router route +messages to registered actors on any node in the cluster that is reachable. If no reachable actor exists the router +will fallback and route messages to actors on nodes marked as unreachable. -It also means that the set of routees is eventually consistent, and that immediately when the group router is started -the set of routees it knows about is empty. When the set of routees is empty messages sent to the router is forwarded -to dead letters. +That the receptionist is used also means that the set of routees is eventually consistent, and that immediately when +the group router is started the set of routees it knows about is empty, until it has seen a listing from the receptionist +it stashes incoming messages and forwards them as soon as it gets a listing from the receptionist. + +When the router has received a listing from the receptionist and the set of registered actors is empty the router will +drop them (published them to the event stream as `akka.actor.Dropped`). Scala : @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #group } @@ -88,4 +92,4 @@ it will not give better performance to create more routees than there are thread Since the router itself is an actor and has a mailbox this means that messages are routed sequentially to the routees where it can be processed in parallel (depending on the available threads in the dispatcher). -In a high throughput use cases the sequential routing could be a bottle neck. Akka Typed does not provide an optimized tool for this. +In a high throughput use cases the sequential routing could be a bottle neck. Akka Typed does not provide an optimized tool for this. \ No newline at end of file