diff --git a/build.sbt b/build.sbt index 3b804a6..e85c141 100644 --- a/build.sbt +++ b/build.sbt @@ -33,7 +33,7 @@ scalacOptions ++= Seq( scalacOptions in (Compile,doc) ++= Seq("-no-link-warnings") -val AkkaVersion = "2.5.1" +val AkkaVersion = "2.5.3" resolvers += Resolver.bintrayRepo("evolutiongaming", "maven") diff --git a/src/main/scala/com/evolutiongaming/cluster/ActorRefExtension.scala b/src/main/scala/com/evolutiongaming/cluster/ActorRefExtension.scala new file mode 100644 index 0000000..9171550 --- /dev/null +++ b/src/main/scala/com/evolutiongaming/cluster/ActorRefExtension.scala @@ -0,0 +1,5 @@ +package com.evolutiongaming.cluster + +import akka.actor.{ActorRef, Extension} + +class ActorRefExtension(val ref: ActorRef) extends Extension \ No newline at end of file diff --git a/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataProxy.scala b/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataProxy.scala index 41087a9..0254fb2 100644 --- a/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataProxy.scala +++ b/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataProxy.scala @@ -15,10 +15,10 @@ */ package com.evolutiongaming.cluster -import akka.actor.{Actor, ActorLogging, ActorRef, ExtendedActorSystem, Extension, ExtensionId, Props} +import akka.actor.{Actor, ActorLogging, ActorRef, ExtendedActorSystem, ExtensionId, Props} import akka.cluster.Cluster -import akka.cluster.ddata._ import akka.cluster.ddata.Replicator._ +import akka.cluster.ddata._ import scala.compat.Platform @@ -30,12 +30,12 @@ class AdaptiveAllocationStrategyDistributedDataProxy extends Actor with ActorLog implicit lazy val node = Cluster(context.system) private val selfAddress = node.selfAddress.toString lazy val replicator: ActorRef = DistributedData(context.system).replicator + private val emptyMap = ORMultiMap.empty[String, String] replicator ! Subscribe(EntityToNodeCountersKey, self) def sendBindingUpdate(entityKey: String, counterKey: String): Unit = { - val empty = ORMultiMap.empty[String, String] - replicator ! Update(EntityToNodeCountersKey, empty, WriteLocal)(_ addBinding(entityKey, counterKey)) + replicator ! Update(EntityToNodeCountersKey, emptyMap, WriteLocal)(_ addBinding(entityKey, counterKey)) } def receive: Receive = { @@ -54,11 +54,11 @@ class AdaptiveAllocationStrategyDistributedDataProxy extends Actor with ActorLog case Some(counterKeys) if counterKeys contains counterKey => case Some(counterKeys) => - entityToNodeCounters = entityToNodeCounters + (entityKey -> (counterKeys + counterKey)) + entityToNodeCounters += (entityKey -> (counterKeys + counterKey)) sendBindingUpdate(entityKey.toString, counterKey.toString) case None => - entityToNodeCounters = entityToNodeCounters + (entityKey -> Set(counterKey)) + entityToNodeCounters += (entityKey -> Set(counterKey)) sendBindingUpdate(entityKey.toString, counterKey.toString) } @@ -119,13 +119,11 @@ class AdaptiveAllocationStrategyDistributedDataProxy extends Actor with ActorLog } } -class ActorRefExtension(val ref: ActorRef) extends Extension - object AdaptiveAllocationStrategyDistributedDataProxy extends ExtensionId[ActorRefExtension] { override def createExtension(system: ExtendedActorSystem): ActorRefExtension = new ActorRefExtension(system actorOf Props[AdaptiveAllocationStrategyDistributedDataProxy]) - // DData key of the entityToNodeCounterIds map + // DData key of entityToNodeCounters map private[cluster] val EntityToNodeCountersKey: ORMultiMapKey[String, String] = ORMultiMapKey[String, String]("EntityToNodeCounters") } diff --git a/src/main/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategy.scala index e732252..de573c7 100644 --- a/src/main/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategy.scala +++ b/src/main/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategy.scala @@ -19,8 +19,8 @@ abstract class ExtendedShardAllocationStrategy( protected def maxSimultaneousRebalance: Int protected def notIgnoredNodes( - currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]]): Set[ActorRef] = { - val ignoredNodes = nodesToDeallocate() + currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]], + ignoredNodes: Set[Address] = nodesToDeallocate()): Set[ActorRef] = { currentShardAllocations.keySet filterNot { ref => ignoredNodes contains (addressHelper toGlobal ref.path.address) } @@ -39,7 +39,23 @@ abstract class ExtendedShardAllocationStrategy( requester: ActorRef, shardId: ShardRegion.ShardId, currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]]): Future[ActorRef] = - doAllocate(requester, shardId, currentShardAllocations) + for (nodeByStrategy <- doAllocate(requester, shardId, currentShardAllocations)) yield { + val ignoredNodes = nodesToDeallocate() + + if (ignoredNodes.isEmpty) + nodeByStrategy + else { + val activeNodes = notIgnoredNodes(currentShardAllocations, ignoredNodes) + val activeAddresses = activeNodes map (_.path.address) + + if (activeAddresses contains (addressHelper toGlobal nodeByStrategy.path.address)) + nodeByStrategy + else if (activeAddresses contains (addressHelper toGlobal requester.path.address)) + requester + else + activeNodes.headOption getOrElse currentShardAllocations.keys.head + } + } final def rebalance( currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]], diff --git a/src/main/scala/com/evolutiongaming/cluster/ExtractShardId.scala b/src/main/scala/com/evolutiongaming/cluster/ExtractShardId.scala index ec4af59..5aa9abe 100644 --- a/src/main/scala/com/evolutiongaming/cluster/ExtractShardId.scala +++ b/src/main/scala/com/evolutiongaming/cluster/ExtractShardId.scala @@ -25,14 +25,21 @@ import scala.util.Try object ExtractShardId extends LazyLogging { val identity: ShardRegion.ExtractShardId = { - case x: ShardedMsg => x.id + case x: ShardedMsg => x.id + case ShardRegion.StartEntity(id) => id } def uniform(numberOfShards: Int): ShardRegion.ExtractShardId = { - case x: ShardedMsg => - val id = x.id - val shardId = math.abs(id.hashCode % numberOfShards).toString - shardId + def shardId(entityId: ShardRegion.EntityId): ShardRegion.ShardId = + math.abs(entityId.hashCode % numberOfShards).toString + + { + case x: ShardedMsg => + val entityId = x.id + shardId(entityId) + case ShardRegion.StartEntity(entityId) => + shardId(entityId) + } } def static( @@ -53,14 +60,19 @@ object ExtractShardId extends LazyLogging { } val mappings = mappingsPairList.toMap logger debug s"$typeName mappings: $mappings" - val result: ShardRegion.ExtractShardId = { - case x: ShardedMsg => - val entityId = x.id - mappings get entityId match { - case Some(shardId) => shardId - case None => fallback(x) - } + + def shardId(entityId: ShardRegion.EntityId, msg: ShardRegion.Msg): ShardRegion.ShardId = + mappings get entityId match { + case Some(shardId) => shardId + case None => fallback(msg) + } + + { + case msg: ShardedMsg => + val entityId = msg.id + shardId(entityId, msg) + case msg@ShardRegion.StartEntity(entityId) => + shardId(entityId, msg) } - result } } \ No newline at end of file diff --git a/src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategy.scala new file mode 100644 index 0000000..8a73089 --- /dev/null +++ b/src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategy.scala @@ -0,0 +1,98 @@ +package com.evolutiongaming.cluster + +import akka.actor.{ActorRef, ActorSystem} +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy +import akka.cluster.sharding.ShardRegion +import com.typesafe.scalalogging.LazyLogging + +import scala.collection.immutable +import scala.concurrent.Future + +class MappedAllocationStrategy( + typeName: String, + fallbackStrategy: ShardAllocationStrategy, + proxy: ActorRef, + val maxSimultaneousRebalance: Int) + extends ShardAllocationStrategy with LazyLogging { + + import MappedAllocationStrategy._ + + def mapShardToRegion(shardId: ShardRegion.ShardId, regionRef: ActorRef) = + proxy ! UpdateMapping(typeName, shardId, regionRef) + + def allocateShard( + requester: ActorRef, + shardId: ShardRegion.ShardId, + currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]]): Future[ActorRef] = { + + shardToRegionMapping get EntityKey(typeName, shardId) match { + case Some(toNode) => + logger debug s"AllocateShard $typeName\n\t" + + s"shardId:\t$shardId\n\t" + + s"on node:\t$toNode\n\t" + + s"requester:\t$requester\n\t" + Future successful toNode + case None => + logger debug s"AllocateShard fallback $typeName, shardId:\t$shardId" + fallbackStrategy.allocateShard(requester, shardId, currentShardAllocations) + } + } + + + def rebalance( + currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]], + rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] = { + + logger debug + s"rebalance $typeName: currentShardAllocations = $currentShardAllocations, rebalanceInProgress = $rebalanceInProgress" + + val result = (for { + (ref, shards) <- currentShardAllocations + shardId <- shards if !(shardToRegionMapping get EntityKey(typeName, shardId) contains ref) + } yield shardId).toSet + + if (result.nonEmpty) logger info s"Rebalance $typeName\n\t" + + s"current:${ currentShardAllocations.mkString("\n\t\t", "\n\t\t", "") }\n\t" + + s"rebalanceInProgress:\t$rebalanceInProgress\n\t" + + s"result:\t$result" + + Future successful result + } +} + +object MappedAllocationStrategy { + + def apply( + typeName: String, + fallbackStrategy: ShardAllocationStrategy, + maxSimultaneousRebalance: Int) + (implicit system: ActorSystem): MappedAllocationStrategy = { + // proxy doesn't depend on typeName, it should just start once + val proxy = MappedAllocationStrategyDistributedDataProxy(system).ref + new MappedAllocationStrategy( + typeName = typeName, + fallbackStrategy = fallbackStrategy, + proxy = proxy, + maxSimultaneousRebalance = maxSimultaneousRebalance) + } + + case class EntityKey(typeName: String, id: ShardRegion.ShardId) { + override def toString: String = s"$typeName#$id" + } + + object EntityKey { + def unapply(arg: List[String]): Option[EntityKey] = arg match { + case typeName :: id :: Nil => Some(EntityKey(typeName, id)) + case _ => None + } + + def unapply(arg: String): Option[EntityKey] = unapply((arg split "#").toList) + } + + case class UpdateMapping(typeName: String, id: ShardRegion.ShardId, regionRef: ActorRef) + case class Clear(typeName: String, id: ShardRegion.ShardId) + + // TODO: check for thread-safety + @volatile + private[cluster] var shardToRegionMapping: Map[EntityKey, ActorRef] = Map.empty +} \ No newline at end of file diff --git a/src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategyDistributedDataProxy.scala b/src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategyDistributedDataProxy.scala new file mode 100644 index 0000000..e1339ff --- /dev/null +++ b/src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategyDistributedDataProxy.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2016-2017 Evolution Gaming Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.evolutiongaming.cluster + +import akka.actor.{Actor, ActorLogging, ActorRef, ExtendedActorSystem, ExtensionId, Props} +import akka.cluster.Cluster +import akka.cluster.ddata.Replicator._ +import akka.cluster.ddata._ + +class MappedAllocationStrategyDistributedDataProxy extends Actor with ActorLogging { + + import MappedAllocationStrategy._ + import MappedAllocationStrategyDistributedDataProxy._ + + implicit lazy val node = Cluster(context.system) + lazy val replicator: ActorRef = DistributedData(context.system).replicator + private val emptyMap = LWWMap.empty[String, ActorRef] + + replicator ! Subscribe(MappingKey, self) + + + def receive: Receive = { + case UpdateMapping(typeName, id, regionRef) => + val entityKey = EntityKey(typeName, id) + shardToRegionMapping += entityKey -> regionRef + replicator ! Update(MappingKey, emptyMap, WriteLocal)(_ + (entityKey.toString -> regionRef)) + + case Clear(typeName, id) => + val entityKey = EntityKey(typeName, id) + shardToRegionMapping -= entityKey + replicator ! Update(MappingKey, emptyMap, WriteLocal)(_ - entityKey.toString) + + case UpdateSuccess(_, _) => + + case UpdateTimeout(key, _) => + // probably should never happen for local updates + log warning s"Update timeout for key $key" + + case c@Changed(MappingKey) => + val newData = (c get MappingKey).entries flatMap { + case (key, ref) => + EntityKey unapply key map { entityKey => + entityKey -> ref + } + } + shardToRegionMapping = newData + } +} + +object MappedAllocationStrategyDistributedDataProxy extends ExtensionId[ActorRefExtension] { + override def createExtension(system: ExtendedActorSystem): ActorRefExtension = + new ActorRefExtension(system actorOf Props[MappedAllocationStrategyDistributedDataProxy]) + + // DData key of ShardToRegionMapping map + private[cluster] val MappingKey: LWWMapKey[String, ActorRef] = + LWWMapKey[String, ActorRef]("ShardToRegionMapping") +} diff --git a/src/test/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategySpec.scala b/src/test/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategySpec.scala index b5ba832..8b7bfe3 100644 --- a/src/test/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategySpec.scala +++ b/src/test/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategySpec.scala @@ -8,11 +8,60 @@ import scala.concurrent.Future class ExtendedShardAllocationStrategySpec extends AllocationStrategySpec { - "ExtendedShardAllocationStrategy" should "pass through requests if no nodesToDeallocate, limit result size" in new Scope { + "ExtendedShardAllocationStrategy" should "pass through allocate result if no nodesToDeallocate" in new Scope { val strategy = new TestExtendedShardAllocationStrategy() { override val nodesToDeallocate = () => Set.empty[Address] - override val result = ShardIds.toSet -- RebalanceInProgress + override val allocateResult = Some(Refs.head) + } + + strategy.allocateShard(Refs.last, ShardIds.head, CurrentShardAllocations).futureValue shouldBe Refs.head + + strategy.passedCurrentShardAllocations shouldBe Some(CurrentShardAllocations) + } + + it should "pass through allocate result if the result not in the list of ignored nodes" in new Scope { + + val strategy = new TestExtendedShardAllocationStrategy() { + override val nodesToDeallocate = () => Set(testAddress("Address2"), testAddress("Address4")) + override val allocateResult = Some(Refs.head) + } + + strategy.allocateShard(Refs.last, ShardIds.head, CurrentShardAllocations).futureValue shouldBe Refs.head + + strategy.passedCurrentShardAllocations shouldBe Some(CurrentShardAllocations) + } + + + it should "return requester if the result is in the list of ignored nodes" in new Scope { + + val strategy = new TestExtendedShardAllocationStrategy() { + override val nodesToDeallocate = () => Set(testAddress("Address1"), testAddress("Address4")) + override val allocateResult = Some(Refs.head) + } + + strategy.allocateShard(Refs.last, ShardIds.head, CurrentShardAllocations).futureValue shouldBe Refs.last + + strategy.passedCurrentShardAllocations shouldBe Some(CurrentShardAllocations) + } + + it should "return arbitrary non-ignored node if the result and requester are both in the list of ignored nodes" in new Scope { + + val strategy = new TestExtendedShardAllocationStrategy() { + override val nodesToDeallocate = () => Set(testAddress("Address1"), testAddress("Address5")) + override val allocateResult = Some(Refs.head) + } + + strategy.allocateShard(Refs.last, ShardIds.head, CurrentShardAllocations).futureValue should (not equal Refs.head and not equal Refs.last) + + strategy.passedCurrentShardAllocations shouldBe Some(CurrentShardAllocations) + } + + it should "pass through rebalance requests if no nodesToDeallocate, limit result size" in new Scope { + + val strategy = new TestExtendedShardAllocationStrategy() { + override val nodesToDeallocate = () => Set.empty[Address] + override val rebalanceResult = ShardIds.toSet -- RebalanceInProgress } strategy.rebalance(CurrentShardAllocations, RebalanceInProgress).futureValue shouldBe @@ -27,7 +76,7 @@ class ExtendedShardAllocationStrategySpec extends AllocationStrategySpec { val strategy = new TestExtendedShardAllocationStrategy() { override val maxSimultaneousRebalance = 15 override val nodesToDeallocate = () => Set(testAddress("Address2"), testAddress("Address4")) - override val result = ShardIds.slice(24, 29).toSet -- RebalanceInProgress + override val rebalanceResult = ShardIds.slice(24, 29).toSet -- RebalanceInProgress } val mandatoryRebalance = (ShardIds.slice(10, 20) ++ ShardIds.slice(30, 40)).toSet @@ -49,12 +98,17 @@ class ExtendedShardAllocationStrategySpec extends AllocationStrategySpec { i <- 1 to 50 } yield s"Shard$i" - val CurrentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]] = Map( - mockedHostRef("Address1") -> ShardIds.slice(0, 10), - mockedHostRef("Address2") -> ShardIds.slice(10, 20), - mockedHostRef("Address3") -> ShardIds.slice(20, 30), - mockedHostRef("Address4") -> ShardIds.slice(30, 40), - mockedHostRef("Address5") -> ShardIds.slice(40, 50)) + val Refs = Seq( + mockedHostRef("Address1"), + mockedHostRef("Address2"), + mockedHostRef("Address3"), + mockedHostRef("Address4"), + mockedHostRef("Address5")) + + val CurrentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]] = + (for { + (ref, index) <- Refs.zipWithIndex + } yield ref -> ShardIds.slice(0 + index * 10, 10 + index * 10)).toMap val RebalanceInProgress: Set[ShardId] = Set( ShardIds(0), @@ -65,7 +119,8 @@ class ExtendedShardAllocationStrategySpec extends AllocationStrategySpec { abstract class TestExtendedShardAllocationStrategy extends ExtendedShardAllocationStrategy { - def result: Set[ShardId] + def allocateResult: Option[ActorRef] = None + def rebalanceResult: Set[ShardId] = Set.empty var passedCurrentShardAllocations: Option[Map[ActorRef, immutable.IndexedSeq[ShardId]]] = None var passedRebalanceInProgress: Option[Set[ShardId]] = None @@ -77,7 +132,7 @@ class ExtendedShardAllocationStrategySpec extends AllocationStrategySpec { passedCurrentShardAllocations = Some(currentShardAllocations) passedRebalanceInProgress = Some(rebalanceInProgress) - Future successful result + Future successful rebalanceResult } protected def doAllocate( @@ -86,7 +141,7 @@ class ExtendedShardAllocationStrategySpec extends AllocationStrategySpec { currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] = { passedCurrentShardAllocations = Some(currentShardAllocations) - Future successful requester + Future successful (allocateResult getOrElse requester) } } }