Skip to content

Commit

Permalink
Ticket 2547 Create cluster metrics-aware Adaptive Load Balancing Routers
Browse files Browse the repository at this point in the history
  • Loading branch information
Helena Edelson committed Oct 18, 2012
1 parent 3f52eef commit a6bf53d
Show file tree
Hide file tree
Showing 10 changed files with 548 additions and 77 deletions.
11 changes: 8 additions & 3 deletions akka-cluster/src/main/resources/reference.conf
Expand Up @@ -107,6 +107,13 @@ akka {
}

# Uses JMX and Hyperic SIGAR, if SIGAR is on the classpath.
# Metrics that are only available with SIGAR for load balancing of nodes are:
# 1. Network latency
# 2. CPU
# A. Combined CPU (sum of User + Sys + Nice + Wait, in percentage)
# B. System load average: on some OS the JMX load average may not be available,
# however if SIGAR is on the classpath it is available on any OS.
# C. The number of cores per processor
metrics {
# Enable or disable metrics collector for load-balancing nodes.
enabled = on
Expand All @@ -118,9 +125,7 @@ akka {
gossip-interval = 3s

# How quickly the exponential weighting of past data is decayed compared to
# new data.
# If set to 0 data streaming over time will be turned off.
# Set higher to increase the bias toward newer values
# new data. Set higher to increase the bias toward newer values.
rate-of-decay = 10
}

Expand Down
177 changes: 154 additions & 23 deletions akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
Expand Up @@ -9,7 +9,7 @@ import scala.concurrent.duration._
import scala.collection.immutable.{ SortedSet, Map }
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Try, Success, Failure }
import scala.math.ScalaNumericConversions
import math.{ ScalaNumber, ScalaNumericConversions }
import scala.runtime.{ RichLong, RichDouble, RichInt }

import akka.actor._
Expand Down Expand Up @@ -43,6 +43,8 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
import cluster.{ selfAddress, scheduler, settings }
import settings._

val DefaultRateOfDecay: Int = 10

/**
* The ring of nodes that metrics are gossipped to; contains only members that are Up.
*/
Expand All @@ -51,7 +53,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/**
* The latest metric values with their statistical data.
*/
var latestGossip: MetricsGossip = MetricsGossip(MetricsRateOfDecay)
var latestGossip: MetricsGossip = MetricsGossip(if (MetricsRateOfDecay <= 0) DefaultRateOfDecay else MetricsRateOfDecay)

/**
* The metrics collector that samples data on the node.
Expand Down Expand Up @@ -252,12 +254,17 @@ private[cluster] case class DataStream(decay: Int, ewma: ScalaNumericConversions
* The data can be too large to fit into an int or long, thus we use ScalaNumericConversions,
* and defer to BigInt or BigDecimal.
*
* FIXME - look into the math: the new ewma becomes the new value
* value: 416979936, prev ewma: 416581760, new ewma: 416979936
*
* @param xn the new data point
* @return an new [[akka.cluster.DataStream]] with the updated yn and timestamp
*
* @return a [[akka.cluster.DataStream]] with the updated yn and timestamp
*/
def :+(xn: ScalaNumericConversions): DataStream = convert(xn) fold (
nl copy(ewma = BigInt* nl + 1 - α * ewma.longValue()), timestamp = newTimestamp),
nd copy(ewma = BigDecimal* nd + 1 - α * ewma.doubleValue()), timestamp = newTimestamp))
/**
 * Folds the new data point into the stream, computing the new exponentially
 * weighted moving average. If the new value equals the current ewma the
 * stream is returned unchanged (avoids a needless copy and timestamp bump).
 *
 * @param xn the new data point
 * @return a [[akka.cluster.DataStream]] with the updated ewma and timestamp
 */
def :+(xn: ScalaNumericConversions): DataStream = if (xn != ewma) convert(xn) fold (
  nl ⇒ copy(ewma = BigInt((α * nl) + (1 - α) * ewma.longValue()), timestamp = newTimestamp),
  nd ⇒ copy(ewma = BigDecimal((α * nd) + (1 - α) * ewma.doubleValue()), timestamp = newTimestamp))
else this

/**
* The duration of observation for this data stream
Expand All @@ -266,18 +273,6 @@ private[cluster] case class DataStream(decay: Int, ewma: ScalaNumericConversions

}

/**
* INTERNAL API
*
* Companion object of DataStream class.
*/
private[cluster] object DataStream {

def apply(decay: Int, data: ScalaNumericConversions): Option[DataStream] = if (decay > 0)
Some(DataStream(decay, data, newTimestamp, newTimestamp)) else None

}

