Skip to content

Commit

Permalink
Add a forced by-nodes shards deallocation to all the allocation strat…
Browse files Browse the repository at this point in the history
…egies: post-code-review changes
  • Loading branch information
sergiy0 committed May 18, 2017
1 parent db914f8 commit 2a568f2
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ import scala.concurrent.duration.FiniteDuration
*/
class AdaptiveAllocationStrategy(
typeName: String,
maxSimultaneousRebalance: Int,
rebalanceThresholdPercent: Int,
cleanupPeriod: FiniteDuration,
metricRegistry: MetricRegistry,
countControl: CountControl.Type,
fallbackStrategy: ShardAllocationStrategy,
proxy: ActorRef,
deallocationTimeout: FiniteDuration,
val maxSimultaneousRebalance: Int,
val nodesToDeallocate: () => Set[Address],
lowTrafficThreshold: Int = 10)(implicit system: ActorSystem, ec: ExecutionContext)
extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) with LazyLogging {
extends ExtendedShardAllocationStrategy with LazyLogging {

import AdaptiveAllocationStrategy._
import addressHelper._
Expand Down Expand Up @@ -226,26 +226,26 @@ object AdaptiveAllocationStrategy {

def apply(
typeName: String,
maxSimultaneousRebalance: Int,
rebalanceThresholdPercent: Int,
cleanupPeriod: FiniteDuration,
metricRegistry: MetricRegistry,
countControl: CountControl.Type = CountControl.Empty,
fallbackStrategy: ShardAllocationStrategy,
deallocationTimeout: FiniteDuration)
maxSimultaneousRebalance: Int,
nodesToDeallocate: () => Set[Address])
(implicit system: ActorSystem, ec: ExecutionContext): AdaptiveAllocationStrategy = {
// proxy doesn't depend on typeName, it should just start once
val proxy = AdaptiveAllocationStrategyDistributedDataProxy(system).ref
new AdaptiveAllocationStrategy(
typeName = typeName,
maxSimultaneousRebalance = maxSimultaneousRebalance,
rebalanceThresholdPercent = rebalanceThresholdPercent,
cleanupPeriod = cleanupPeriod,
metricRegistry = metricRegistry,
countControl,
countControl = countControl,
fallbackStrategy = fallbackStrategy,
proxy = proxy,
deallocationTimeout = deallocationTimeout)(system, ec)
maxSimultaneousRebalance = maxSimultaneousRebalance,
nodesToDeallocate = nodesToDeallocate)(system, ec)
}

case class EntityKey(typeName: String, id: ShardRegion.ShardId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,21 @@
*/
package com.evolutiongaming.cluster

import akka.actor.{ActorRef, ActorSystem}
import akka.actor.{ActorRef, ActorSystem, Address}
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion
import com.typesafe.scalalogging.LazyLogging

import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

// this AllocationStrategy is for debug purposes only
class DirectAllocationStrategy(
fallbackStrategy: ShardAllocationStrategy,
readSettings: () => Option[Map[ShardRegion.ShardId, String]],
maxSimultaneousRebalance: Int,
deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext)
extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) with LazyLogging {
val maxSimultaneousRebalance: Int,
val nodesToDeallocate: () => Set[Address])(implicit system: ActorSystem, ec: ExecutionContext)
extends ExtendedShardAllocationStrategy with LazyLogging {

import addressHelper._

Expand Down Expand Up @@ -104,12 +103,12 @@ object DirectAllocationStrategy {
fallbackStrategy: ShardAllocationStrategy,
readSettings: () => Option[String],
maxSimultaneousRebalance: Int,
deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext): DirectAllocationStrategy =
nodesToDeallocate: () => Set[Address])(implicit system: ActorSystem, ec: ExecutionContext): DirectAllocationStrategy =
new DirectAllocationStrategy(
fallbackStrategy,
readAndParseSettings(readSettings),
maxSimultaneousRebalance,
deallocationTimeout)
nodesToDeallocate)

private def readAndParseSettings(
readSettings: () => Option[String]): () => Option[Map[ShardRegion.ShardId, String]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,21 @@
*/
package com.evolutiongaming.cluster

import akka.actor.{ActorRef, ActorSystem}
import akka.actor.{ActorRef, ActorSystem, Address}
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion
import com.typesafe.scalalogging.LazyLogging

import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

class DualAllocationStrategy(
baseAllocationStrategy: ShardAllocationStrategy,
additionalAllocationStrategy: ShardAllocationStrategy,
readSettings: () => Option[Set[ShardRegion.ShardId]],
maxSimultaneousRebalance: Int,
deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext)
extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) with LazyLogging {
val maxSimultaneousRebalance: Int,
val nodesToDeallocate: () => Set[Address])(implicit system: ActorSystem, ec: ExecutionContext)
extends ExtendedShardAllocationStrategy with LazyLogging {

@volatile
private var additionalShardIds = Set.empty[ShardRegion.ShardId]
Expand Down Expand Up @@ -87,13 +86,13 @@ object DualAllocationStrategy {
additionalAllocationStrategy: ShardAllocationStrategy,
readSettings: () => Option[String],
maxSimultaneousRebalance: Int,
deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext): DualAllocationStrategy =
nodesToDeallocate: () => Set[Address])(implicit system: ActorSystem, ec: ExecutionContext): DualAllocationStrategy =
new DualAllocationStrategy(
baseAllocationStrategy,
additionalAllocationStrategy,
readAndParseSettings(readSettings),
maxSimultaneousRebalance,
deallocationTimeout)
nodesToDeallocate)

private def readAndParseSettings(
readSettings: () => Option[String]): () => Option[Set[ShardRegion.ShardId]] =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,22 @@
package com.evolutiongaming.cluster

