Navigation Menu

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft, exploratory implementation of ConsistentHashingRoutingLogic #28064

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -3,14 +3,17 @@
*/

package akka.actor.typed.internal.routing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.testkit.typed.scaladsl.LogCapturing
import org.scalatest.Matchers
import org.scalatest.WordSpecLike
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit, TestProbe }
import akka.actor.typed.internal.routing.RoutingLogics.ConsistentHashingLogic
import akka.actor.typed.internal.routing.RoutingLogics.ConsistentHashingLogic.ConsistentHashMapping
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorSystem, Behavior }
import org.scalatest.{ Matchers, WordSpecLike }

class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with Matchers with LogCapturing {

val emptyMessage: Any = ""

"The round robin routing logic" must {

"round robin" in {
Expand All @@ -22,10 +25,10 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with
val logic = new RoutingLogics.RoundRobinLogic[Any]

logic.routeesUpdated(allRoutees)
logic.selectRoutee() should ===(refA)
logic.selectRoutee() should ===(refB)
logic.selectRoutee() should ===(refC)
logic.selectRoutee() should ===(refA)
logic.selectRoutee(emptyMessage) should ===(refA)
logic.selectRoutee(emptyMessage) should ===(refB)
logic.selectRoutee(emptyMessage) should ===(refC)
logic.selectRoutee(emptyMessage) should ===(refA)
}

"not skip one on removal" in {
Expand All @@ -36,12 +39,12 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with

val logic = new RoutingLogics.RoundRobinLogic[Any]
logic.routeesUpdated(allRoutees)
logic.selectRoutee() should ===(refA)
logic.selectRoutee() should ===(refB)
logic.selectRoutee(emptyMessage) should ===(refA)
logic.selectRoutee(emptyMessage) should ===(refB)

val bRemoved = Set(refA, refC)
logic.routeesUpdated(bRemoved)
logic.selectRoutee() should ===(refC)
logic.selectRoutee(emptyMessage) should ===(refC)
}

"handle last one removed" in {
Expand All @@ -51,11 +54,11 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with

val logic = new RoutingLogics.RoundRobinLogic[Any]
logic.routeesUpdated(allRoutees)
logic.selectRoutee() should ===(refA)
logic.selectRoutee(emptyMessage) should ===(refA)

val bRemoved = Set(refA)
logic.routeesUpdated(bRemoved)
logic.selectRoutee() should ===(refA)
logic.selectRoutee(emptyMessage) should ===(refA)
}

"move on to next when several removed" in {
Expand All @@ -68,12 +71,12 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with

val logic = new RoutingLogics.RoundRobinLogic[Any]
logic.routeesUpdated(allRoutees)
logic.selectRoutee() should ===(refA)
logic.selectRoutee() should ===(refB)
logic.selectRoutee(emptyMessage) should ===(refA)
logic.selectRoutee(emptyMessage) should ===(refB)

val severalRemoved = Set(refA, refC)
logic.routeesUpdated(severalRemoved)
logic.selectRoutee() should ===(refC)
logic.selectRoutee(emptyMessage) should ===(refC)
}

"wrap around when several removed" in {
Expand All @@ -86,13 +89,13 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with

val logic = new RoutingLogics.RoundRobinLogic[Any]
logic.routeesUpdated(allRoutees)
logic.selectRoutee() should ===(refA)
logic.selectRoutee() should ===(refB)
logic.selectRoutee() should ===(refC)
logic.selectRoutee(emptyMessage) should ===(refA)
logic.selectRoutee(emptyMessage) should ===(refB)
logic.selectRoutee(emptyMessage) should ===(refC)

val severalRemoved = Set(refA, refC)
logic.routeesUpdated(severalRemoved)
logic.selectRoutee() should ===(refA)
logic.selectRoutee(emptyMessage) should ===(refA)
}

"pick first in with a completely new set of routees" in {
Expand All @@ -105,13 +108,13 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with

val logic = new RoutingLogics.RoundRobinLogic[Any]
logic.routeesUpdated(initialRoutees)
logic.selectRoutee() should ===(refA)
logic.selectRoutee() should ===(refB)
logic.selectRoutee() should ===(refA)
logic.selectRoutee(emptyMessage) should ===(refA)
logic.selectRoutee(emptyMessage) should ===(refB)
logic.selectRoutee(emptyMessage) should ===(refA)

val severalRemoved = Set(refC, refD)
logic.routeesUpdated(severalRemoved)
logic.selectRoutee() should ===(refC)
logic.selectRoutee(emptyMessage) should ===(refC)
}

}
Expand All @@ -129,11 +132,92 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with

(0 to 10).foreach { _ =>
// not much to verify here, but let's exercise it at least
val routee = logic.selectRoutee()
val routee = logic.selectRoutee(emptyMessage)
routees should contain(routee)
}

}

}

