Skip to content

Commit

Permalink
Create MappedAllocationStrategy, improve switchover support in alloca…
Browse files Browse the repository at this point in the history
…tion strategies, add support for ShardRegion.StartEntity
  • Loading branch information
sergiy0 committed Aug 10, 2017
1 parent 360a8dc commit 6991f7f
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 38 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ scalacOptions ++= Seq(

scalacOptions in (Compile,doc) ++= Seq("-no-link-warnings")

val AkkaVersion = "2.5.1"
val AkkaVersion = "2.5.3"

resolvers += Resolver.bintrayRepo("evolutiongaming", "maven")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.evolutiongaming.cluster

import akka.actor.{ActorRef, Extension}

class ActorRefExtension(val ref: ActorRef) extends Extension
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/
package com.evolutiongaming.cluster

import akka.actor.{Actor, ActorLogging, ActorRef, ExtendedActorSystem, Extension, ExtensionId, Props}
import akka.actor.{Actor, ActorLogging, ActorRef, ExtendedActorSystem, ExtensionId, Props}
import akka.cluster.Cluster
import akka.cluster.ddata._
import akka.cluster.ddata.Replicator._
import akka.cluster.ddata._

import scala.compat.Platform

Expand All @@ -30,12 +30,12 @@ class AdaptiveAllocationStrategyDistributedDataProxy extends Actor with ActorLog
implicit lazy val node = Cluster(context.system)
private val selfAddress = node.selfAddress.toString
lazy val replicator: ActorRef = DistributedData(context.system).replicator
private val emptyMap = ORMultiMap.empty[String, String]

replicator ! Subscribe(EntityToNodeCountersKey, self)

def sendBindingUpdate(entityKey: String, counterKey: String): Unit = {
val empty = ORMultiMap.empty[String, String]
replicator ! Update(EntityToNodeCountersKey, empty, WriteLocal)(_ addBinding(entityKey, counterKey))
replicator ! Update(EntityToNodeCountersKey, emptyMap, WriteLocal)(_ addBinding(entityKey, counterKey))
}

def receive: Receive = {
Expand All @@ -54,11 +54,11 @@ class AdaptiveAllocationStrategyDistributedDataProxy extends Actor with ActorLog
case Some(counterKeys) if counterKeys contains counterKey =>

case Some(counterKeys) =>
entityToNodeCounters = entityToNodeCounters + (entityKey -> (counterKeys + counterKey))
entityToNodeCounters += (entityKey -> (counterKeys + counterKey))
sendBindingUpdate(entityKey.toString, counterKey.toString)

case None =>
entityToNodeCounters = entityToNodeCounters + (entityKey -> Set(counterKey))
entityToNodeCounters += (entityKey -> Set(counterKey))
sendBindingUpdate(entityKey.toString, counterKey.toString)
}

Expand Down Expand Up @@ -119,13 +119,11 @@ class AdaptiveAllocationStrategyDistributedDataProxy extends Actor with ActorLog
}
}

class ActorRefExtension(val ref: ActorRef) extends Extension

object AdaptiveAllocationStrategyDistributedDataProxy extends ExtensionId[ActorRefExtension] {
override def createExtension(system: ExtendedActorSystem): ActorRefExtension =
new ActorRefExtension(system actorOf Props[AdaptiveAllocationStrategyDistributedDataProxy])

// DData key of the entityToNodeCounterIds map
// DData key of entityToNodeCounters map
private[cluster] val EntityToNodeCountersKey: ORMultiMapKey[String, String] =
ORMultiMapKey[String, String]("EntityToNodeCounters")
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ abstract class ExtendedShardAllocationStrategy(
protected def maxSimultaneousRebalance: Int

protected def notIgnoredNodes(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]]): Set[ActorRef] = {
val ignoredNodes = nodesToDeallocate()
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]],
ignoredNodes: Set[Address] = nodesToDeallocate()): Set[ActorRef] = {
currentShardAllocations.keySet filterNot { ref =>
ignoredNodes contains (addressHelper toGlobal ref.path.address)
}
Expand All @@ -39,7 +39,23 @@ abstract class ExtendedShardAllocationStrategy(
requester: ActorRef,
shardId: ShardRegion.ShardId,
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]]): Future[ActorRef] =
doAllocate(requester, shardId, currentShardAllocations)
for (nodeByStrategy <- doAllocate(requester, shardId, currentShardAllocations)) yield {
val ignoredNodes = nodesToDeallocate()

if (ignoredNodes.isEmpty)
nodeByStrategy
else {
val activeNodes = notIgnoredNodes(currentShardAllocations, ignoredNodes)
val activeAddresses = activeNodes map (_.path.address)

if (activeAddresses contains (addressHelper toGlobal nodeByStrategy.path.address))
nodeByStrategy
else if (activeAddresses contains (addressHelper toGlobal requester.path.address))
requester
else
activeNodes.headOption getOrElse currentShardAllocations.keys.head
}
}

