From 2a568f26213cdcf206c2b2a383bce960da5f1952 Mon Sep 17 00:00:00 2001 From: Sergiy Prydatchenko Date: Thu, 18 May 2017 15:57:00 +0300 Subject: [PATCH] Add a forced by-nodes shards deallocation to all the allocation strategies: post-code-review changes --- .../cluster/AdaptiveAllocationStrategy.scala | 16 ++++---- .../cluster/DirectAllocationStrategy.scala | 13 +++---- .../cluster/DualAllocationStrategy.scala | 13 +++---- .../ExtendedShardAllocationStrategy.scala | 38 ++++--------------- .../cluster/RequesterAllocationStrategy.scala | 9 ++--- .../SingleNodeAllocationStrategy.scala | 7 ++-- ...llocationStrategyDistributedDataSpec.scala | 14 +++---- 7 files changed, 42 insertions(+), 68 deletions(-) diff --git a/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategy.scala index 06dbf5a..e6913bd 100644 --- a/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategy.scala +++ b/src/main/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategy.scala @@ -35,16 +35,16 @@ import scala.concurrent.duration.FiniteDuration */ class AdaptiveAllocationStrategy( typeName: String, - maxSimultaneousRebalance: Int, rebalanceThresholdPercent: Int, cleanupPeriod: FiniteDuration, metricRegistry: MetricRegistry, countControl: CountControl.Type, fallbackStrategy: ShardAllocationStrategy, proxy: ActorRef, - deallocationTimeout: FiniteDuration, + val maxSimultaneousRebalance: Int, + val nodesToDeallocate: () => Set[Address], lowTrafficThreshold: Int = 10)(implicit system: ActorSystem, ec: ExecutionContext) - extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) with LazyLogging { + extends ExtendedShardAllocationStrategy with LazyLogging { import AdaptiveAllocationStrategy._ import addressHelper._ @@ -226,26 +226,26 @@ object AdaptiveAllocationStrategy { def apply( typeName: String, - maxSimultaneousRebalance: Int, rebalanceThresholdPercent: Int, cleanupPeriod: FiniteDuration, metricRegistry: MetricRegistry, countControl: CountControl.Type = CountControl.Empty, fallbackStrategy: ShardAllocationStrategy, - deallocationTimeout: FiniteDuration) + maxSimultaneousRebalance: Int, + nodesToDeallocate: () => Set[Address]) (implicit system: ActorSystem, ec: ExecutionContext): AdaptiveAllocationStrategy = { // proxy doesn't depend on typeName, it should just start once val proxy = AdaptiveAllocationStrategyDistributedDataProxy(system).ref new AdaptiveAllocationStrategy( typeName = typeName, - maxSimultaneousRebalance = maxSimultaneousRebalance, rebalanceThresholdPercent = rebalanceThresholdPercent, cleanupPeriod = cleanupPeriod, metricRegistry = metricRegistry, - countControl, + countControl = countControl, fallbackStrategy = fallbackStrategy, proxy = proxy, - deallocationTimeout = deallocationTimeout)(system, ec) + maxSimultaneousRebalance = maxSimultaneousRebalance, + nodesToDeallocate = nodesToDeallocate)(system, ec) } case class EntityKey(typeName: String, id: ShardRegion.ShardId) { diff --git a/src/main/scala/com/evolutiongaming/cluster/DirectAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/DirectAllocationStrategy.scala index e4c9261..b0f0ed9 100644 --- a/src/main/scala/com/evolutiongaming/cluster/DirectAllocationStrategy.scala +++ b/src/main/scala/com/evolutiongaming/cluster/DirectAllocationStrategy.scala @@ -15,22 +15,21 @@ */ package com.evolutiongaming.cluster -import akka.actor.{ActorRef, ActorSystem} +import akka.actor.{ActorRef, ActorSystem, Address} import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion import com.typesafe.scalalogging.LazyLogging import scala.collection.immutable import scala.concurrent.{ExecutionContext, Future} -import scala.concurrent.duration.FiniteDuration // this AllocationStrategy is for debug purposes only class DirectAllocationStrategy( fallbackStrategy: ShardAllocationStrategy, readSettings: () => Option[Map[ShardRegion.ShardId, String]], - maxSimultaneousRebalance: Int, - deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext) - extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) with LazyLogging { + val maxSimultaneousRebalance: Int, + val nodesToDeallocate: () => Set[Address])(implicit system: ActorSystem, ec: ExecutionContext) + extends ExtendedShardAllocationStrategy with LazyLogging { import addressHelper._ @@ -104,12 +103,12 @@ object DirectAllocationStrategy { fallbackStrategy: ShardAllocationStrategy, readSettings: () => Option[String], maxSimultaneousRebalance: Int, - deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext): DirectAllocationStrategy = + nodesToDeallocate: () => Set[Address])(implicit system: ActorSystem, ec: ExecutionContext): DirectAllocationStrategy = new DirectAllocationStrategy( fallbackStrategy, readAndParseSettings(readSettings), maxSimultaneousRebalance, - deallocationTimeout) + nodesToDeallocate) private def readAndParseSettings( readSettings: () => Option[String]): () => Option[Map[ShardRegion.ShardId, String]] = diff --git a/src/main/scala/com/evolutiongaming/cluster/DualAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/DualAllocationStrategy.scala index c1158c2..0e1b385 100644 --- a/src/main/scala/com/evolutiongaming/cluster/DualAllocationStrategy.scala +++ b/src/main/scala/com/evolutiongaming/cluster/DualAllocationStrategy.scala @@ -15,22 +15,21 @@ */ package com.evolutiongaming.cluster -import akka.actor.{ActorRef, ActorSystem} +import akka.actor.{ActorRef, ActorSystem, Address} import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion import com.typesafe.scalalogging.LazyLogging import scala.collection.immutable import scala.concurrent.{ExecutionContext, Future} -import scala.concurrent.duration.FiniteDuration class DualAllocationStrategy( baseAllocationStrategy: ShardAllocationStrategy, additionalAllocationStrategy: ShardAllocationStrategy, readSettings: () => Option[Set[ShardRegion.ShardId]], - maxSimultaneousRebalance: Int, - deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext) - extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) with LazyLogging { + val maxSimultaneousRebalance: Int, + val nodesToDeallocate: () => Set[Address])(implicit system: ActorSystem, ec: ExecutionContext) + extends ExtendedShardAllocationStrategy with LazyLogging { @volatile private var additionalShardIds = Set.empty[ShardRegion.ShardId] @@ -87,13 +86,13 @@ object DualAllocationStrategy { additionalAllocationStrategy: ShardAllocationStrategy, readSettings: () => Option[String], maxSimultaneousRebalance: Int, - deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext): DualAllocationStrategy = + nodesToDeallocate: () => Set[Address])(implicit system: ActorSystem, ec: ExecutionContext): DualAllocationStrategy = new DualAllocationStrategy( baseAllocationStrategy, additionalAllocationStrategy, readAndParseSettings(readSettings), maxSimultaneousRebalance, - deallocationTimeout) + nodesToDeallocate) private def readAndParseSettings( readSettings: () => Option[String]): () => Option[Set[ShardRegion.ShardId]] = diff --git a/src/main/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategy.scala index 90f6255..3dca94c 100644 --- a/src/main/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategy.scala +++ b/src/main/scala/com/evolutiongaming/cluster/ExtendedShardAllocationStrategy.scala @@ -1,38 +1,22 @@ package com.evolutiongaming.cluster -import java.time.Instant - import akka.actor.{ActorRef, ActorSystem, Address} import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion import scala.collection.immutable import scala.concurrent.{ExecutionContext, Future} -import scala.concurrent.duration._ abstract class ExtendedShardAllocationStrategy( - system: ActorSystem, - implicit val ec: ExecutionContext, - maxSimultaneousRebalance: Int, - deallocationTimeout: FiniteDuration) extends ShardAllocationStrategy { + implicit system: ActorSystem, + ec: ExecutionContext) extends ShardAllocationStrategy { + + protected def nodesToDeallocate: () => Set[Address] val addressHelper = AddressHelperExtension(system) import addressHelper._ - class Node(val address: Address, added: Instant = Instant.now()) { - def expired: Boolean = Instant.now() isAfter (added plusMillis deallocationTimeout.toMillis) - override def equals(obj: Any): Boolean = obj match { - case node: Node => address equals node.address - case _ => false - } - override def hashCode(): Int = address.hashCode - } - - @volatile - private var nodesToForcedDeallocation: Set[Node] = Set.empty - - def deallocateShardsFromNode(regionGlobalAddress: Address): Unit = - nodesToForcedDeallocation += new Node(regionGlobalAddress) + protected def maxSimultaneousRebalance: Int protected def doRebalance( currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]], @@ -46,20 +30,14 @@ abstract class ExtendedShardAllocationStrategy( if (rebalanceInProgress.size >= maxSimultaneousRebalance) Set.empty else f take maxSimultaneousRebalance + val nodesToForcedDeallocation = nodesToDeallocate() + val shardsToRebalance: Future[Set[ShardRegion.ShardId]] = if (nodesToForcedDeallocation.isEmpty) { doRebalance(currentShardAllocations, rebalanceInProgress) } else { - val emptyNodes = for { - (k, v) <- currentShardAllocations if v.isEmpty - } yield new Node(k.path.address.global) - - val nodesToRemove = (nodesToForcedDeallocation filter (_.expired)) ++ emptyNodes.toSet - - nodesToForcedDeallocation = nodesToForcedDeallocation -- nodesToRemove - val shardsToForcedDeallocation = (for { - (k, v) <- currentShardAllocations if nodesToForcedDeallocation contains new Node(k.path.address.global) + (k, v) <- currentShardAllocations if nodesToForcedDeallocation contains k.path.address.global } yield v).flatten.toSet -- rebalanceInProgress for { diff --git a/src/main/scala/com/evolutiongaming/cluster/RequesterAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/RequesterAllocationStrategy.scala index 8ad18cf..0f1bcf2 100644 --- a/src/main/scala/com/evolutiongaming/cluster/RequesterAllocationStrategy.scala +++ b/src/main/scala/com/evolutiongaming/cluster/RequesterAllocationStrategy.scala @@ -1,19 +1,18 @@ package com.evolutiongaming.cluster -import akka.actor.{ActorRef, ActorSystem} +import akka.actor.{ActorRef, ActorSystem, Address} import akka.cluster.sharding.ShardRegion import scala.collection.immutable import scala.concurrent.{ExecutionContext, Future} -import scala.concurrent.duration.FiniteDuration // Allocate all shards on requester nodes // to be used primarily for debugging purposes and, // for example, as fallbackAllocationStrategy in DirectAllocationStrategy class RequesterAllocationStrategy( - maxSimultaneousRebalance: Int, - deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext) - extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) { + val maxSimultaneousRebalance: Int, + val nodesToDeallocate: () => Set[Address])(implicit system: ActorSystem, ec: ExecutionContext) + extends ExtendedShardAllocationStrategy { def allocateShard( requester: ActorRef, diff --git a/src/main/scala/com/evolutiongaming/cluster/SingleNodeAllocationStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/SingleNodeAllocationStrategy.scala index 1e7617c..689d9e9 100644 --- a/src/main/scala/com/evolutiongaming/cluster/SingleNodeAllocationStrategy.scala +++ b/src/main/scala/com/evolutiongaming/cluster/SingleNodeAllocationStrategy.scala @@ -7,13 +7,12 @@ import akka.cluster.sharding.ShardRegion.ShardId import scala.collection.immutable.IndexedSeq import scala.concurrent.{ExecutionContext, Future} -import scala.concurrent.duration.FiniteDuration class SingleNodeAllocationStrategy( address: => Option[Address], - maxSimultaneousRebalance: Int, - deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext) - extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) { + val maxSimultaneousRebalance: Int, + val nodesToDeallocate: () => Set[Address])(implicit system: ActorSystem, ec: ExecutionContext) + extends ExtendedShardAllocationStrategy { private lazy val leastShardAllocation = new LeastShardAllocationStrategy( rebalanceThreshold = 10, diff --git a/src/test/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataSpec.scala b/src/test/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataSpec.scala index f827f00..98f4311 100644 --- a/src/test/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataSpec.scala +++ b/src/test/scala/com/evolutiongaming/cluster/AdaptiveAllocationStrategyDistributedDataSpec.scala @@ -309,14 +309,14 @@ class AdaptiveAllocationStrategyDistributedDataSpec extends FlatSpec // limit rebalance to 1 - should rebalance 4 val strategy1 = new AdaptiveAllocationStrategy( typeName = TypeName, - maxSimultaneousRebalance = 1, rebalanceThresholdPercent = RebalanceThresholdPercent, cleanupPeriod = CleanupPeriod, metricRegistry = metricRegistry, countControl = CountControl.Increment, fallbackStrategy = fallbackStrategy, proxy = proxy, - deallocationTimeout = deallocationTimeout) + maxSimultaneousRebalance = 1, + nodesToDeallocate = () => Set.empty) val result5 = strategy1.rebalance( shardAllocations, @@ -437,20 +437,20 @@ class AdaptiveAllocationStrategyDistributedDataSpec extends FlatSpec val proxy = system actorOf Props(new TestProxy) - val deallocationTimeout = 5.minutes - - val fallbackStrategy = new RequesterAllocationStrategy(MaxSimultaneousRebalance, deallocationTimeout) + val fallbackStrategy = new RequesterAllocationStrategy( + maxSimultaneousRebalance = MaxSimultaneousRebalance, + nodesToDeallocate = () => Set.empty) val strategy = new AdaptiveAllocationStrategy( typeName = TypeName, - maxSimultaneousRebalance = MaxSimultaneousRebalance, rebalanceThresholdPercent = RebalanceThresholdPercent, cleanupPeriod = CleanupPeriod, metricRegistry = metricRegistry, countControl = CountControl.Increment, fallbackStrategy = fallbackStrategy, proxy = proxy, - deallocationTimeout = deallocationTimeout) + maxSimultaneousRebalance = MaxSimultaneousRebalance, + nodesToDeallocate = () => Set.empty) expectMsgPF() { case Subscribe(EntityToNodeCountersKey, _) =>