diff --git a/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategy.scala
index 31c4267..8b9c103 100644
--- a/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategy.scala
+++ b/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategy.scala
@@ -252,7 +252,7 @@ object AdaptiveAllocationStrategy {
     nodesToDeallocate: () => Set[Address])
     (implicit system: ActorSystem, ec: ExecutionContext): AdaptiveAllocationStrategy = {
     // proxy doesn't depend on typeName, it should just start once
-    val proxy = AdaptiveAllocationStrategyDistributedDataProxy(system).ref
+    val proxy = AdaptiveAllocationStrategyDDProxy(system).ref
     new AdaptiveAllocationStrategy(
       typeName = typeName,
       rebalanceThresholdPercent = rebalanceThresholdPercent,
diff --git a/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataProxy.scala b/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDDProxy.scala
similarity index 94%
rename from src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataProxy.scala
rename to src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDDProxy.scala
index 0254fb2..8e2d973 100644
--- a/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataProxy.scala
+++ b/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDDProxy.scala
@@ -22,10 +22,10 @@ import akka.cluster.ddata._
 
 import scala.compat.Platform
 
-class AdaptiveAllocationStrategyDistributedDataProxy extends Actor with ActorLogging {
+class AdaptiveAllocationStrategyDDProxy extends Actor with ActorLogging {
 
   import AdaptiveAllocationStrategy._
-  import AdaptiveAllocationStrategyDistributedDataProxy._
+  import AdaptiveAllocationStrategyDDProxy._
 
   implicit lazy val node = Cluster(context.system)
   private val selfAddress = node.selfAddress.toString
@@ -119,9 +119,9 @@ class AdaptiveAllocationStrategyDistributedDataProxy extends Actor with ActorLog
   }
 }
 
-object AdaptiveAllocationStrategyDistributedDataProxy extends ExtensionId[ActorRefExtension] {
+object AdaptiveAllocationStrategyDDProxy extends ExtensionId[ActorRefExtension] {
   override def createExtension(system: ExtendedActorSystem): ActorRefExtension =
-    new ActorRefExtension(system actorOf Props[AdaptiveAllocationStrategyDistributedDataProxy])
+    new ActorRefExtension(system actorOf Props[AdaptiveAllocationStrategyDDProxy])
 
   // DData key of entityToNodeCounters map
   private[cluster] val EntityToNodeCountersKey: ORMultiMapKey[String, String] =
diff --git a/src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategy.scala
index 8a73089..12d442f 100644
--- a/src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategy.scala
+++ b/src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategy.scala
@@ -46,10 +46,10 @@ class MappedAllocationStrategy(
 
     logger debug s"rebalance $typeName: currentShardAllocations = $currentShardAllocations, rebalanceInProgress = $rebalanceInProgress"
 
-    val result = (for {
+    val result = ((for {
      (ref, shards) <- currentShardAllocations
      shardId <- shards if !(shardToRegionMapping get EntityKey(typeName, shardId) contains ref)
-    } yield shardId).toSet
+    } yield shardId).toSet -- rebalanceInProgress) take maxSimultaneousRebalance
 
     if (result.nonEmpty) logger info s"Rebalance $typeName\n\t" +
       s"current:${ currentShardAllocations.mkString("\n\t\t", "\n\t\t", "") }\n\t" +
@@ -68,7 +68,7 @@ object MappedAllocationStrategy {
     maxSimultaneousRebalance: Int)
     (implicit system: ActorSystem): MappedAllocationStrategy = {
     // proxy doesn't depend on typeName, it should just start once
-    val proxy = MappedAllocationStrategyDistributedDataProxy(system).ref
+    val proxy = MappedAllocationStrategyDDProxy(system).ref
     new MappedAllocationStrategy(
       typeName = typeName,
       fallbackStrategy = fallbackStrategy,
@@ -92,7 +92,6 @@ object MappedAllocationStrategy {
   case class UpdateMapping(typeName: String, id: ShardRegion.ShardId, regionRef: ActorRef)
   case class Clear(typeName: String, id: ShardRegion.ShardId)
 
-  // TODO: check for thread-safety
   @volatile
   private[cluster] var shardToRegionMapping: Map[EntityKey, ActorRef] = Map.empty
 }
\ No newline at end of file
diff --git a/src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategyDistributedDataProxy.scala b/src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategyDDProxy.scala
similarity index 88%
rename from src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategyDistributedDataProxy.scala
rename to src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategyDDProxy.scala
index e1339ff..29928d6 100644
--- a/src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategyDistributedDataProxy.scala
+++ b/src/main/scala/com/evolutiongaming/cluster/MappedAllocationStrategyDDProxy.scala
@@ -20,17 +20,16 @@ import akka.cluster.Cluster
 import akka.cluster.ddata.Replicator._
 import akka.cluster.ddata._
 
-class MappedAllocationStrategyDistributedDataProxy extends Actor with ActorLogging {
+class MappedAllocationStrategyDDProxy extends Actor with ActorLogging {
 
   import MappedAllocationStrategy._
-  import MappedAllocationStrategyDistributedDataProxy._
+  import MappedAllocationStrategyDDProxy._
 
   implicit lazy val node = Cluster(context.system)
   lazy val replicator: ActorRef = DistributedData(context.system).replicator
   private val emptyMap = LWWMap.empty[String, ActorRef]
 
-  replicator ! Subscribe(MappingKey, self)
-
+  replicator ! Subscribe(MappingKey, self)
 
   def receive: Receive = {
     case UpdateMapping(typeName, id, regionRef) =>
@@ -60,9 +59,9 @@ class MappedAllocationStrategyDistributedDataProxy extends Actor with ActorLoggi
   }
 }
 
-object MappedAllocationStrategyDistributedDataProxy extends ExtensionId[ActorRefExtension] {
+object MappedAllocationStrategyDDProxy extends ExtensionId[ActorRefExtension] {
   override def createExtension(system: ExtendedActorSystem): ActorRefExtension =
-    new ActorRefExtension(system actorOf Props[MappedAllocationStrategyDistributedDataProxy])
+    new ActorRefExtension(system actorOf Props[MappedAllocationStrategyDDProxy])
 
   // DData key of ShardToRegionMapping map
   private[cluster] val MappingKey: LWWMapKey[String, ActorRef] =
diff --git a/src/test/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataSpec.scala b/src/test/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDDSpec.scala
similarity index 97%
rename from src/test/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataSpec.scala
rename to src/test/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDDSpec.scala
index f460bce..cd47430 100644
--- a/src/test/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataSpec.scala
+++ b/src/test/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDDSpec.scala
@@ -32,9 +32,9 @@ import scala.compat.Platform
 import scala.concurrent.Future
 import scala.concurrent.duration._
 
-class AdaptiveAllocationStrategyDistributedDataSpec extends AllocationStrategySpec {
+class AdaptiveAllocationStrategyDDSpec extends AllocationStrategySpec {
 
-  import AdaptiveAllocationStrategyDistributedDataProxy._
+  import AdaptiveAllocationStrategyDDProxy._
 
   "AdaptiveAllocationStrategy" should "correctly increment and clear a counter" in new Scope {
 
@@ -109,7 +109,7 @@ class AdaptiveAllocationStrategyDistributedDataSp
     }
   }
 
-  it should "allocate a shard on the requester node if the counters is empty" in new Scope {
+  it should "allocate a shard using the fallback strategy if the counters is empty" in new Scope {
 
     val requester = TestProbe().testActor
 
@@ -155,7 +155,7 @@ class AdaptiveAllocationStrategyDistributedDataSp
       currentShardAllocations = noShard1ShardAllocations).futureValue shouldBe anotherAddressRef2
   }
 
-  it should "allocate a shard on a node (local) with the biggest counter value (respect cummulative home node counter)" in new Scope {
+  it should "allocate a shard on a node (local) with the biggest counter value (respect cumulative home node counter)" in new Scope {
 
     proxy ! Changed(EntityToNodeCountersKey)(map)
 
@@ -173,7 +173,7 @@ class AdaptiveAllocationStrategyDistributedDataSp
       currentShardAllocations = noShard1ShardAllocations).futureValue shouldBe localAddressRef
   }
 
-  it should "allocate a shard on a node (remote) with the biggest counter value (respect cummulative home node counter)" in new Scope {
+  it should "allocate a shard on a node (remote) with the biggest counter value (respect cumulative home node counter)" in new Scope {
 
     proxy ! Changed(EntityToNodeCountersKey)(map)
 
@@ -440,7 +440,7 @@ class AdaptiveAllocationStrategyDistributedDataSp
       case Subscribe(EntityToNodeCountersKey, _) =>
     }
 
-    class TestProxy extends AdaptiveAllocationStrategyDistributedDataProxy {
+    class TestProxy extends AdaptiveAllocationStrategyDDProxy {
       override lazy val replicator = testActor
       override implicit lazy val node = clusterNode
     }
diff --git a/src/test/scala/com/evolutiongaming/cluster/MappedAllocationStrategyDDSpec.scala b/src/test/scala/com/evolutiongaming/cluster/MappedAllocationStrategyDDSpec.scala
new file mode 100644
index 0000000..f16bd61
--- /dev/null
+++ b/src/test/scala/com/evolutiongaming/cluster/MappedAllocationStrategyDDSpec.scala
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2016-2017 Evolution Gaming Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.evolutiongaming.cluster
+
+import akka.actor._
+import akka.cluster.ddata.LWWMap
+import akka.cluster.ddata.Replicator.{Changed, Subscribe, Update, WriteLocal}
+import akka.cluster.sharding.ShardRegion
+import akka.cluster.sharding.ShardRegion.ShardId
+import akka.cluster.{Cluster, UniqueAddress}
+import akka.testkit.TestProbe
+import org.mockito.Mockito._
+
+import scala.collection.immutable
+import scala.concurrent.Future
+
+class MappedAllocationStrategyDDSpec extends AllocationStrategySpec {
+
+  import MappedAllocationStrategyDDProxy._
+
+  "MappedAllocationStrategy" should "correctly update its internal mapping" in new Scope {
+
+    strategy.mapShardToRegion(entityId, anotherAddressRef1)
+
+    expectMsgPF() {
+      case Update(`MappingKey`, WriteLocal, _) =>
+    }
+
+    val mapping = MappedAllocationStrategy.shardToRegionMapping get expectedEntityKey
+    mapping.value shouldBe anotherAddressRef1
+  }
+
+  it should "correctly process map updates from distributed data" in new Scope {
+
+    MappedAllocationStrategy.shardToRegionMapping should have size 0
+
+    override val map = LWWMap.empty[String, ActorRef] + (expectedEntityKey.toString -> anotherAddressRef2)
+
+    proxy ! Changed(MappingKey)(map)
+
+    eventually {
+      val mapping = MappedAllocationStrategy.shardToRegionMapping get expectedEntityKey
+      mapping.value shouldBe anotherAddressRef2
+    }
+  }
+
+  it should "allocate a shard using the fallback strategy if the mapping is empty" in new Scope {
+
+    val requester = TestProbe().testActor
+
+    strategy.allocateShard(
+      requester = requester,
+      shardId = entityId,
+      currentShardAllocations = noShard1ShardAllocations).futureValue shouldBe requester
+  }
+
+  it should "allocate a shard on a node (local) by the mapping" in new Scope {
+
+    proxy ! Changed(MappingKey)(map)
+
+    eventually {
+      MappedAllocationStrategy.shardToRegionMapping should have size 4
+    }
+
+    strategy.allocateShard(
+      requester = TestProbe().testActor,
+      shardId = entityId1,
+      currentShardAllocations = noShard1ShardAllocations).futureValue shouldBe localAddressRef
+  }
+
+  it should "allocate a shard on a node (remote) by the mapping" in new Scope {
+
+    override val map = LWWMap.empty[String, ActorRef] +
+      (entityKey1.toString -> anotherAddressRef2) +
+      (entityKey2.toString -> anotherAddressRef1) +
+      (entityKey3.toString -> anotherAddressRef1) +
+      (entityKey4.toString -> localAddressRef)
+
+    proxy ! Changed(MappingKey)(map)
+
+    eventually {
+      MappedAllocationStrategy.shardToRegionMapping should have size 4
+    }
+
+    strategy.allocateShard(
+      requester = TestProbe().testActor,
+      shardId = entityId1,
+      currentShardAllocations = noShard1ShardAllocations).futureValue shouldBe anotherAddressRef2
+  }
+
+  it should "rebalance shards if there is difference between mapping and current allocation" in new Scope {
+
+    proxy ! Changed(MappingKey)(map)
+
+    eventually {
+      MappedAllocationStrategy.shardToRegionMapping should have size 4
+    }
+
+    // should rebalance 1,4
+    val result1 = strategy.rebalance(
+      shardAllocations,
+      rebalanceInProgress = Set.empty[ShardId]).futureValue
+
+    result1 shouldBe Set(entityId1, entityId4)
+
+    // 1 is in progress - should rebalance 4
+    val result2 = strategy.rebalance(
+      shardAllocations,
+      rebalanceInProgress = Set[ShardId](entityId1)).futureValue
+
+    result2 shouldBe Set(entityId4)
+
+
+    // 1,4 is in progress - should not rebalance
+    val result3 = strategy.rebalance(
+      shardAllocations,
+      rebalanceInProgress = Set[ShardId](entityId4, entityId1)).futureValue
+
+    result3 shouldBe Set()
+
+    // limit rebalance to 1 - should rebalance 1
+    val strategy1 = new MappedAllocationStrategy(
+      typeName = TypeName,
+      fallbackStrategy = fallbackStrategy,
+      proxy = proxy,
+      maxSimultaneousRebalance = 1)
+
+    val result5 = strategy1.rebalance(
+      shardAllocations,
+      rebalanceInProgress = Set.empty[ShardId]).futureValue
+
+    result5 shouldBe Set(entityId1)
+  }
+
+  abstract class Scope extends AllocationStrategyScope {
+
+    val MaxSimultaneousRebalance: Int = 10
+
+    MappedAllocationStrategy.shardToRegionMapping = Map.empty
+
+    val uniqueAddress = UniqueAddress(Address("protocol", "system", "127.0.0.1", 1234), 1L)
+    implicit val clusterNode = mock[Cluster]
+    when(clusterNode.selfUniqueAddress) thenReturn uniqueAddress
+    when(clusterNode.selfAddress) thenReturn uniqueAddress.address
+
+    val TypeName = "typeName"
+
+    val selfAddress = clusterNode.selfAddress.toString
+
+    def entityKey(entityId: String) = MappedAllocationStrategy.EntityKey(TypeName, entityId)
+
+    val entityId = "entityId"
+    val expectedEntityKey = entityKey(entityId)
+    val entityId1 = "1"
+    val entityKey1 = entityKey(entityId1)
+    val entityId2 = "2"
+    val entityKey2 = entityKey(entityId2)
+    val entityId3 = "3"
+    val entityKey3 = entityKey(entityId3)
+    val entityId4 = "4"
+    val entityKey4 = entityKey(entityId4)
+
+    val localAddressRef = mockedAddressRef(clusterNode.selfAddress)
+    val anotherAddressRef1 = mockedHostRef("anotherAddress1")
+    val anotherAddressRef2 = mockedHostRef("anotherAddress2")
+
+    val anotherAddress1 = anotherAddressRef1.path.address.toString
+    val anotherAddress2 = anotherAddressRef2.path.address.toString
+
+    val map = LWWMap.empty[String, ActorRef] +
+      (entityKey1.toString -> localAddressRef) +
+      (entityKey2.toString -> anotherAddressRef1) +
+      (entityKey3.toString -> anotherAddressRef2) +
+      (entityKey4.toString -> anotherAddressRef2)
+
+    val shardAllocations = Map[ActorRef, immutable.IndexedSeq[ShardId]](
+      anotherAddressRef1 -> immutable.IndexedSeq[ShardId](entityId1, entityId2),
+      anotherAddressRef2 -> immutable.IndexedSeq[ShardId](entityId3),
+      localAddressRef -> immutable.IndexedSeq[ShardId](entityId4))
+
+    val noShard1ShardAllocations = Map[ActorRef, immutable.IndexedSeq[ShardId]](
+      anotherAddressRef1 -> immutable.IndexedSeq[ShardId](entityId2),
+      anotherAddressRef2 -> immutable.IndexedSeq[ShardId](entityId3),
+      localAddressRef -> immutable.IndexedSeq[ShardId](entityId4))
+
+    val proxy = system actorOf Props(new TestProxy)
+
+    val fallbackStrategy = new ExtendedShardAllocationStrategy {
+      val maxSimultaneousRebalance = MaxSimultaneousRebalance
+      val nodesToDeallocate = () => Set.empty[Address]
+
+      protected def doAllocate(requester: ActorRef, shardId: ShardId,
+        currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] =
+        Future successful requester
+
+      override protected def doRebalance(
+        currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]],
+        rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] =
+        Future successful Set.empty
+    }
+
+    val strategy = new MappedAllocationStrategy(
+      typeName = TypeName,
+      fallbackStrategy = fallbackStrategy,
+      proxy = proxy,
+      maxSimultaneousRebalance = MaxSimultaneousRebalance)
+
+    expectMsgPF() {
+      case Subscribe(`MappingKey`, _) =>
+    }
+
+    class TestProxy extends MappedAllocationStrategyDDProxy {
+      override lazy val replicator = testActor
+      override implicit lazy val node = clusterNode
+    }
+  }
+}
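
Note on the rebalance change in MappedAllocationStrategy above: the candidate set is now reduced by the shards already listed in rebalanceInProgress and then capped at maxSimultaneousRebalance before being returned, which is what the new "rebalance shards if there is difference between mapping and current allocation" test exercises. A minimal, self-contained sketch of that set arithmetic follows; the object and value names are illustrative only and are not taken from the codebase.

// Illustrative sketch only: mirrors the `(... -- rebalanceInProgress) take maxSimultaneousRebalance` expression above.
object RebalanceCapSketch extends App {
  val candidates = Set("1", "2", "3", "4")   // shards whose current region differs from the mapped one
  val rebalanceInProgress = Set("1")         // shards the coordinator is already moving
  val maxSimultaneousRebalance = 2

  // Drop the in-flight shards first, then cap how many are handed back in one pass.
  val result = (candidates -- rebalanceInProgress) take maxSimultaneousRebalance

  println(result)                            // at most 2 shard ids, never "1"
}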