Skip to content

Commit

Permalink
Create MappedAllocationStrategy - post-code-review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiy0 committed Aug 15, 2017
1 parent fdb1215 commit a925dca
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 19 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ scalacOptions ++= Seq(

scalacOptions in (Compile,doc) ++= Seq("-no-link-warnings")

val AkkaVersion = "2.5.3"
val AkkaVersion = "2.5.4"

resolvers += Resolver.bintrayRepo("evolutiongaming", "maven")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ class AdaptiveAllocationStrategyDDProxy extends Actor with ActorLogging {
import AdaptiveAllocationStrategyDDProxy._

implicit lazy val node = Cluster(context.system)
private val selfAddress = node.selfAddress.toString
lazy val selfAddress = node.selfAddress.toString
lazy val replicator: ActorRef = DistributedData(context.system).replicator
private val emptyMap = ORMultiMap.empty[String, String]
val emptyMap = ORMultiMap.empty[String, String]

replicator ! Subscribe(EntityToNodeCountersKey, self)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,26 @@ abstract class ExtendedShardAllocationStrategy(
final def allocateShard(
requester: ActorRef,
shardId: ShardRegion.ShardId,
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]]): Future[ActorRef] =
for (nodeByStrategy <- doAllocate(requester, shardId, currentShardAllocations)) yield {
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]]): Future[ActorRef] = {

val nodeByStrategyFuture = doAllocate(requester, shardId, currentShardAllocations)
val ignoredNodes = nodesToDeallocate()

if (ignoredNodes.isEmpty)
nodeByStrategy
nodeByStrategyFuture
else {
val activeNodes = notIgnoredNodes(currentShardAllocations, ignoredNodes)
val activeAddresses = activeNodes map (_.path.address)

if (activeAddresses contains (addressHelper toGlobal nodeByStrategy.path.address))
nodeByStrategy
else if (activeAddresses contains (addressHelper toGlobal requester.path.address))
requester
else
activeNodes.headOption getOrElse currentShardAllocations.keys.head
for (nodeByStrategy <- nodeByStrategyFuture) yield {

val activeNodes = notIgnoredNodes(currentShardAllocations, ignoredNodes)
val activeAddresses = activeNodes map (_.path.address)

if (activeAddresses contains (addressHelper toGlobal nodeByStrategy.path.address))
nodeByStrategy
else if (activeAddresses contains (addressHelper toGlobal requester.path.address))
requester
else
activeNodes.headOption getOrElse currentShardAllocations.keys.head
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,19 @@ class MappedAllocationStrategy(
}
}


def rebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]],
rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] = {

logger debug
s"rebalance $typeName: currentShardAllocations = $currentShardAllocations, rebalanceInProgress = $rebalanceInProgress"

val result = ((for {
val shardsToRebalance = for {
(ref, shards) <- currentShardAllocations
shardId <- shards if !(shardToRegionMapping get EntityKey(typeName, shardId) contains ref)
} yield shardId).toSet -- rebalanceInProgress) take maxSimultaneousRebalance
} yield shardId

val result = (shardsToRebalance.toSet -- rebalanceInProgress) take maxSimultaneousRebalance

if (result.nonEmpty) logger info s"Rebalance $typeName\n\t" +
s"current:${ currentShardAllocations.mkString("\n\t\t", "\n\t\t", "") }\n\t" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class MappedAllocationStrategyDDProxy extends Actor with ActorLogging {

implicit lazy val node = Cluster(context.system)
lazy val replicator: ActorRef = DistributedData(context.system).replicator
private val emptyMap = LWWMap.empty[String, ActorRef]
val emptyMap = LWWMap.empty[String, ActorRef]

replicator ! Subscribe(MappingKey, self)

Expand Down

0 comments on commit a925dca

Please sign in to comment.