Skip to content

Commit

Permalink
Add a forced by-node shards deallocation to all the allocation strate…
Browse files Browse the repository at this point in the history
…gies
  • Loading branch information
sergiy0 committed May 17, 2017
1 parent 7e45bdf commit db914f8
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion
import com.codahale.metrics.MetricRegistry
import com.typesafe.scalalogging.LazyLogging

import scala.collection.concurrent.TrieMap
import scala.collection.{immutable, mutable}
import scala.compat.Platform
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

/**
Expand All @@ -34,20 +35,18 @@ import scala.concurrent.duration.FiniteDuration
*/
class AdaptiveAllocationStrategy(
typeName: String,
system: ActorSystem,
maxSimultaneousRebalance: Int,
rebalanceThresholdPercent: Int,
cleanupPeriod: FiniteDuration,
metricRegistry: MetricRegistry,
countControl: CountControl.Type,
fallbackStrategy: ShardAllocationStrategy,
proxy: ActorRef,
lowTrafficThreshold: Int = 10) extends ShardAllocationStrategy with LazyLogging {
deallocationTimeout: FiniteDuration,
lowTrafficThreshold: Int = 10)(implicit system: ActorSystem, ec: ExecutionContext)
extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) with LazyLogging {

import AdaptiveAllocationStrategy._
import system.dispatcher

val addressHelper = AddressHelperExtension(system)
import addressHelper._

private implicit val node = Cluster(system)
Expand Down Expand Up @@ -145,14 +144,10 @@ class AdaptiveAllocationStrategy(
}

/** Should be executed every rebalance-interval only on a node with ShardCoordinator */
def rebalance(
protected def doRebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]],
rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] = Future {

def limitRebalance(f: => Set[ShardRegion.ShardId]): Set[ShardRegion.ShardId] =
if (rebalanceInProgress.size >= maxSimultaneousRebalance) Set.empty
else f take maxSimultaneousRebalance

val entityToNodeCountersByType = entityToNodeCounters filterKeys { _.typeName == typeName }

val shardsToClear = mutable.Set.empty[ShardRegion.ShardId]
Expand Down Expand Up @@ -211,7 +206,7 @@ class AdaptiveAllocationStrategy(
shard <- notRebalancingShards if rebalanceShard(shard, regionAddress)
} yield shard

val result = limitRebalance(shardsToRebalance.toSet)
val result = shardsToRebalance.toSet

for (id <- shardsToClear -- result) {
logger debug s"Shard $typeName#$id counter cleanup"
Expand All @@ -236,20 +231,21 @@ object AdaptiveAllocationStrategy {
cleanupPeriod: FiniteDuration,
metricRegistry: MetricRegistry,
countControl: CountControl.Type = CountControl.Empty,
fallbackStrategy: ShardAllocationStrategy = new RequesterAllocationStrategy)
(implicit system: ActorSystem): AdaptiveAllocationStrategy = {
fallbackStrategy: ShardAllocationStrategy,
deallocationTimeout: FiniteDuration)
(implicit system: ActorSystem, ec: ExecutionContext): AdaptiveAllocationStrategy = {
// proxy doesn't depend on typeName, it should just start once
val proxy = AdaptiveAllocationStrategyDistributedDataProxy(system).ref
new AdaptiveAllocationStrategy(
typeName = typeName,
system = system,
maxSimultaneousRebalance = maxSimultaneousRebalance,
rebalanceThresholdPercent = rebalanceThresholdPercent,
cleanupPeriod = cleanupPeriod,
metricRegistry = metricRegistry,
countControl,
fallbackStrategy = fallbackStrategy,
proxy = proxy)
proxy = proxy,
deallocationTimeout = deallocationTimeout)(system, ec)
}

case class EntityKey(typeName: String, id: ShardRegion.ShardId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,17 @@ import akka.cluster.sharding.ShardRegion
import com.typesafe.scalalogging.LazyLogging

import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

// this AllocationStrategy is for debug purposes only
class DirectAllocationStrategy(
fallbackStrategy: ShardAllocationStrategy,
readSettings: () => Option[Map[ShardRegion.ShardId, String]])
(implicit system: ActorSystem) extends ShardAllocationStrategy
with LazyLogging {
readSettings: () => Option[Map[ShardRegion.ShardId, String]],
maxSimultaneousRebalance: Int,
deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext)
extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) with LazyLogging {

import system.dispatcher

val addressHelper = AddressHelperExtension(system)
import addressHelper._

@volatile
Expand Down Expand Up @@ -70,7 +69,7 @@ class DirectAllocationStrategy(
}
}

def rebalance(
protected def doRebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]],
rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] = {

Expand Down Expand Up @@ -103,8 +102,14 @@ class DirectAllocationStrategy(
object DirectAllocationStrategy {
def apply(
fallbackStrategy: ShardAllocationStrategy,
readSettings: () => Option[String])(implicit system: ActorSystem): DirectAllocationStrategy =
new DirectAllocationStrategy(fallbackStrategy, readAndParseSettings(readSettings))
readSettings: () => Option[String],
maxSimultaneousRebalance: Int,
deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext): DirectAllocationStrategy =
new DirectAllocationStrategy(
fallbackStrategy,
readAndParseSettings(readSettings),
maxSimultaneousRebalance,
deallocationTimeout)

private def readAndParseSettings(
readSettings: () => Option[String]): () => Option[Map[ShardRegion.ShardId, String]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ import akka.cluster.sharding.ShardRegion
import com.typesafe.scalalogging.LazyLogging

import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

class DualAllocationStrategy(
baseAllocationStrategy: ShardAllocationStrategy,
additionalAllocationStrategy: ShardAllocationStrategy,
readSettings: () => Option[Set[ShardRegion.ShardId]])
(implicit system: ActorSystem) extends ShardAllocationStrategy with LazyLogging {

import system.dispatcher
readSettings: () => Option[Set[ShardRegion.ShardId]],
maxSimultaneousRebalance: Int,
deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext)
extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) with LazyLogging {

@volatile
private var additionalShardIds = Set.empty[ShardRegion.ShardId]
Expand All @@ -49,7 +50,7 @@ class DualAllocationStrategy(
baseAllocationStrategy.allocateShard(requester, shardId, currentShardAllocations)
}

def rebalance(
protected def doRebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]],
rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] = {

Expand Down Expand Up @@ -84,11 +85,15 @@ object DualAllocationStrategy {
def apply(
baseAllocationStrategy: ShardAllocationStrategy,
additionalAllocationStrategy: ShardAllocationStrategy,
readSettings: () => Option[String])(implicit system: ActorSystem): DualAllocationStrategy =
readSettings: () => Option[String],
maxSimultaneousRebalance: Int,
deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext): DualAllocationStrategy =
new DualAllocationStrategy(
baseAllocationStrategy,
additionalAllocationStrategy,
readAndParseSettings(readSettings))
readAndParseSettings(readSettings),
maxSimultaneousRebalance,
deallocationTimeout)

private def readAndParseSettings(
readSettings: () => Option[String]): () => Option[Set[ShardRegion.ShardId]] =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.evolutiongaming.cluster

import java.time.Instant

import akka.actor.{ActorRef, ActorSystem, Address}
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion

import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

abstract class ExtendedShardAllocationStrategy(
system: ActorSystem,
implicit val ec: ExecutionContext,
maxSimultaneousRebalance: Int,
deallocationTimeout: FiniteDuration) extends ShardAllocationStrategy {

val addressHelper = AddressHelperExtension(system)
import addressHelper._

class Node(val address: Address, added: Instant = Instant.now()) {
def expired: Boolean = Instant.now() isAfter (added plusMillis deallocationTimeout.toMillis)
override def equals(obj: Any): Boolean = obj match {
case node: Node => address equals node.address
case _ => false
}
override def hashCode(): Int = address.hashCode
}

@volatile
private var nodesToForcedDeallocation: Set[Node] = Set.empty

def deallocateShardsFromNode(regionGlobalAddress: Address): Unit =
nodesToForcedDeallocation += new Node(regionGlobalAddress)

protected def doRebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]],
rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]]