/**
* INTERNAL API
*
Expand All @@ -295,7 +290,7 @@ private[cluster] case class Metric(name: String, value: Option[ScalaNumericConve
* Returns the metric with a new data stream for data trending if eligible,
* otherwise returns the unchanged metric.
*/
def initialize(decay: Int): Metric = if (initializable) copy(average = DataStream(decay, value.get)) else this
def initialize(decay: Int): Metric = if (initializable) copy(average = Some(DataStream(decay, value.get, newTimestamp, newTimestamp))) else this

/**
* If defined ( [[akka.cluster.MetricNumericConverter.defined()]] ), updates the new
Expand Down Expand Up @@ -356,6 +351,49 @@ private[cluster] object Metric extends MetricNumericConverter {
case Some(v) if defined(v) Metric(name, value, None)
case _ Metric(name, None, None)
}
}

/**
 * INTERNAL API
 *
 * Reusable logic for selecting a target node by a particular metric category,
 * for use by the metrics-aware load balancing routers.
 */
private[cluster] trait MetricsAwareClusterNodeSelector {
  import NodeMetrics._
  import NodeMetrics.NodeMetricsComparator._

  /**
   * Returns the address of the available node with the lowest cumulative difference
   * between heap memory max and used/committed, or None if the heap metrics are
   * not yet available on any node (the Try absorbs unset averages).
   */
  def selectByMemory(nodes: Set[NodeMetrics]): Option[Address] = Try(Some(nodes.map {
    n ⇒
      val (used, committed, max) = MetricValues.unapply(n.heapMemory)
      (n.address, max match {
        // max is undefined on some OS: fall back to committed - used only
        case Some(m) ⇒ ((committed - used) + (m - used) + (m - committed))
        case None    ⇒ committed - used
      })
  }.min._1)) getOrElse None

  // TODO: selection by network latency is not implemented yet
  def selectByNetworkLatency(nodes: Set[NodeMetrics]): Option[Address] = None
  /* Try(nodes.map {
    n ⇒
      val (rxBytes, txBytes) = MetricValues.unapply(n.networkLatency).get
      (n.address, (rxBytes + txBytes))
  }.min._1) getOrElse None // TODO: min or max
  */

  // TODO: selection by CPU is not implemented yet
  def selectByCpu(nodes: Set[NodeMetrics]): Option[Address] = None
  /* Try(nodes.map {
    n ⇒
      val (loadAverage, processors, combinedCpu, cores) = MetricValues.unapply(n.cpu)
      var cumulativeDifference = 0
      // TODO: calculate
      combinedCpu.get
      cores.get
      (n.address, cumulativeDifference)
  }.min._1) getOrElse None // TODO min or max
  }*/

}

Expand All @@ -378,6 +416,7 @@ private[cluster] object Metric extends MetricNumericConverter {
* @param metrics the array of sampled [[akka.actor.Metric]]
*/
private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) extends ClusterMessage {
import NodeMetrics._

/**
* Returns the most recent data.
Expand All @@ -394,6 +433,99 @@ private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metri
*/
def same(that: NodeMetrics): Boolean = address == that.address

/**
 * Extractor for this node's heap memory metrics.
 * Of all the data streams, this fluctuates the most.
 */
def heapMemory: HeapMemory = HeapMemory(metric("heap-memory-used"), metric("heap-memory-committed"), metric("heap-memory-max"))

/** Extractor for this node's network IO metrics (max rx/tx, in bytes; SIGAR-only). */
def networkLatency: NetworkLatency = NetworkLatency(metric("network-max-rx"), metric("network-max-tx"))

/** Extractor for this node's CPU metrics (load average, processors, combined CPU, cores). */
def cpu: Cpu = Cpu(metric("system-load-average"), metric("processors"), metric("cpu-combined"), metric("total-cores"))

def metric(key: String): Metric = metrics.collectFirst { case m if m.name == key m } getOrElse Metric(key, None)

}

/**
 * INTERNAL API
 *
 * Companion object of NodeMetrics class - used by metrics consumers such as the load balancing routers.
 *
 * The following extractors and orderings hide the implementation from cluster metric consumers
 * such as load balancers.
 */
