Skip to content

Commit

Permalink
2243 Second pass at topology API with topology-aware routers
Browse files Browse the repository at this point in the history
* Includes cleanup of API for data center failover (as well as instance but may remove this)
  • Loading branch information
Helena Edelson committed Sep 7, 2013
1 parent beba5d9 commit 45956d1
Show file tree
Hide file tree
Showing 8 changed files with 409 additions and 10 deletions.
17 changes: 17 additions & 0 deletions akka-cluster/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,23 @@ akka {
}
}

topology {

data-centers {

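# DataCenters by Region.
# Every data center entry must define `zone-id` (an integer), `proximal-to`
# (the list of integers naming its nearest neighbours) and a non-empty
# `instances` list. Entries that miss any of these settings are ignored when
# the topology is parsed, and so are regions left without a valid data center.
# eu-west-1 {
# eu-west-1a {
# zone-id = 1
# proximal-to = [2]
# instances = ["zoo.rack0.node1", "zoo.rack0.node2", "zoo.rack0.node3"]
# }
# eu-west-1b {
# zone-id = 2
# proximal-to = [1]
# instances = ["zoo.rack1.node1", "zoo.rack1.node2", "zoo.rack1.node3"]
# }
# }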

}
}

metrics {
# Enable or disable metrics collector for load-balancing nodes.
enabled = on
Expand Down
7 changes: 6 additions & 1 deletion akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import akka.actor.{ Actor, ActorLogging, ActorRef, Address }
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus._
import akka.event.EventStream
import akka.actor.AddressTerminated
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import akka.cluster.ClusterNetworkTopology.Topology

/**
* Domain events published to the event bus.
Expand Down Expand Up @@ -149,6 +149,11 @@ object ClusterEvent {
*/
case class UnreachableMember(member: Member) extends ClusterDomainEvent

/**
* Event carrying the cluster network topology to subscribers.
*
* NOTE(review): the name suggests it is published whenever the topology changes;
* no publisher is visible alongside this definition - confirm where it is emitted.
*
* @param topology the topology delivered with the event
*/
case class ClusterTopologyChanged(topology: Topology) extends ClusterDomainEvent

/**
* Current snapshot of cluster node metrics. Published to subscribers.
*/
Expand Down
116 changes: 116 additions & 0 deletions akka-cluster/src/main/scala/akka/cluster/ClusterNetworkTopology.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.cluster

import java.lang.System.{ currentTimeMillis => newTimestamp }

import language.postfixOps
import scala.util.Try
import scala.collection.immutable
import scala.collection.JavaConverters._
import akka.actor._
import com.typesafe.config._

/**
 * INTERNAL API.
 *
 * Actor that builds the network `Topology` from the cluster settings'
 * `TopologyAvailabilityZones` configuration when it is created, and keeps itself
 * subscribed to cluster `MemberEvent`s between `preStart` and `postStop`.
 * Reacting to cluster state and member events is not implemented yet.
 *
 * @param publisher actor reference handed in by the creator; not used by this class yet
 */
private[cluster] class ClusterNetworkTopology(publisher: ActorRef) extends Actor with ActorLogging {

  import akka.cluster.ClusterNetworkTopology._
  import ClusterEvent._

  val cluster = Cluster(context.system)

  import cluster.InfoLogger._

  private val config = cluster.settings.TopologyAvailabilityZones

  // Parsed once; becomes a var when we can detect changes, i.e. responding to network changes: add gossip
  val topology = Topology(config)

  override def preStart(): Unit = {
    cluster.subscribe(self, classOf[MemberEvent])
    logInfo("NetworkTopology has started successfully with {} regions and {} valid availability dataCenters", topology.regions.size, topology.dataCenters.size)
  }

  override def postStop(): Unit = cluster unsubscribe self

  def receive = {
    case state: CurrentClusterState => receiveState(state)
    case _: MemberEvent             => // TODO member events are accepted but not acted upon yet
  }

  /** Hook for the cluster state snapshot; currently does nothing. */
  def receiveState(state: CurrentClusterState): Unit = {
    // TODO
  }
}

/**
 * INTERNAL API.
 *
 * Model of the cluster network topology (regions containing data centers containing
 * instances) together with the parsing of that model from configuration.
 *
 * NOTE(review): the previous description stated that the notion of 'Region' is
 * intentionally left out of the network hierarchy, yet `Region` is modelled below -
 * confirm which of the two is intended.
 */
private[cluster] object ClusterNetworkTopology {

  /** Common super type of the topology elements. */
  @SerialVersionUID(1L)
  trait Graph

  /** A neighbour reference, built from one integer of a data center's `proximal-to` list. */
  case class Nearest(edge: Int) extends Graph

  /** A single instance, identified by the host string listed under `instances`. */
  case class Instance(host: String) extends Graph

  /**
   * Defines a data center within a Region.
   *
   * @param id the value of the data center's `zone-id` setting
   *
   * @param name the data center name
   *
   * @param edges the nearest neighbors
   *
   * @param instances the instances within the data center - TODO this would vary over time
   *
   * @param timestamp creation time; taken from `System.currentTimeMillis` when the
   *                  four-argument factory is used - TODO UTC
   */
  case class DataCenter(id: Int, name: String, edges: immutable.Set[Nearest], instances: immutable.Set[Instance], timestamp: Long) extends Graph

  object DataCenter {

    /** Creates a data center stamped with the current time. */
    def apply(id: Int, name: String, edges: immutable.Set[Nearest], instances: immutable.Set[Instance]): DataCenter =
      DataCenter(id, name, edges, instances, newTimestamp)
  }

  /**
   * Light weight partition in the topology describing clusters of data centers.
   */
  case class Region(name: String, dataCenters: immutable.Set[DataCenter] = Set.empty)

  /** The complete topology: the set of known regions. */
  case class Topology(regions: immutable.Set[Region]) extends Graph {

    /** All data centers of all regions. */
    def dataCenters: Set[DataCenter] = regions.flatMap(_.dataCenters)
  }

  object Topology {

    /**
     * Builds the topology from a configuration whose top-level keys are region names.
     * Top-level values that are not configuration objects, and regions for which
     * `parseConfig` yields nothing, are left out.
     */
    def apply(config: Config): Topology = {
      val regions = config.root.asScala.flatMap {
        case (key, value: ConfigObject) => parseConfig(key, value.toConfig)
        case _                          => None
      }.toSet

      Topology(regions)
    }

    /**
     * Partitions proximal dataCenters per region.
     *
     * Each object-valued key of `config` is read as a data center with the settings
     * `zone-id`, `proximal-to` and `instances`. A data center is dropped silently when
     * reading any of these settings throws, or when it has no non-empty instance name.
     *
     * @param region the region name
     * @param config the configuration section of that region
     * @return the region with its valid data centers, or `None` when there are none
     */
    def parseConfig(region: String, config: Config): Option[Region] = {
      val dataCenters = config.root.asScala.flatMap {
        case (dataCenterName, deployed: ConfigObject) =>
          // Best effort: a failure while reading one data center must not fail the whole region.
          Try {
            val dc = deployed.toConfig
            val id = dc.getInt("zone-id")
            val edges = dc.getIntList("proximal-to").asScala.map(Nearest(_)).toSet
            val instances = dc.getStringList("instances").asScala.collect { case host if host.nonEmpty => Instance(host) }.toSet
            DataCenter(id, dataCenterName, edges, instances)
          }.toOption filter (_.instances.nonEmpty)
        case _ => None
      }
      if (dataCenters.nonEmpty) Some(Region(region, dataCenters.toSet)) else None
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
state = state.copy(leader = leader)
case RoleLeaderChanged(role, leader)
state = state.copy(roleLeaderMap = state.roleLeaderMap + (role -> leader))
case s: CurrentClusterState state = s
case stats: CurrentInternalStats _latestStats = stats
case ClusterMetricsChanged(nodes) _clusterMetrics = nodes
case s: CurrentClusterState state = s
case stats: CurrentInternalStats _latestStats = stats
case ClusterMetricsChanged(nodes) _clusterMetrics = nodes
case ClusterTopologyChanged(topology) // TODO
}
}
}).withDispatcher(cluster.settings.UseDispatcher).withDeploy(Deploy.local), name = "clusterEventBusListener")
Expand Down
12 changes: 6 additions & 6 deletions akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
*/
package akka.cluster

import scala.collection.immutable
import com.typesafe.config.Config
import com.typesafe.config.ConfigObject
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS

import scala.collection.immutable
import scala.concurrent.duration.{ Duration, FiniteDuration }
import akka.actor.Address
import akka.actor.AddressFromURIString
import akka.dispatch.Dispatchers
import akka.util.Helpers.Requiring
import scala.concurrent.duration.FiniteDuration
import akka.japi.Util.immutableSeq
import com.typesafe.config.Config
import com.typesafe.config.ConfigObject

final class ClusterSettings(val config: Config, val systemName: String) {

Expand Down Expand Up @@ -90,6 +90,6 @@ final class ClusterSettings(val config: Config, val systemName: String) {
val MetricsMovingAverageHalfLife: FiniteDuration = {
Duration(cc.getMilliseconds("metrics.moving-average-half-life"), MILLISECONDS)
} requiring (_ > Duration.Zero, "metrics.moving-average-half-life must be > 0")

val TopologyAvailabilityZones = cc.getConfig("topology.data-centers")
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing

import scala.collection.immutable
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor._
import akka.cluster.Cluster
import akka.event.Logging
import akka.routing._
import akka.routing.{ Broadcast, Destination }
import akka.cluster.ClusterEvent.{ ClusterTopologyChanged, CurrentClusterState }
import akka.dispatch.Dispatchers
import akka.cluster.ClusterNetworkTopology._

object TopologyAwareRouter {

  /** Default supervisor strategy of the router: whatever the failure, escalate it. */
  private val escalateStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _ => SupervisorStrategy.Escalate
  }
}

/**
 * Router configuration that picks routees through a [[ProximalSelector]].
 *
 * @param selector computes the proximities used to choose a routee
 * @param nrOfInstances number of routees to create when neither a resizer nor routee paths are given
 * @param routees paths of the routees to register when no resizer is given
 * @param resizer optional resizer; when defined, routees are not created or registered up front
 * @param routerDispatcher id of the dispatcher used for the router's topology listener
 * @param supervisorStrategy supervisor strategy; escalates every failure by default
 */
@SerialVersionUID(1L)
case class TopologyAwareRouter(
  selector: ProximalSelector = NearestNeighborSelector,
  nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil,
  override val resizer: Option[Resizer] = None,
  val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
  val supervisorStrategy: SupervisorStrategy = TopologyAwareRouter.escalateStrategy)
  extends RouterConfig with TopologyAwareRouterLike with OverrideUnsetConfig[TopologyAwareRouter] {

  /**
   * Accepts `FromConfig`, `NoRouter` or another `TopologyAwareRouter` as fallback and
   * delegates to `overrideUnsetConfig`; any other router configuration is rejected.
   *
   * @throws IllegalArgumentException when `other` is of a different router type
   */
  override def withFallback(other: RouterConfig): RouterConfig = other match {
    case _: FromConfig | _: NoRouter | _: TopologyAwareRouter => this.overrideUnsetConfig(other)
    case _ => throw new IllegalArgumentException(s"Expected TopologyAwareRouter but found ${other}")
  }

  /**
   * Java API for setting the supervisor strategy to be used for the “head”
   * Router actor.
   */
  def withSupervisorStrategy(strategy: SupervisorStrategy): TopologyAwareRouter =
    copy(supervisorStrategy = strategy)

  /**
   * Java API for setting the resizer to be used.
   */
  def withResizer(resizer: Resizer): TopologyAwareRouter = copy(resizer = Some(resizer))

}

/**
 * Routing logic shared by topology-aware router configurations: a child actor listens
 * for `ClusterTopologyChanged` events and refreshes the proximal routees, which are then
 * consulted for every message that is not a `Broadcast`.
 */
trait TopologyAwareRouterLike { this: RouterConfig =>

  /**
   * Proximity is calculated by the selector via the `topologyListener`.
   */
  def selector: ProximalSelector

  def nrOfInstances: Int

  def routees: immutable.Iterable[String]

  def routerDispatcher: String

  override def createRoute(routeeProvider: RouteeProvider): Route = {
    // Without a resizer the routees are set up here: created when no paths were given, registered otherwise.
    if (resizer.isEmpty) {
      if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances)
      else routeeProvider.registerRouteesFor(routees)
    }

    val log = Logging(routeeProvider.context.system, routeeProvider.context.self)

    /**
     * The current proximal routees, if any.
     */
    @volatile var proximalRoutees: Option[ProximalRoutees] = None

    /**
     * While proximity is only updated by the topologyListener, access occurs from the threads of the senders.
     */
    val topologyListener = routeeProvider.context.actorOf(Props(new Actor {

      val cluster = Cluster(context.system)

      override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterTopologyChanged])

      override def postStop(): Unit = cluster.unsubscribe(self)

      def receive = {
        case ClusterTopologyChanged(topology) => receiveTopology(topology)
        case _: CurrentClusterState           => // ignore
      }

      def receiveTopology(topology: Topology): Unit =
        proximalRoutees = Some(new ProximalRoutees(cluster.selfAddress, routeeProvider.routees, selector.proximity(topology)))

    }).withDispatcher(routerDispatcher).withDeploy(Deploy.local), "topologyListener")

    /**
     * Picks the destination of a single message. The proximal routees are used only
     * when they hold something to choose from; otherwise a routee is chosen at random,
     * and dead letters is the target when there are no routees at all.
     *
     * The guard used to be `if nearest.isEmpty`, which made the proximal selection
     * unreachable and dead-lettered every message once an empty proximity map arrived.
     */
    def getNearest: ActorRef = proximalRoutees match {
      case Some(nearest) if !nearest.isEmpty =>
        nearest(ThreadLocalRandom.current.nextInt(nearest.total) + 1)
      case _ =>
        val currentRoutees = routeeProvider.routees
        if (currentRoutees.isEmpty) routeeProvider.context.system.deadLetters
        else currentRoutees(ThreadLocalRandom.current.nextInt(currentRoutees.size))
    }

    {
      case (sender, message) =>
        message match {
          case Broadcast(msg) => toAll(sender, routeeProvider.routees)
          case msg            => List(Destination(sender, getNearest))
        }
    }
  }
}

/**
 * Strategy that derives a proximity value per address from the network topology.
 */
trait ProximalSelector extends Serializable {

/**
* The proximity per address by zone, based on the network topology.
*
* @param topology the topology to derive the proximities from
* @return an integer proximity per address. NOTE(review): `ProximalRoutees` documents
* lower proximity as lower latency - confirm that selectors follow that convention.
*/
def proximity(topology: Topology): Map[Address, Int]
}

/**
 * Selector used by `TopologyAwareRouter` when none is given. It is a stub for now:
 * whatever the topology, it reports no proximities.
 */
@SerialVersionUID(1L)
case object NearestNeighborSelector extends ProximalSelector {

  // TODO update nearest
  def proximity(topology: Topology): Map[Address, Int] = Map.empty[Address, Int]
}

/**
 * INTERNAL API
 *
 * Select a routee based on its proximity (nearest neighbors) in the network topology. Lower proximity, lower latency.
 *
 * @param selfAddress address of the local node; not used yet
 * @param refs the routees to choose from
 * @param proximities proximity value per address
 */
private[cluster] class ProximalRoutees(selfAddress: Address, refs: immutable.IndexedSeq[ActorRef], proximities: Map[Address, Int]) {

  /**
   * True when no routee can be selected: either no proximity is known or there are no
   * routees. The routee check keeps `apply`, which reads `refs.head`, from being
   * reached with an empty routee list by callers that test `isEmpty` first.
   */
  def isEmpty: Boolean = proximities.isEmpty || refs.isEmpty

  /** Upper bound of the values accepted by `apply`; fixed at 1 for now. */
  def total: Int = 1 // TODO

  /**
   * Pick the appropriate routee. Currently always the first routee, whatever `value` is;
   * must not be called when `isEmpty` is true.
   */
  def apply(value: Int): ActorRef = refs.head // TODO
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class ClusterConfigSpec extends AkkaSpec {
MetricsInterval must be(3 seconds)
MetricsGossipInterval must be(3 seconds)
MetricsMovingAverageHalfLife must be(12 seconds)
TopologyAvailabilityZones must not be (TopologyAvailabilityZones.isEmpty)
}
}
}

2 comments on commit 45956d1

@helena
Copy link
Owner

@helena helena commented on 45956d1 Sep 7, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See 53d467e as well: minor change

@helena
Copy link
Owner

@helena helena commented on 45956d1 Sep 7, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I have not updated the router's getNearest function with the topology changes
  • Removed gossip but in future, getting hosts from the akka cluster and mapping that with the configured instances per DC to allow for instance array changes on Member.Up, Member.Downed etc would be nice
  • Have not considered this in the context of the akka cluster - would be great to have a skype chat for this - in terms of DC > akka cluster > instances

Please sign in to comment.