"The consistent hashing logic" must {
val behavior: Behavior[Int] = Behaviors.empty[Int]
val typedSystem: ActorSystem[Int] = ActorSystem(behavior, "emptySystem")
val modulo10Mapping: ConsistentHashMapping[Int] = { case in: Int => (in % 10).toString }
val messages: Map[Any, Seq[Int]] = (1 to 1000).groupBy(modulo10Mapping)

"not accept virtualization factor lesser than 1" in {
val caught = intercept[IllegalArgumentException] {
new RoutingLogics.ConsistentHashingLogic[Int](0, ConsistentHashingLogic.emptyHashMapping, typedSystem)
}
caught.getMessage shouldEqual "virtualNodesFactor must be >= 1"
}

"not accept null actor system" in {
val caught = intercept[IllegalArgumentException] {
new RoutingLogics.ConsistentHashingLogic[String](2, ConsistentHashingLogic.emptyHashMapping, null)
}
caught.getMessage shouldEqual "requirement failed: system argument of ConsistentHashingLogic cannot be null."
}

"return deadLetters when there are no routees" in {
val logic =
new RoutingLogics.ConsistentHashingLogic[Int](1, modulo10Mapping, typedSystem)
logic.selectRoutee(0) shouldBe typedSystem.deadLetters
}

"hash consistently" in {
consitentHashingTestWithVirtualizationFactor(1)
}

"hash consistently with virtualization factor" in {
consitentHashingTestWithVirtualizationFactor(13)
}

"hash consistently when several new added" in {
val logic =
new RoutingLogics.ConsistentHashingLogic[Int](2, modulo10Mapping, typedSystem)
val refA = TestProbe("a").ref
val refB = TestProbe("b").ref
val refC = TestProbe("c").ref
val refD = TestProbe("d").ref
logic.routeesUpdated(Set(refA, refB, refC, refD))
// every group should have the same actor ref
verifyConsistentHashing(logic)
logic.routeesUpdated(Set(refA, refB))
verifyConsistentHashing(logic)
}

"hash consistently when several new removed" in {
val logic =
new RoutingLogics.ConsistentHashingLogic[Int](2, modulo10Mapping, typedSystem)
val refA = TestProbe("a").ref
val refB = TestProbe("b").ref
val refC = TestProbe("c").ref
val refD = TestProbe("d").ref
logic.routeesUpdated(Set(refA, refB))
// every group should have the same actor ref
verifyConsistentHashing(logic)
logic.routeesUpdated(Set(refA, refB, refC, refD))
verifyConsistentHashing(logic)
}

def consitentHashingTestWithVirtualizationFactor(virtualizationFactor: Int): Boolean = {
val logic =
new RoutingLogics.ConsistentHashingLogic[Int](virtualizationFactor, modulo10Mapping, typedSystem)
val refA = TestProbe("a").ref
val refB = TestProbe("b").ref
val refC = TestProbe("c").ref
val refD = TestProbe("d").ref
logic.routeesUpdated(Set(refA, refB, refC, refD))
verifyConsistentHashing(logic)
}

def verifyConsistentHashing(logic: ConsistentHashingLogic[Int]): Boolean = {
messages.mapValues(_.map(logic.selectRoutee)).forall {
case (_, refs) => refs.headOption.forall(head => refs.forall(_ == head))
}
}

}
}
Expand Up @@ -5,13 +5,12 @@
package akka.actor.typed.scaladsl
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.Dropped
import akka.actor.{ ActorSystem, Dropped }
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.{ ActorRef, ActorSystem => TypedActorSystem, Behavior }
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.internal.routing.GroupRouterImpl
import akka.actor.typed.internal.routing.RoutingLogics
Expand All @@ -26,13 +25,16 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
""") with WordSpecLike with Matchers with LogCapturing {

// needed for the event filter
implicit val classicSystem = system.toClassic
implicit val classicSystem: ActorSystem = system.toClassic

def compileOnlyApiCoverage(): Unit = {
Routers.group(ServiceKey[String]("key")).withRandomRouting().withRoundRobinRouting()

Routers.pool(10)(Behaviors.empty[Any]).withRandomRouting()
Routers.pool(10)(Behaviors.empty[Any]).withRoundRobinRouting()
Routers
.pool(10)(Behaviors.empty[Any])
.withConsistentHashingRouting(actorSystem = TypedActorSystem(Behaviors.empty[Any], "emptySystem"))
}

"The router pool" must {
Expand Down
Expand Up @@ -7,6 +7,7 @@ package akka.actor.typed.internal.routing
import akka.actor.Dropped
import akka.actor.typed._
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.internal.routing.RoutingLogics.ConsistentHashingLogic.ConsistentHashMapping
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, StashBuffer }
Expand All @@ -31,6 +32,11 @@ private[akka] final case class GroupRouterBuilder[T] private[akka] (

def withRoundRobinRouting(): GroupRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RoundRobinLogic[T])

override def withConsistentHashingRouting(
virtualNodesFactor: Int,
mapping: ConsistentHashMapping[T],
system: ActorSystem[T]): GroupRouterBuilder[T] =
copy(logicFactory = () => new RoutingLogics.ConsistentHashingLogic[T](virtualNodesFactor, mapping, system))
}

/**
Expand Down Expand Up @@ -98,7 +104,7 @@ private[akka] final class GroupRouterImpl[T](
this
case msg: T @unchecked =>
import akka.actor.typed.scaladsl.adapter._
if (!routeesEmpty) routingLogic.selectRoutee() ! msg
if (!routeesEmpty) routingLogic.selectRoutee(msg) ! msg
else
context.system.eventStream ! EventStream.Publish(
Dropped(msg, s"No routees in group router for [$serviceKey]", context.self.toClassic))
Expand Down
Expand Up @@ -5,9 +5,9 @@
package akka.actor.typed.internal.routing

import akka.actor.typed._
import akka.actor.typed.scaladsl.AbstractBehavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.internal.routing.RoutingLogics.ConsistentHashingLogic
import akka.actor.typed.internal.routing.RoutingLogics.ConsistentHashingLogic.ConsistentHashMapping
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors}
import akka.annotation.InternalApi

/**
Expand All @@ -30,6 +30,12 @@ private[akka] final case class PoolRouterBuilder[T](

def withRoundRobinRouting(): PoolRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RoundRobinLogic[T])

def withConsistentHashingRouting(
virtualNodesFactor: Int = 0,
mapping: ConsistentHashMapping[T] = ConsistentHashingLogic.emptyHashMapping,
system: ActorSystem[T]): PoolRouterBuilder[T] =
copy(logicFactory = () => new RoutingLogics.ConsistentHashingLogic[T](virtualNodesFactor, mapping, system))

def withPoolSize(poolSize: Int): PoolRouterBuilder[T] = copy(poolSize = poolSize)
}

Expand Down Expand Up @@ -57,7 +63,7 @@ private final class PoolRouterImpl[T](
}

def onMessage(msg: T): Behavior[T] = {
logic.selectRoutee() ! msg
logic.selectRoutee(msg) ! msg
this
}

Expand Down