private[cluster] object NodeMetrics {

  object NodeMetricsComparator extends MetricNumericConverter {

    // Ascending ordering, so that 'min' selects the smallest Long value.
    implicit val longMinOrdering: Ordering[Long] = Ordering.fromLessThan[Long] { (a, b) ⇒ (a < b) }

    // Orders (Address, Long) pairs by metric value, ascending.
    implicit val longMinAddressOrdering: Ordering[(Address, Long)] = new Ordering[(Address, Long)] {
      def compare(a: (Address, Long), b: (Address, Long)): Int = longMinOrdering.compare(a._2, b._2)
    }

    /** Returns the pair with the largest Long metric value. */
    def maxAddressLong(seq: Seq[(Address, Long)]): (Address, Long) =
      seq.reduceLeft((a: (Address, Long), b: (Address, Long)) ⇒ if (a._2 > b._2) a else b)

    // Ascending ordering, so that 'min' selects the smallest Double value.
    implicit val doubleMinOrdering: Ordering[Double] = Ordering.fromLessThan[Double] { (a, b) ⇒ (a < b) }

    // Orders (Address, Double) pairs by metric value, ascending.
    implicit val doubleMinAddressOrdering: Ordering[(Address, Double)] = new Ordering[(Address, Double)] {
      def compare(a: (Address, Double), b: (Address, Double)): Int = doubleMinOrdering.compare(a._2, b._2)
    }

    /** Returns the pair with the largest Double metric value. */
    def maxAddressDouble(seq: Seq[(Address, Double)]): (Address, Double) =
      seq.reduceLeft((a: (Address, Double), b: (Address, Double)) ⇒ if (a._2 > b._2) a else b)
  }

  sealed trait MetricValues

  object MetricValues {

    /** Extracts (used, committed, max) heap values; max is None when not available on the OS. */
    def unapply(v: HeapMemory): Tuple3[Long, Long, Option[Long]] =
      (v.used.average.get.ewma.longValue(),
        v.committed.average.get.ewma.longValue(),
        Try(Some(v.max.average.get.ewma.longValue())) getOrElse None)

    /** Extracts (maxRxIO, maxTxIO), or None when the network metrics are unavailable. */
    def unapply(v: NetworkLatency): Option[(Long, Long)] =
      Try(Some(v.maxRxIO.average.get.ewma.longValue(), v.maxTxIO.average.get.ewma.longValue())) getOrElse None

    /** Extracts (systemLoadAverage, processors, combinedCpu, cores); the latter two are SIGAR-only. */
    def unapply(v: Cpu): Tuple4[Double, Int, Option[Double], Option[Int]] =
      (v.systemLoadAverage.value.get.doubleValue(),
        v.processors.value.get.intValue(),
        Try(Some(v.combinedCpu.average.get.ewma.doubleValue())) getOrElse None,
        Try(Some(v.cores.value.get.intValue())) getOrElse None)
  }

  /**
   * @param used the current sum of heap memory used from all heap memory pools (in bytes)
   *
   * @param committed the current sum of heap memory guaranteed to be available to the JVM
   *   from all heap memory pools (in bytes). Committed will always be greater than or equal to used.
   *
   * @param max the maximum amount of memory (in bytes) that can be used for JVM memory management.
   *   Can be undefined on some OS.
   */
  case class HeapMemory(used: Metric, committed: Metric, max: Metric) extends MetricValues

  /**
   * @param maxRxIO the max network IO rx value, in bytes
   *
   * @param maxTxIO the max network IO tx value, in bytes
   */
  case class NetworkLatency(maxRxIO: Metric, maxTxIO: Metric) extends MetricValues

  /**
   * @param systemLoadAverage OS-specific average load on the CPUs in the system, for the past 1 minute
   *
   * @param processors the number of available processors
   *
   * @param combinedCpu combined CPU sum of User + Sys + Nice + Wait, in percentage. This metric can describe
   *   the amount of time the CPU spent executing code during n-interval and how much more it could theoretically.
   *
   * @param cores the number of cores (multi-core: per processor)
   */
  private[cluster] case class Cpu(systemLoadAverage: Metric, processors: Metric, combinedCpu: Metric, cores: Metric) extends MetricValues

}

/**
Expand Down Expand Up @@ -462,7 +594,7 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe
systemLoadAverage, used, committed, max, processors, networkMaxRx, networkMaxTx))

/**
* (SIGAR / JMX) Returns the OS-specific average system load on the CPUs in the system, for the past 1 minute.
* (SIGAR / JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
* On some systems the JMX OS system load average may not be available, in which case a -1 is returned.
* Hyperic SIGAR provides more precise values, thus, if the library is on the classpath, it is the default.
*/
Expand All @@ -481,8 +613,7 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe

