Skip to content

Commit

Permalink
Create MappedAllocationStrategy - add unit tests, do minor renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiy0 committed Aug 14, 2017
1 parent 6991f7f commit fdb1215
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ object AdaptiveAllocationStrategy {
nodesToDeallocate: () => Set[Address])
(implicit system: ActorSystem, ec: ExecutionContext): AdaptiveAllocationStrategy = {
// proxy doesn't depend on typeName, it should just start once
val proxy = AdaptiveAllocationStrategyDistributedDataProxy(system).ref
val proxy = AdaptiveAllocationStrategyDDProxy(system).ref
new AdaptiveAllocationStrategy(
typeName = typeName,
rebalanceThresholdPercent = rebalanceThresholdPercent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import akka.cluster.ddata._

import scala.compat.Platform

class AdaptiveAllocationStrategyDistributedDataProxy extends Actor with ActorLogging {
class AdaptiveAllocationStrategyDDProxy extends Actor with ActorLogging {

import AdaptiveAllocationStrategy._
import AdaptiveAllocationStrategyDistributedDataProxy._
import AdaptiveAllocationStrategyDDProxy._

implicit lazy val node = Cluster(context.system)
private val selfAddress = node.selfAddress.toString
Expand Down Expand Up @@ -119,9 +119,9 @@ class AdaptiveAllocationStrategyDistributedDataProxy extends Actor with ActorLog
}
}

object AdaptiveAllocationStrategyDistributedDataProxy extends ExtensionId[ActorRefExtension] {
object AdaptiveAllocationStrategyDDProxy extends ExtensionId[ActorRefExtension] {
override def createExtension(system: ExtendedActorSystem): ActorRefExtension =
new ActorRefExtension(system actorOf Props[AdaptiveAllocationStrategyDistributedDataProxy])
new ActorRefExtension(system actorOf Props[AdaptiveAllocationStrategyDDProxy])

// DData key of entityToNodeCounters map
private[cluster] val EntityToNodeCountersKey: ORMultiMapKey[String, String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ class MappedAllocationStrategy(
logger debug
s"rebalance $typeName: currentShardAllocations = $currentShardAllocations, rebalanceInProgress = $rebalanceInProgress"

val result = (for {
val result = ((for {
(ref, shards) <- currentShardAllocations
shardId <- shards if !(shardToRegionMapping get EntityKey(typeName, shardId) contains ref)
} yield shardId).toSet
} yield shardId).toSet -- rebalanceInProgress) take maxSimultaneousRebalance

if (result.nonEmpty) logger info s"Rebalance $typeName\n\t" +
s"current:${ currentShardAllocations.mkString("\n\t\t", "\n\t\t", "") }\n\t" +
Expand All @@ -68,7 +68,7 @@ object MappedAllocationStrategy {
maxSimultaneousRebalance: Int)
(implicit system: ActorSystem): MappedAllocationStrategy = {
// proxy doesn't depend on typeName, it should just start once
val proxy = MappedAllocationStrategyDistributedDataProxy(system).ref
val proxy = MappedAllocationStrategyDDProxy(system).ref
new MappedAllocationStrategy(
typeName = typeName,
fallbackStrategy = fallbackStrategy,
Expand All @@ -92,7 +92,6 @@ object MappedAllocationStrategy {
case class UpdateMapping(typeName: String, id: ShardRegion.ShardId, regionRef: ActorRef)
case class Clear(typeName: String, id: ShardRegion.ShardId)

// TODO: check for thread-safety
@volatile
private[cluster] var shardToRegionMapping: Map[EntityKey, ActorRef] = Map.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@ import akka.cluster.Cluster
import akka.cluster.ddata.Replicator._
import akka.cluster.ddata._

class MappedAllocationStrategyDistributedDataProxy extends Actor with ActorLogging {
class MappedAllocationStrategyDDProxy extends Actor with ActorLogging {

import MappedAllocationStrategy._
import MappedAllocationStrategyDistributedDataProxy._
import MappedAllocationStrategyDDProxy._

implicit lazy val node = Cluster(context.system)
lazy val replicator: ActorRef = DistributedData(context.system).replicator
private val emptyMap = LWWMap.empty[String, ActorRef]

replicator ! Subscribe(MappingKey, self)

replicator ! Subscribe(MappingKey, self)

def receive: Receive = {
case UpdateMapping(typeName, id, regionRef) =>
Expand Down Expand Up @@ -60,9 +59,9 @@ class MappedAllocationStrategyDistributedDataProxy extends Actor with ActorLoggi
}
}

object MappedAllocationStrategyDistributedDataProxy extends ExtensionId[ActorRefExtension] {
object MappedAllocationStrategyDDProxy extends ExtensionId[ActorRefExtension] {
override def createExtension(system: ExtendedActorSystem): ActorRefExtension =
new ActorRefExtension(system actorOf Props[MappedAllocationStrategyDistributedDataProxy])
new ActorRefExtension(system actorOf Props[MappedAllocationStrategyDDProxy])

// DData key of ShardToRegionMapping map
private[cluster] val MappingKey: LWWMapKey[String, ActorRef] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import scala.compat.Platform
import scala.concurrent.Future
import scala.concurrent.duration._

class AdaptiveAllocationStrategyDistributedDataSpec extends AllocationStrategySpec {
class AdaptiveAllocationStrategyDDSpec extends AllocationStrategySpec {

import AdaptiveAllocationStrategyDistributedDataProxy._
import AdaptiveAllocationStrategyDDProxy._

"AdaptiveAllocationStrategy" should "correctly increment and clear a counter" in new Scope {

Expand Down Expand Up @@ -109,7 +109,7 @@ class AdaptiveAllocationStrategyDistributedDataSpec extends AllocationStrategySp
}
}

it should "allocate a shard on the requester node if the counters is empty" in new Scope {
it should "allocate a shard using the fallback strategy if the counters is empty" in new Scope {

val requester = TestProbe().testActor

Expand Down Expand Up @@ -155,7 +155,7 @@ class AdaptiveAllocationStrategyDistributedDataSpec extends AllocationStrategySp
currentShardAllocations = noShard1ShardAllocations).futureValue shouldBe anotherAddressRef2
}

it should "allocate a shard on a node (local) with the biggest counter value (respect cummulative home node counter)" in new Scope {
it should "allocate a shard on a node (local) with the biggest counter value (respect cumulative home node counter)" in new Scope {

proxy ! Changed(EntityToNodeCountersKey)(map)

Expand All @@ -173,7 +173,7 @@ class AdaptiveAllocationStrategyDistributedDataSpec extends AllocationStrategySp
currentShardAllocations = noShard1ShardAllocations).futureValue shouldBe localAddressRef
}

it should "allocate a shard on a node (remote) with the biggest counter value (respect cummulative home node counter)" in new Scope {
it should "allocate a shard on a node (remote) with the biggest counter value (respect cumulative home node counter)" in new Scope {

proxy ! Changed(EntityToNodeCountersKey)(map)

Expand Down Expand Up @@ -440,7 +440,7 @@ class AdaptiveAllocationStrategyDistributedDataSpec extends AllocationStrategySp
case Subscribe(EntityToNodeCountersKey, _) =>
}

class TestProxy extends AdaptiveAllocationStrategyDistributedDataProxy {
class TestProxy extends AdaptiveAllocationStrategyDDProxy {
override lazy val replicator = testActor
override implicit lazy val node = clusterNode
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
/*
* Copyright 2016-2017 Evolution Gaming Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.evolutiongaming.cluster

import akka.actor._
import akka.cluster.ddata.LWWMap
import akka.cluster.ddata.Replicator.{Changed, Subscribe, Update, WriteLocal}
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.{Cluster, UniqueAddress}
import akka.testkit.TestProbe
import org.mockito.Mockito._

import scala.collection.immutable
import scala.concurrent.Future

class MappedAllocationStrategyDDSpec extends AllocationStrategySpec {

import MappedAllocationStrategyDDProxy._

"MappedAllocationStrategy" should "correctly update its internal mapping" in new Scope {

strategy.mapShardToRegion(entityId, anotherAddressRef1)

expectMsgPF() {
case Update(`MappingKey`, WriteLocal, _) =>
}

val mapping = MappedAllocationStrategy.shardToRegionMapping get expectedEntityKey
mapping.value shouldBe anotherAddressRef1
}

it should "correctly process map updates from distributed data" in new Scope {

MappedAllocationStrategy.shardToRegionMapping should have size 0

override val map = LWWMap.empty[String, ActorRef] + (expectedEntityKey.toString -> anotherAddressRef2)

proxy ! Changed(MappingKey)(map)

eventually {
val mapping = MappedAllocationStrategy.shardToRegionMapping get expectedEntityKey
mapping.value shouldBe anotherAddressRef2
}
}

it should "allocate a shard using the fallback strategy if the mapping is empty" in new Scope {

val requester = TestProbe().testActor

strategy.allocateShard(
requester = requester,
shardId = entityId,
currentShardAllocations = noShard1ShardAllocations).futureValue shouldBe requester
}

it should "allocate a shard on a node (local) by the mapping" in new Scope {

proxy ! Changed(MappingKey)(map)

eventually {
MappedAllocationStrategy.shardToRegionMapping should have size 4
}

strategy.allocateShard(
requester = TestProbe().testActor,
shardId = entityId1,
currentShardAllocations = noShard1ShardAllocations).futureValue shouldBe localAddressRef
}

it should "allocate a shard on a node (remote) by the mapping" in new Scope {

override val map = LWWMap.empty[String, ActorRef] +
(entityKey1.toString -> anotherAddressRef2) +
(entityKey2.toString -> anotherAddressRef1) +
(entityKey3.toString -> anotherAddressRef1) +
(entityKey4.toString -> localAddressRef)

proxy ! Changed(MappingKey)(map)

eventually {
MappedAllocationStrategy.shardToRegionMapping should have size 4
}

strategy.allocateShard(
requester = TestProbe().testActor,
shardId = entityId1,
currentShardAllocations = noShard1ShardAllocations).futureValue shouldBe anotherAddressRef2
}

it should "rebalance shards if there is difference between mapping and current allocation" in new Scope {

proxy ! Changed(MappingKey)(map)

eventually {
MappedAllocationStrategy.shardToRegionMapping should have size 4
}

// should rebalance 1,4
val result1 = strategy.rebalance(
shardAllocations,
rebalanceInProgress = Set.empty[ShardId]).futureValue

result1 shouldBe Set(entityId1, entityId4)

// 1 is in progress - should rebalance 4
val result2 = strategy.rebalance(
shardAllocations,
rebalanceInProgress = Set[ShardId](entityId1)).futureValue

result2 shouldBe Set(entityId4)


// 1,4 is in progress - should not rebalance
val result3 = strategy.rebalance(
shardAllocations,
rebalanceInProgress = Set[ShardId](entityId4, entityId1)).futureValue

result3 shouldBe Set()

// limit rebalance to 1 - should rebalance 1
val strategy1 = new MappedAllocationStrategy(
typeName = TypeName,
fallbackStrategy = fallbackStrategy,
proxy = proxy,
maxSimultaneousRebalance = 1)

val result5 = strategy1.rebalance(
shardAllocations,
rebalanceInProgress = Set.empty[ShardId]).futureValue

result5 shouldBe Set(entityId1)
}

abstract class Scope extends AllocationStrategyScope {

val MaxSimultaneousRebalance: Int = 10

MappedAllocationStrategy.shardToRegionMapping = Map.empty

val uniqueAddress = UniqueAddress(Address("protocol", "system", "127.0.0.1", 1234), 1L)
implicit val clusterNode = mock[Cluster]
when(clusterNode.selfUniqueAddress) thenReturn uniqueAddress
when(clusterNode.selfAddress) thenReturn uniqueAddress.address

val TypeName = "typeName"

val selfAddress = clusterNode.selfAddress.toString

def entityKey(entityId: String) = MappedAllocationStrategy.EntityKey(TypeName, entityId)

val entityId = "entityId"
val expectedEntityKey = entityKey(entityId)
val entityId1 = "1"
val entityKey1 = entityKey(entityId1)
val entityId2 = "2"
val entityKey2 = entityKey(entityId2)
val entityId3 = "3"
val entityKey3 = entityKey(entityId3)
val entityId4 = "4"
val entityKey4 = entityKey(entityId4)

val localAddressRef = mockedAddressRef(clusterNode.selfAddress)
val anotherAddressRef1 = mockedHostRef("anotherAddress1")
val anotherAddressRef2 = mockedHostRef("anotherAddress2")

val anotherAddress1 = anotherAddressRef1.path.address.toString
val anotherAddress2 = anotherAddressRef2.path.address.toString

val map = LWWMap.empty[String, ActorRef] +
(entityKey1.toString -> localAddressRef) +
(entityKey2.toString -> anotherAddressRef1) +
(entityKey3.toString -> anotherAddressRef2) +
(entityKey4.toString -> anotherAddressRef2)

val shardAllocations = Map[ActorRef, immutable.IndexedSeq[ShardId]](
anotherAddressRef1 -> immutable.IndexedSeq[ShardId](entityId1, entityId2),
anotherAddressRef2 -> immutable.IndexedSeq[ShardId](entityId3),
localAddressRef -> immutable.IndexedSeq[ShardId](entityId4))

val noShard1ShardAllocations = Map[ActorRef, immutable.IndexedSeq[ShardId]](
anotherAddressRef1 -> immutable.IndexedSeq[ShardId](entityId2),
anotherAddressRef2 -> immutable.IndexedSeq[ShardId](entityId3),
localAddressRef -> immutable.IndexedSeq[ShardId](entityId4))

val proxy = system actorOf Props(new TestProxy)

val fallbackStrategy = new ExtendedShardAllocationStrategy {
val maxSimultaneousRebalance = MaxSimultaneousRebalance
val nodesToDeallocate = () => Set.empty[Address]

protected def doAllocate(requester: ActorRef, shardId: ShardId,
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] =
Future successful requester

override protected def doRebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]],
rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] =
Future successful Set.empty
}

val strategy = new MappedAllocationStrategy(
typeName = TypeName,
fallbackStrategy = fallbackStrategy,
proxy = proxy,
maxSimultaneousRebalance = MaxSimultaneousRebalance)

expectMsgPF() {
case Subscribe(`MappingKey`, _) =>
}

class TestProxy extends MappedAllocationStrategyDDProxy {
override lazy val replicator = testActor
override implicit lazy val node = clusterNode
}
}
}

0 comments on commit fdb1215

Please sign in to comment.