final def rebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]],
Expand Down
38 changes: 25 additions & 13 deletions src/main/scala/com/evolutiongaming/cluster/ExtractShardId.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,21 @@ import scala.util.Try
object ExtractShardId extends LazyLogging {

val identity: ShardRegion.ExtractShardId = {
case x: ShardedMsg => x.id
case x: ShardedMsg => x.id
case ShardRegion.StartEntity(id) => id
}

def uniform(numberOfShards: Int): ShardRegion.ExtractShardId = {
case x: ShardedMsg =>
val id = x.id
val shardId = math.abs(id.hashCode % numberOfShards).toString
shardId
def shardId(entityId: ShardRegion.EntityId): ShardRegion.ShardId =
math.abs(entityId.hashCode % numberOfShards).toString

{
case x: ShardedMsg =>
val entityId = x.id
shardId(entityId)
case ShardRegion.StartEntity(entityId) =>
shardId(entityId)
}
}

def static(
Expand All @@ -53,14 +60,19 @@ object ExtractShardId extends LazyLogging {
}
val mappings = mappingsPairList.toMap
logger debug s"$typeName mappings: $mappings"
val result: ShardRegion.ExtractShardId = {
case x: ShardedMsg =>
val entityId = x.id
mappings get entityId match {
case Some(shardId) => shardId
case None => fallback(x)
}

def shardId(entityId: ShardRegion.EntityId, msg: ShardRegion.Msg): ShardRegion.ShardId =
mappings get entityId match {
case Some(shardId) => shardId
case None => fallback(msg)
}

{
case msg: ShardedMsg =>
val entityId = msg.id
shardId(entityId, msg)
case msg@ShardRegion.StartEntity(entityId) =>
shardId(entityId, msg)
}
result
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.evolutiongaming.cluster

import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion
import com.typesafe.scalalogging.LazyLogging

import scala.collection.immutable
import scala.concurrent.Future

class MappedAllocationStrategy(
typeName: String,
fallbackStrategy: ShardAllocationStrategy,
proxy: ActorRef,
val maxSimultaneousRebalance: Int)
extends ShardAllocationStrategy with LazyLogging {

import MappedAllocationStrategy._

def mapShardToRegion(shardId: ShardRegion.ShardId, regionRef: ActorRef) =
proxy ! UpdateMapping(typeName, shardId, regionRef)

def allocateShard(
requester: ActorRef,
shardId: ShardRegion.ShardId,
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]]): Future[ActorRef] = {

shardToRegionMapping get EntityKey(typeName, shardId) match {
case Some(toNode) =>
logger debug s"AllocateShard $typeName\n\t" +
s"shardId:\t$shardId\n\t" +
s"on node:\t$toNode\n\t" +
s"requester:\t$requester\n\t"
Future successful toNode
case None =>
logger debug s"AllocateShard fallback $typeName, shardId:\t$shardId"
fallbackStrategy.allocateShard(requester, shardId, currentShardAllocations)
}
}


def rebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardRegion.ShardId]],
rebalanceInProgress: Set[ShardRegion.ShardId]): Future[Set[ShardRegion.ShardId]] = {

logger debug
s"rebalance $typeName: currentShardAllocations = $currentShardAllocations, rebalanceInProgress = $rebalanceInProgress"

val result = (for {
(ref, shards) <- currentShardAllocations
shardId <- shards if !(shardToRegionMapping get EntityKey(typeName, shardId) contains ref)
} yield shardId).toSet

if (result.nonEmpty) logger info s"Rebalance $typeName\n\t" +
s"current:${ currentShardAllocations.mkString("\n\t\t", "\n\t\t", "") }\n\t" +
s"rebalanceInProgress:\t$rebalanceInProgress\n\t" +
s"result:\t$result"

Future successful result
}
}