/**
* (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM
* from all heap memory pools (in bytes). Committed will always be greater
* than or equal to used.
* from all heap memory pools (in bytes).
*/
def committed: Metric = Metric("heap-memory-committed", Some(BigInt(memoryMBean.getHeapMemoryUsage.getCommitted)))

Expand All @@ -503,11 +634,11 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe
def cpuCombined: Metric = Metric("cpu-combined", Try(BigDecimal(CombinedCpu.get.invoke(Cpu.get.invoke(sigar.get)).asInstanceOf[Double])).toOption)

/**
* FIXME: Array[Int].head - expose all if cores per processor might differ.
* (SIGAR) Returns the total number of cores.
*/
def totalCores: Metric = Metric("total-cores", Try(BigInt(CpuList.get.invoke(sigar.get).asInstanceOf[Array[AnyRef]].map(cpu
createMethodFrom(Some(cpu), "getTotalCores").get.invoke(cpu).asInstanceOf[Int]).head)).toOption)
//Array[Int].head - if this would differ on some servers, expose all. In testing each int was always equal.

/**
* (SIGAR) Returns the max network IO read/write value, in bytes, for network latency evaluation.
Expand Down

This file was deleted.

@@ -0,0 +1,61 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.cluster.routing

import akka.remote.testkit.{MultiNodeSpec, MultiNodeConfig}
import akka.testkit.{LongRunningTest, DefaultTimeout, ImplicitSender}
import akka.actor._
import akka.cluster.{ MemberStatus, MultiNodeClusterSpec }
import akka.cluster.routing.ClusterRoundRobinRoutedActorMultiJvmSpec.SomeActor


// Multi-node configuration for the adaptive load balancing router spec:
// declares the five cluster node roles and the common cluster config.
object ClusterAdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {

  val first = role("first")
  val second = role("second")
  val third = role("third")
  val fourth = role("fourth")
  val fifth = role("fifth")

  // TODO - config
  commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))

}

// One concrete spec class per cluster node; the multi-node test kit runs each in its own JVM.
class ClusterAdaptiveLoadBalancingRouterMultiJvmNode1 extends ClusterAdaptiveLoadBalancingRouterSpec
class ClusterAdaptiveLoadBalancingRouterMultiJvmNode2 extends ClusterAdaptiveLoadBalancingRouterSpec
class ClusterAdaptiveLoadBalancingRouterMultiJvmNode3 extends ClusterAdaptiveLoadBalancingRouterSpec
class ClusterAdaptiveLoadBalancingRouterMultiJvmNode4 extends ClusterAdaptiveLoadBalancingRouterSpec
class ClusterAdaptiveLoadBalancingRouterMultiJvmNode5 extends ClusterAdaptiveLoadBalancingRouterSpec

/**
 * Multi-node spec exercising the metrics-aware adaptive load balancing router.
 * Starts a five node cluster and waits until cluster metrics are available on
 * every node before further assertions.
 */
abstract class ClusterAdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(ClusterAdaptiveLoadBalancingRouterMultiJvmSpec)
  with MultiNodeClusterSpec
  with ImplicitSender with DefaultTimeout {
  import ClusterAdaptiveLoadBalancingRouterMultiJvmSpec._

  // TODO configure properly and leverage the other pending load balancing routers

  /** Creates a cluster memory load balancing router with 3 total instances, 1 per node. */
  private def memoryLoadBalancingRouter(name: String): ActorRef =
    system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(),
      ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), name)

  lazy val router1 = memoryLoadBalancingRouter("router1")
  lazy val router2 = memoryLoadBalancingRouter("router2")
  lazy val router3 = memoryLoadBalancingRouter("router3")
  lazy val router4 = memoryLoadBalancingRouter("router4")
  lazy val router5 = memoryLoadBalancingRouter("router5")

  "A cluster with a ClusterAdaptiveLoadBalancingRouter" must {
    "start cluster with 5 nodes" taggedAs LongRunningTest in {
      awaitClusterUp(roles: _*)
      enterBarrier("cluster-started")
      // All members Up, and metrics gossip received from every node.
      awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size)
      awaitCond(clusterView.clusterMetrics.size == roles.size)
      enterBarrier("cluster-metrics-consumer-ready")
    }
    // TODO the rest of the necessary testing. All the work needed for consumption and extraction
    // of the data needed is in ClusterMetricsCollector._
  }
}

0 comments on commit a6bf53d

Please sign in to comment.