final def rebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]],
rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] = {

def limitRebalance(f: => Set[ShardRegion.ShardId]): Set[ShardRegion.ShardId] =
if (rebalanceInProgress.size >= maxSimultaneousRebalance) Set.empty
else f take maxSimultaneousRebalance

val shardsToRebalance: Future[Set[ShardRegion.ShardId]] =
if (nodesToForcedDeallocation.isEmpty) {
doRebalance(currentShardAllocations, rebalanceInProgress)
} else {
val emptyNodes = for {
(k, v) <- currentShardAllocations if v.isEmpty
} yield new Node(k.path.address.global)

val nodesToRemove = (nodesToForcedDeallocation filter (_.expired)) ++ emptyNodes.toSet

nodesToForcedDeallocation = nodesToForcedDeallocation -- nodesToRemove

val shardsToForcedDeallocation = (for {
(k, v) <- currentShardAllocations if nodesToForcedDeallocation contains new Node(k.path.address.global)
} yield v).flatten.toSet -- rebalanceInProgress

for {
doRebalanceResult <- doRebalance(currentShardAllocations, rebalanceInProgress -- shardsToForcedDeallocation)
} yield shardsToForcedDeallocation ++ doRebalanceResult
}

val result = for {
shardsToRebalance <- shardsToRebalance
} yield limitRebalance(shardsToRebalance)

result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ object ExtractShardId extends LazyLogging {
case x: ShardedMsg => x.id
}

def apply(typeName: String, config: Config): ShardRegion.ExtractShardId = {
def uniform(numberOfShards: Int): ShardRegion.ExtractShardId = {
case x: ShardedMsg =>
val id = x.id
val shardId = math.abs(id.hashCode % numberOfShards).toString
shardId
}

def static(typeName: String, config: Config): ShardRegion.ExtractShardId = {
// "shardId: entityId1, entityId2, entityId3"
val mappingsList = Try(config.get[List[String]](s"$typeName.id-shard-mapping")) getOrElse List.empty[String]
val mappingsPairList = mappingsList flatMap { mapping =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
package com.evolutiongaming.cluster

import akka.actor.ActorRef
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.sharding.ShardRegion

import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

// Allocate all shards on requester nodes
// to be used primarily for debugging purposes and,
// for example, as fallbackAllocationStrategy in DirectAllocationStrategy
class RequesterAllocationStrategy extends ShardAllocationStrategy {
class RequesterAllocationStrategy(
maxSimultaneousRebalance: Int,
deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext)
extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) {

def allocateShard(
requester: ActorRef,
shardId: ShardRegion.ShardId,
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]]): Future[ActorRef] =
Future.successful(requester)


def rebalance(
protected def doRebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]],
rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] =
Future.successful(Set.empty)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package com.evolutiongaming.cluster

import akka.actor.{ActorRef, Address}
import akka.cluster.sharding.ShardCoordinator.{LeastShardAllocationStrategy, ShardAllocationStrategy}
import akka.actor.{ActorRef, ActorSystem, Address}
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.ShardId

import scala.collection.immutable.IndexedSeq
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

class SingleNodeAllocationStrategy(
address: => Option[Address],
maxSimultaneousRebalance: Int = 10) extends ShardAllocationStrategy {
maxSimultaneousRebalance: Int,
deallocationTimeout: FiniteDuration)(implicit system: ActorSystem, ec: ExecutionContext)
extends ExtendedShardAllocationStrategy(system, ec, maxSimultaneousRebalance, deallocationTimeout) {

private lazy val leastShardAllocation = new LeastShardAllocationStrategy(
rebalanceThreshold = 10,
Expand All @@ -29,21 +33,15 @@ class SingleNodeAllocationStrategy(
result map { Future.successful } getOrElse leastShardAllocation.allocateShard(requester, shardId, current)
}

def rebalance(current: Map[ActorRef, IndexedSeq[ShardId]], rebalanceInProgress: Set[ShardId]) = {
def limitRebalance(f: => Set[ShardId]): Set[ShardId] = {
if (rebalanceInProgress.size >= maxSimultaneousRebalance) Set.empty
else f take maxSimultaneousRebalance
}

val result = limitRebalance {
val result = for {
address <- address.toIterable
(actor, shards) <- current if actor.path.address != address
shard <- shards
} yield shard
result.toSet -- rebalanceInProgress
}

Future successful result
protected def doRebalance(
current: Map[ActorRef, IndexedSeq[ShardId]],
rebalanceInProgress: Set[ShardId]): Future[Set[ShardRegion.ShardId]]= {
val result = for {
address <- address.toIterable
(actor, shards) <- current if actor.path.address != address
shard <- shards
} yield shard

Future successful result.toSet -- rebalanceInProgress
}
}
Loading

0 comments on commit db914f8

Please sign in to comment.