From db914f8b30056d1ab31762c3de12f0ac0ba135a7 Mon Sep 17 00:00:00 2001 From: Sergiy Prydatchenko Date: Wed, 17 May 2017 23:10:29 +0300 Subject: [PATCH] Add a forced by-node shards deallocation to all the allocation strategies --- .../cluster/AdaptiveAllocationStrategy.scala | 28 +++---- .../cluster/DirectAllocationStrategy.scala | 25 +++--- .../cluster/DualAllocationStrategy.scala | 21 +++-- .../ExtendedShardAllocationStrategy.scala | 76 +++++++++++++++++++ .../cluster/ExtractShardId.scala | 9 ++- .../cluster/RequesterAllocationStrategy.scala | 14 ++-- .../SingleNodeAllocationStrategy.scala | 38 +++++----- ...llocationStrategyDistributedDataSpec.scala | 18 +++-- 8 files changed, 162 insertions(+), 67 deletions(-) create mode 100644 src/main/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategy.scala diff --git a/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategy.scala index df38933..06dbf5a 100644 --- a/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategy.scala +++ b/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategy.scala @@ -21,10 +21,11 @@ import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion import com.codahale.metrics.MetricRegistry import com.typesafe.scalalogging.LazyLogging + import scala.collection.concurrent.TrieMap import scala.collection.{immutable, mutable} import scala.compat.Platform -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.FiniteDuration /** @@ -34,7 +35,6 @@ import scala.concurrent.duration.FiniteDuration */ class AdaptiveAllocationStrategy( typeName: String, - system: ActorSystem, maxSimultaneousRebalance: Int, rebalanceThresholdPercent: Int, cleanupPeriod: FiniteDuration, @@ -42,12 +42,11 @@ class AdaptiveAllocationStrategy( countControl: CountControl.Type, fallbackStrategy: ShardAllocationStrategy, proxy: ActorRef, - lowTrafficThreshold: Int = 10) extends ShardAllocationStrategy with LazyLogging { + deallocationTimeout: FiniteDuration, + lowTrafficThreshold: Int = 10)(implicit system: ActorSystem, ec: ExecutionContext) + extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) with LazyLogging { import AdaptiveAllocationStrategy._ - import system.dispatcher - - val addressHelper = AddressHelperExtension(system) import addressHelper._ private implicit val node = Cluster(system) @@ -145,14 +144,10 @@ class AdaptiveAllocationStrategy( } /** Should be executed every rebalance-interval only on a node with ShardCoordinator */ - def rebalance( + protected def doRebalance( currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]], rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] = Future { - def limitRebalance(f: => Set[ShardRegion.ShardId]): Set[ShardRegion.ShardId] = - if (rebalanceInProgress.size >= maxSimultaneousRebalance) Set.empty - else f take maxSimultaneousRebalance - val entityToNodeCountersByType = entityToNodeCounters filterKeys { _.typeName == typeName } val shardsToClear = mutable.Set.empty[ShardRegion.ShardId] @@ -211,7 +206,7 @@ class AdaptiveAllocationStrategy( shard <- notRebalancingShards if rebalanceShard(shard, regionAddress) } yield shard - val result = limitRebalance(shardsToRebalance.toSet) + val result = shardsToRebalance.toSet for (id <- shardsToClear -- result) { logger debug s"Shard $typeName#$id counter cleanup" @@ -236,20 +231,21 @@ object AdaptiveAllocationStrategy { cleanupPeriod: FiniteDuration, metricRegistry: MetricRegistry, countControl: CountControl.Type = CountControl.Empty, - fallbackStrategy: ShardAllocationStrategy = new RequesterAllocationStrategy) - (implicit system: ActorSystem): AdaptiveAllocationStrategy = { + fallbackStrategy: ShardAllocationStrategy, + deallocationTimeout: FiniteDuration) + (implicit system: ActorSystem, ec: ExecutionContext): AdaptiveAllocationStrategy = { // proxy doesn't depend on typeName, it should just start once val proxy = AdaptiveAllocationStrategyDistributedDataProxy(system).ref new AdaptiveAllocationStrategy( typeName = typeName, - system = system, maxSimultaneousRebalance = maxSimultaneousRebalance, rebalanceThresholdPercent = rebalanceThresholdPercent, cleanupPeriod = cleanupPeriod, metricRegistry = metricRegistry, countControl, fallbackStrategy = fallbackStrategy, - proxy = proxy) + proxy = proxy, + deallocationTimeout = deallocationTimeout)(system, ec) } case class EntityKey(typeName: String, id: ShardRegion.ShardId) { diff --git a/src/main/scala/com/evolutiongaming/cluster/DirectAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/DirectAllocationStrategy.scala index 85f208e..e4c9261 100644 --- a/src/main/scala/com/evolutiongaming/cluster/DirectAllocationStrategy.scala +++ b/src/main/scala/com/evolutiongaming/cluster/DirectAllocationStrategy.scala @@ -21,18 +21,17 @@ import akka.cluster.sharding.ShardRegion import com.typesafe.scalalogging.LazyLogging import scala.collection.immutable -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.FiniteDuration // this AllocationStrategy is for debug purposes only class DirectAllocationStrategy( fallbackStrategy: ShardAllocationStrategy, - readSettings: () => Option[Map[ShardRegion.ShardId, String]]) - (implicit system: ActorSystem) extends ShardAllocationStrategy - with LazyLogging { + readSettings: () => Option[Map[ShardRegion.ShardId, String]], + maxSimultaneousRebalance: Int, + deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext) + extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) with LazyLogging { - import system.dispatcher - - val addressHelper = AddressHelperExtension(system) import addressHelper._ @volatile @@ -70,7 +69,7 @@ class DirectAllocationStrategy( } } - def rebalance( + protected def doRebalance( currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]], rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] = { @@ -103,8 +102,14 @@ class DirectAllocationStrategy( object DirectAllocationStrategy { def apply( fallbackStrategy: ShardAllocationStrategy, - readSettings: () => Option[String])(implicit system: ActorSystem): DirectAllocationStrategy = - new DirectAllocationStrategy(fallbackStrategy, readAndParseSettings(readSettings)) + readSettings: () => Option[String], + maxSimultaneousRebalance: Int, + deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext): DirectAllocationStrategy = + new DirectAllocationStrategy( + fallbackStrategy, + readAndParseSettings(readSettings), + maxSimultaneousRebalance, + deallocationTimeout) private def readAndParseSettings( readSettings: () => Option[String]): () => Option[Map[ShardRegion.ShardId, String]] = diff --git a/src/main/scala/com/evolutiongaming/cluster/DualAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/DualAllocationStrategy.scala index 91ad3c0..c1158c2 100644 --- a/src/main/scala/com/evolutiongaming/cluster/DualAllocationStrategy.scala +++ b/src/main/scala/com/evolutiongaming/cluster/DualAllocationStrategy.scala @@ -21,15 +21,16 @@ import akka.cluster.sharding.ShardRegion import com.typesafe.scalalogging.LazyLogging import scala.collection.immutable -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.FiniteDuration class DualAllocationStrategy( baseAllocationStrategy: ShardAllocationStrategy, additionalAllocationStrategy: ShardAllocationStrategy, - readSettings: () => Option[Set[ShardRegion.ShardId]]) - (implicit system: ActorSystem) extends ShardAllocationStrategy with LazyLogging { - - import system.dispatcher + readSettings: () => Option[Set[ShardRegion.ShardId]], + maxSimultaneousRebalance: Int, + deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext) + extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) with LazyLogging { @volatile private var additionalShardIds = Set.empty[ShardRegion.ShardId] @@ -49,7 +50,7 @@ class DualAllocationStrategy( baseAllocationStrategy.allocateShard(requester, shardId, currentShardAllocations) } - def rebalance( + protected def doRebalance( currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]], rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] = { @@ -84,11 +85,15 @@ object DualAllocationStrategy { def apply( baseAllocationStrategy: ShardAllocationStrategy, additionalAllocationStrategy: ShardAllocationStrategy, - readSettings: () => Option[String])(implicit system: ActorSystem): DualAllocationStrategy = + readSettings: () => Option[String], + maxSimultaneousRebalance: Int, + deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext): DualAllocationStrategy = new DualAllocationStrategy( baseAllocationStrategy, additionalAllocationStrategy, - readAndParseSettings(readSettings)) + readAndParseSettings(readSettings), + maxSimultaneousRebalance, + deallocationTimeout) private def readAndParseSettings( readSettings: () => Option[String]): () => Option[Set[ShardRegion.ShardId]] = diff --git a/src/main/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategy.scala new file mode 100644 index 0000000..90f6255 --- /dev/null +++ b/src/main/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategy.scala @@ -0,0 +1,76 @@ +package com.evolutiongaming.cluster + +import java.time.Instant + +import akka.actor.{ActorRef, ActorSystem, Address} +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy +import akka.cluster.sharding.ShardRegion + +import scala.collection.immutable +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration._ + +abstract class ExtendedShardAllocationStrategy( + system: ActorSystem, + implicit val ec: ExecutionContext, + maxSimultaneousRebalance: Int, + deallocationTimeout: FiniteDuration) extends ShardAllocationStrategy { + + val addressHelper = AddressHelperExtension(system) + import addressHelper._ + + class Node(val address: Address, added: Instant = Instant.now()) { + def expired: Boolean = Instant.now() isAfter (added plusMillis deallocationTimeout.toMillis) + override def equals(obj: Any): Boolean = obj match { + case node: Node => address equals node.address + case _ => false + } + override def hashCode(): Int = address.hashCode + } + + @volatile + private var nodesToForcedDeallocation: Set[Node] = Set.empty + + def deallocateShardsFromNode(regionGlobalAddress: Address): Unit = + nodesToForcedDeallocation += new Node(regionGlobalAddress) + + protected def doRebalance( + currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]], + rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] + + final def rebalance( + currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]], + rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] = { + + def limitRebalance(f: => Set[ShardRegion.ShardId]): Set[ShardRegion.ShardId] = + if (rebalanceInProgress.size >= maxSimultaneousRebalance) Set.empty + else f take maxSimultaneousRebalance + + val shardsToRebalance: Future[Set[ShardRegion.ShardId]] = + if (nodesToForcedDeallocation.isEmpty) { + doRebalance(currentShardAllocations, rebalanceInProgress) + } else { + val emptyNodes = for { + (k, v) <- currentShardAllocations if v.isEmpty + } yield new Node(k.path.address.global) + + val nodesToRemove = (nodesToForcedDeallocation filter (_.expired)) ++ emptyNodes.toSet + + nodesToForcedDeallocation = nodesToForcedDeallocation -- nodesToRemove + + val shardsToForcedDeallocation = (for { + (k, v) <- currentShardAllocations if nodesToForcedDeallocation contains new Node(k.path.address.global) + } yield v).flatten.toSet -- rebalanceInProgress + + for { + doRebalanceResult <- doRebalance(currentShardAllocations, rebalanceInProgress -- shardsToForcedDeallocation) + } yield shardsToForcedDeallocation ++ doRebalanceResult + } + + val result = for { + shardsToRebalance <- shardsToRebalance + } yield limitRebalance(shardsToRebalance) + + result + } +} \ No newline at end of file diff --git a/src/main/scala/com/evolutiongaming/cluster/ExtractShardId.scala b/src/main/scala/com/evolutiongaming/cluster/ExtractShardId.scala index da6a6c1..5750acc 100644 --- a/src/main/scala/com/evolutiongaming/cluster/ExtractShardId.scala +++ b/src/main/scala/com/evolutiongaming/cluster/ExtractShardId.scala @@ -28,7 +28,14 @@ object ExtractShardId extends LazyLogging { case x: ShardedMsg => x.id } - def apply(typeName: String, config: Config): ShardRegion.ExtractShardId = { + def uniform(numberOfShards: Int): ShardRegion.ExtractShardId = { + case x: ShardedMsg => + val id = x.id + val shardId = math.abs(id.hashCode % numberOfShards).toString + shardId + } + + def static(typeName: String, config: Config): ShardRegion.ExtractShardId = { // "shardId: entityId1, entityId2, entityId3" val mappingsList = Try(config.get[List[String]](s"$typeName.id-shard-mapping")) getOrElse List.empty[String] val mappingsPairList = mappingsList flatMap { mapping => diff --git a/src/main/scala/com/evolutiongaming/cluster/RequesterAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/RequesterAllocationStrategy.scala index c616a21..8ad18cf 100644 --- a/src/main/scala/com/evolutiongaming/cluster/RequesterAllocationStrategy.scala +++ b/src/main/scala/com/evolutiongaming/cluster/RequesterAllocationStrategy.scala @@ -1,16 +1,19 @@ package com.evolutiongaming.cluster -import akka.actor.ActorRef -import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy +import akka.actor.{ActorRef, ActorSystem} import akka.cluster.sharding.ShardRegion import scala.collection.immutable -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.FiniteDuration // Allocate all shards on requester nodes // to be used primarily for debugging purposes and, // for example, as fallbackAllocationStrategy in DirectAllocationStrategy -class RequesterAllocationStrategy extends ShardAllocationStrategy { +class RequesterAllocationStrategy( + maxSimultaneousRebalance: Int, + deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext) + extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) { def allocateShard( requester: ActorRef, @@ -18,8 +21,7 @@ class RequesterAllocationStrategy extends ShardAllocationStrategy { currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]]): Future[ActorRef] = Future.successful(requester) - - def rebalance( + protected def doRebalance( currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]], rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] = Future.successful(Set.empty) diff --git a/src/main/scala/com/evolutiongaming/cluster/SingleNodeAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/SingleNodeAllocationStrategy.scala index 55d501f..1e7617c 100644 --- a/src/main/scala/com/evolutiongaming/cluster/SingleNodeAllocationStrategy.scala +++ b/src/main/scala/com/evolutiongaming/cluster/SingleNodeAllocationStrategy.scala @@ -1,15 +1,19 @@ package com.evolutiongaming.cluster -import akka.actor.{ActorRef, Address} -import akka.cluster.sharding.ShardCoordinator.{LeastShardAllocationStrategy, ShardAllocationStrategy} +import akka.actor.{ActorRef, ActorSystem, Address} +import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy +import akka.cluster.sharding.ShardRegion import akka.cluster.sharding.ShardRegion.ShardId import scala.collection.immutable.IndexedSeq -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.FiniteDuration class SingleNodeAllocationStrategy( address: => Option[Address], - maxSimultaneousRebalance: Int = 10) extends ShardAllocationStrategy { + maxSimultaneousRebalance: Int, + deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext) + extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) { private lazy val leastShardAllocation = new LeastShardAllocationStrategy( rebalanceThreshold = 10, @@ -29,21 +33,15 @@ class SingleNodeAllocationStrategy( result map { Future.successful } getOrElse leastShardAllocation.allocateShard(requester, shardId, current) } - def rebalance(current: Map[ActorRef, IndexedSeq[ShardId]], rebalanceInProgress: Set[ShardId]) = { - def limitRebalance(f: => Set[ShardId]): Set[ShardId] = { - if (rebalanceInProgress.size >= maxSimultaneousRebalance) Set.empty - else f take maxSimultaneousRebalance - } - - val result = limitRebalance { - val result = for { - address <- address.toIterable - (actor, shards) <- current if actor.path.address != address - shard <- shards - } yield shard - result.toSet -- rebalanceInProgress - } - - Future successful result + protected def doRebalance( + current: Map[ActorRef, IndexedSeq[ShardId]], + rebalanceInProgress: Set[ShardId]): Future[Set[ShardRegion.ShardId]]= { + val result = for { + address <- address.toIterable + (actor, shards) <- current if actor.path.address != address + shard <- shards + } yield shard + + Future successful result.toSet -- rebalanceInProgress } } \ No newline at end of file diff --git a/src/test/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataSpec.scala b/src/test/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataSpec.scala index 6169eb4..f827f00 100644 --- a/src/test/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataSpec.scala +++ b/src/test/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataSpec.scala @@ -309,14 +309,14 @@ class AdaptiveAllocationStrategyDistributedDataSpec extends FlatSpec // limit rebalance to 1 - should rebalance 4 val strategy1 = new AdaptiveAllocationStrategy( typeName = TypeName, - system = system, maxSimultaneousRebalance = 1, rebalanceThresholdPercent = RebalanceThresholdPercent, cleanupPeriod = CleanupPeriod, metricRegistry = metricRegistry, countControl = CountControl.Increment, - fallbackStrategy = new RequesterAllocationStrategy, - proxy = proxy) + fallbackStrategy = fallbackStrategy, + proxy = proxy, + deallocationTimeout = deallocationTimeout) val result5 = strategy1.rebalance( shardAllocations, @@ -336,6 +336,8 @@ class AdaptiveAllocationStrategyDistributedDataSpec extends FlatSpec implicit val node = Cluster(system) + implicit val ec = system.dispatcher + val TypeName = "typeName" val selfAddress = node.selfAddress.toString @@ -435,16 +437,20 @@ class AdaptiveAllocationStrategyDistributedDataSpec extends FlatSpec val proxy = system actorOf Props(new TestProxy) + val deallocationTimeout = 5.minutes + + val fallbackStrategy = new RequesterAllocationStrategy(MaxSimultaneousRebalance, deallocationTimeout) + val strategy = new AdaptiveAllocationStrategy( typeName = TypeName, - system = system, maxSimultaneousRebalance = MaxSimultaneousRebalance, rebalanceThresholdPercent = RebalanceThresholdPercent, cleanupPeriod = CleanupPeriod, metricRegistry = metricRegistry, countControl = CountControl.Increment, - fallbackStrategy = new RequesterAllocationStrategy, - proxy = proxy) + fallbackStrategy = fallbackStrategy, + proxy = proxy, + deallocationTimeout = deallocationTimeout) expectMsgPF() { case Subscribe(EntityToNodeCountersKey, _) =>