object MappedAllocationStrategy {

def apply(
typeName: String,
fallbackStrategy: ShardAllocationStrategy,
maxSimultaneousRebalance: Int)
(implicit system: ActorSystem): MappedAllocationStrategy = {
// proxy doesn't depend on typeName, it should just start once
val proxy = MappedAllocationStrategyDistributedDataProxy(system).ref
new MappedAllocationStrategy(
typeName = typeName,
fallbackStrategy = fallbackStrategy,
proxy = proxy,
maxSimultaneousRebalance = maxSimultaneousRebalance)
}

case class EntityKey(typeName: String, id: ShardRegion.ShardId) {
override def toString: String = s"$typeName#$id"
}

object EntityKey {
def unapply(arg: List[String]): Option[EntityKey] = arg match {
case typeName :: id :: Nil => Some(EntityKey(typeName, id))
case _ => None
}

def unapply(arg: String): Option[EntityKey] = unapply((arg split "#").toList)
}

case class UpdateMapping(typeName: String, id: ShardRegion.ShardId, regionRef: ActorRef)
case class Clear(typeName: String, id: ShardRegion.ShardId)

// TODO: check for thread-safety
@volatile
private[cluster] var shardToRegionMapping: Map[EntityKey, ActorRef] = Map.empty
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2016-2017 Evolution Gaming Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.evolutiongaming.cluster

import akka.actor.{Actor, ActorLogging, ActorRef, ExtendedActorSystem, ExtensionId, Props}
import akka.cluster.Cluster
import akka.cluster.ddata.Replicator._
import akka.cluster.ddata._

class MappedAllocationStrategyDistributedDataProxy extends Actor with ActorLogging {

import MappedAllocationStrategy._
import MappedAllocationStrategyDistributedDataProxy._

implicit lazy val node = Cluster(context.system)
lazy val replicator: ActorRef = DistributedData(context.system).replicator
private val emptyMap = LWWMap.empty[String, ActorRef]

replicator ! Subscribe(MappingKey, self)


def receive: Receive = {
case UpdateMapping(typeName, id, regionRef) =>
val entityKey = EntityKey(typeName, id)
shardToRegionMapping += entityKey -> regionRef
replicator ! Update(MappingKey, emptyMap, WriteLocal)(_ + (entityKey.toString -> regionRef))

case Clear(typeName, id) =>
val entityKey = EntityKey(typeName, id)
shardToRegionMapping -= entityKey
replicator ! Update(MappingKey, emptyMap, WriteLocal)(_ - entityKey.toString)

case UpdateSuccess(_, _) =>

case UpdateTimeout(key, _) =>
// probably should never happen for local updates
log warning s"Update timeout for key $key"

case c@Changed(MappingKey) =>
val newData = (c get MappingKey).entries flatMap {
case (key, ref) =>
EntityKey unapply key map { entityKey =>
entityKey -> ref
}
}
shardToRegionMapping = newData
}
}

object MappedAllocationStrategyDistributedDataProxy extends ExtensionId[ActorRefExtension] {
override def createExtension(system: ExtendedActorSystem): ActorRefExtension =
new ActorRefExtension(system actorOf Props[MappedAllocationStrategyDistributedDataProxy])

// DData key of ShardToRegionMapping map
private[cluster] val MappingKey: LWWMapKey[String, ActorRef] =
LWWMapKey[String, ActorRef]("ShardToRegionMapping")
}
Loading

0 comments on commit 6991f7f

Please sign in to comment.