import java.time.Instant

import akka.actor.{ActorRef, ActorSystem, Address}
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion

import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

abstract class ExtendedShardAllocationStrategy(
system: ActorSystem,
implicit val ec: ExecutionContext,
maxSimultaneousRebalance: Int,
deallocationTimeout: FiniteDuration) extends ShardAllocationStrategy {
implicit system: ActorSystem,
ec: ExecutionContext) extends ShardAllocationStrategy {

protected def nodesToDeallocate: () => Set[Address]

val addressHelper = AddressHelperExtension(system)
import addressHelper._

class Node(val address: Address, added: Instant = Instant.now()) {
def expired: Boolean = Instant.now() isAfter (added plusMillis deallocationTimeout.toMillis)
override def equals(obj: Any): Boolean = obj match {
case node: Node => address equals node.address
case _ => false
}
override def hashCode(): Int = address.hashCode
}

@volatile
private var nodesToForcedDeallocation: Set[Node] = Set.empty

def deallocateShardsFromNode(regionGlobalAddress: Address): Unit =
nodesToForcedDeallocation += new Node(regionGlobalAddress)
protected def maxSimultaneousRebalance: Int

protected def doRebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]],
Expand All @@ -46,20 +30,14 @@ abstract class ExtendedShardAllocationStrategy(
if (rebalanceInProgress.size >= maxSimultaneousRebalance) Set.empty
else f take maxSimultaneousRebalance

val nodesToForcedDeallocation = nodesToDeallocate()

val shardsToRebalance: Future[Set[ShardRegion.ShardId]] =
if (nodesToForcedDeallocation.isEmpty) {
doRebalance(currentShardAllocations, rebalanceInProgress)
} else {
val emptyNodes = for {
(k, v) <- currentShardAllocations if v.isEmpty
} yield new Node(k.path.address.global)

val nodesToRemove = (nodesToForcedDeallocation filter (_.expired)) ++ emptyNodes.toSet

nodesToForcedDeallocation = nodesToForcedDeallocation -- nodesToRemove

val shardsToForcedDeallocation = (for {
(k, v) <- currentShardAllocations if nodesToForcedDeallocation contains new Node(k.path.address.global)
(k, v) <- currentShardAllocations if nodesToForcedDeallocation contains k.path.address.global
} yield v).flatten.toSet -- rebalanceInProgress

for {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
package com.evolutiongaming.cluster

import akka.actor.{ActorRef, ActorSystem}
import akka.actor.{ActorRef, ActorSystem, Address}
import akka.cluster.sharding.ShardRegion

import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

// Allocate all shards on requester nodes
// to be used primarily for debugging purposes and,
// for example, as fallbackAllocationStrategy in DirectAllocationStrategy
class RequesterAllocationStrategy(
maxSimultaneousRebalance: Int,
deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext)
extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) {
val maxSimultaneousRebalance: Int,
val nodesToDeallocate: () => Set[Address])(implicit system: ActorSystem, ec: ExecutionContext)
extends ExtendedShardAllocationStrategy {

def allocateShard(
requester: ActorRef,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import akka.cluster.sharding.ShardRegion.ShardId

import scala.collection.immutable.IndexedSeq
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

class SingleNodeAllocationStrategy(
address: => Option[Address],
maxSimultaneousRebalance: Int,
deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext)
extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) {
val maxSimultaneousRebalance: Int,
val nodesToDeallocate: () => Set[Address])(implicit system: ActorSystem, ec: ExecutionContext)
extends ExtendedShardAllocationStrategy {

private lazy val leastShardAllocation = new LeastShardAllocationStrategy(
rebalanceThreshold = 10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,14 +309,14 @@ class AdaptiveAllocationStrategyDistributedDataSpec extends FlatSpec
// limit rebalance to 1 - should rebalance 4
val strategy1 = new AdaptiveAllocationStrategy(
typeName = TypeName,
maxSimultaneousRebalance = 1,
rebalanceThresholdPercent = RebalanceThresholdPercent,
cleanupPeriod = CleanupPeriod,
metricRegistry = metricRegistry,
countControl = CountControl.Increment,
fallbackStrategy = fallbackStrategy,
proxy = proxy,
deallocationTimeout = deallocationTimeout)
maxSimultaneousRebalance = 1,
nodesToDeallocate = () => Set.empty)

val result5 = strategy1.rebalance(
shardAllocations,
Expand Down Expand Up @@ -437,20 +437,20 @@ class AdaptiveAllocationStrategyDistributedDataSpec extends FlatSpec

val proxy = system actorOf Props(new TestProxy)

val deallocationTimeout = 5.minutes

val fallbackStrategy = new RequesterAllocationStrategy(MaxSimultaneousRebalance, deallocationTimeout)
val fallbackStrategy = new RequesterAllocationStrategy(
maxSimultaneousRebalance = MaxSimultaneousRebalance,
nodesToDeallocate = () => Set.empty)

val strategy = new AdaptiveAllocationStrategy(
typeName = TypeName,
maxSimultaneousRebalance = MaxSimultaneousRebalance,
rebalanceThresholdPercent = RebalanceThresholdPercent,
cleanupPeriod = CleanupPeriod,
metricRegistry = metricRegistry,
countControl = CountControl.Increment,
fallbackStrategy = fallbackStrategy,
proxy = proxy,
deallocationTimeout = deallocationTimeout)
maxSimultaneousRebalance = MaxSimultaneousRebalance,
nodesToDeallocate = () => Set.empty)

expectMsgPF() {
case Subscribe(EntityToNodeCountersKey, _) =>
Expand Down

0 comments on commit 2a568f2

Please sign in to comment.