batch updates to meters when monitoring flows (#1485)
Updates the monitor flow stage to batch the updates to
the meters. This avoids some of the overhead of calling
gettimeofday frequently for high-volume streams.
brharrington committed Nov 10, 2022
1 parent 3e935c1 commit 2374aff
Showing 5 changed files with 191 additions and 18 deletions.
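
For orientation, the stage being changed here is applied through StreamOps.monitorFlow. A minimal usage sketch, not part of this commit (the registry choice and the "example" id are illustrative):

import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import com.netflix.atlas.akka.StreamOps
import com.netflix.spectator.api.DefaultRegistry

implicit val system: ActorSystem = ActorSystem("monitor-flow-example")
val registry = new DefaultRegistry

// Updates akka.stream.numEvents, akka.stream.upstreamDelay and
// akka.stream.downstreamDelay, each tagged with id=example.
Source(0 until 1000)
  .via(StreamOps.monitorFlow(registry, "example"))
  .runWith(Sink.ignore)

With this commit, those meters are updated in batches rather than on every element.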
69 changes: 53 additions & 16 deletions atlas-akka/src/main/scala/com/netflix/atlas/akka/StreamOps.scala
@@ -37,7 +37,6 @@ import akka.stream.stage.InHandler
 import akka.stream.stage.OutHandler
 import com.netflix.spectator.api.Clock
 import com.netflix.spectator.api.Registry
-import com.netflix.spectator.api.Timer
 import com.typesafe.scalalogging.StrictLogging

 /**
@@ -148,6 +147,9 @@ object StreamOps extends StrictLogging {
  *   - `akka.stream.upstreamDelay`: a timer measuring the delay for the upstream to push
  *     a new element after one has been requested.
  *
+ * Updates to meters will be batched so the updates may be delayed if used for a low
+ * throughput stream.
+ *
  * @param registry
  *     Spectator registry to manage metrics.
  * @param id
@@ -173,35 +175,70 @@

       new GraphStageLogic(shape) with InHandler with OutHandler {

-        private var upstreamStart: Long = -1L
-        private var downstreamStart: Long = -1L
+        import MonitorFlow._
+
+        private var lastUpdate = registry.clock().monotonicTime()
+        private var upstreamIdx = 0
+        private val upstreamTimes = new Array[Long](MeterBatchSize)
+        private var downstreamIdx = 0
+        private val downstreamTimes = new Array[Long](MeterBatchSize)
+
+        private var upstreamStart = -1L
+        private var downstreamStart = -1L

         override def onPush(): Unit = {
-          upstreamStart = record(upstreamTimer, upstreamStart)
+          val now = registry.clock().monotonicTime()
+          if (upstreamStart != -1L) {
+            upstreamTimes(upstreamIdx) = now - upstreamStart
+            upstreamIdx += 1
+            upstreamStart = -1L
+          }
           push(out, grab(in))
-          numEvents.increment()
-          downstreamStart = registry.clock().monotonicTime()
+          downstreamStart = now
+          if (upstreamIdx == MeterBatchSize || now - lastUpdate > MeterUpdateInterval) {
+            updateMeters(now)
+          }
         }

         override def onPull(): Unit = {
-          downstreamStart = record(downstreamTimer, downstreamStart)
+          val now = registry.clock().monotonicTime()
+          if (downstreamStart != -1L) {
+            downstreamTimes(downstreamIdx) = now - downstreamStart
+            downstreamIdx += 1
+            downstreamStart = -1L
+          }
           pull(in)
-          upstreamStart = registry.clock().monotonicTime()
+          upstreamStart = now
+          if (downstreamIdx == MeterBatchSize) {
+            updateMeters(now)
+          }
         }

-        private def record(timer: Timer, start: Long): Long = {
-          if (start > 0L) {
-            val delay = registry.clock().monotonicTime() - start
-            timer.record(delay, TimeUnit.NANOSECONDS)
-          }
-          -1L
+        override def onUpstreamFinish(): Unit = {
+          updateMeters(registry.clock().monotonicTime())
+          super.onUpstreamFinish()
+        }
+
+        private def updateMeters(now: Long): Unit = {
+          numEvents.increment(upstreamIdx)
+          upstreamTimer.record(upstreamTimes, upstreamIdx, TimeUnit.NANOSECONDS)
+          upstreamIdx = 0
+          downstreamTimer.record(downstreamTimes, downstreamIdx, TimeUnit.NANOSECONDS)
+          downstreamIdx = 0
+          lastUpdate = now
         }

         setHandlers(in, out, this)
       }
     }
   }

+  private object MonitorFlow {
+
+    private val MeterBatchSize = 10000
+    private val MeterUpdateInterval = TimeUnit.SECONDS.toNanos(1L)
+  }
+
   /**
    * Map operation that passes in the materializer for the graph stage.
    *
@@ -278,13 +315,13 @@
         private var previous: V = _
         private var lastPushedAt: Long = 0

-        private def isExpired(): Boolean = {
+        private def isExpired: Boolean = {
           lastPushedAt == 0 || clock.wallTime() - lastPushedAt > timeout
         }

         override def onPush(): Unit = {
           val v = grab(in)
-          if (v == previous && !isExpired()) {
+          if (v == previous && !isExpired) {
             pull(in)
           } else {
             previous = v
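
The key change above is that per-element meter updates are buffered and flushed in batches. A stripped-down sketch of that pattern outside of the Akka stage, assuming a Spectator registry (BatchedTimerExample and recordNanos are illustrative names; the 10000 batch size mirrors MeterBatchSize above):

import java.util.concurrent.TimeUnit

import com.netflix.spectator.api.DefaultRegistry

object BatchedTimerExample {

  private val BatchSize = 10000

  private val registry = new DefaultRegistry
  private val timer = registry.timer("example.delay")

  private val times = new Array[Long](BatchSize)
  private var idx = 0

  // Buffer one measurement and flush to the timer only when the batch is
  // full, so the meter is touched once per BatchSize values rather than
  // once per value.
  def recordNanos(nanos: Long): Unit = {
    times(idx) = nanos
    idx += 1
    if (idx == BatchSize) {
      timer.record(times, idx, TimeUnit.NANOSECONDS)
      idx = 0
    }
  }
}

MonitorFlow above additionally flushes when MeterUpdateInterval has elapsed and when the upstream finishes, so low-volume streams still see their meters updated eventually.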
@@ -165,11 +165,11 @@ class StreamOpsSuite extends FunSuite {
   }

   test("monitor flow: downstream delay") {
-    testMonitorFlow("downstreamDelay", Map("count" -> 8.0, "totalTime" -> 24.0))
+    testMonitorFlow("downstreamDelay", Map("count" -> 10.0, "totalTime" -> 27.0))
   }

   test("monitor flow: upstream delay") {
-    testMonitorFlow("upstreamDelay", Map("count" -> 8.0, "totalTime" -> 0.0))
+    testMonitorFlow("upstreamDelay", Map("count" -> 10.0, "totalTime" -> 0.0))
   }

   private class Message(latch: CountDownLatch, val data: Source[Int, NotUsed]) {
132 changes: 132 additions & 0 deletions atlas-jmh/src/main/scala/com/netflix/atlas/akka/MonitorFlowBench.scala
@@ -0,0 +1,132 @@
/*
* Copyright 2014-2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.akka

import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import com.netflix.spectator.api.Clock
import com.netflix.spectator.api.Registry
import com.netflix.spectator.api.Timer
import com.netflix.spectator.atlas.AtlasRegistry
import org.openjdk.jmh.annotations.Benchmark
import org.openjdk.jmh.annotations.Scope
import org.openjdk.jmh.annotations.Setup
import org.openjdk.jmh.annotations.State
import org.openjdk.jmh.annotations.TearDown
import org.openjdk.jmh.infra.Blackhole

import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.Duration

@State(Scope.Thread)
class MonitorFlowBench {

  import MonitorFlowBench._

  private implicit var system: ActorSystem = _
  private var registry: Registry = _

  @Setup
  def setup(): Unit = {
    system = ActorSystem(getClass.getSimpleName)
    registry = new AtlasRegistry(Clock.SYSTEM, k => System.getProperty(k))
  }

  @TearDown
  def tearDown(): Unit = {
    Await.result(system.terminate(), Duration.Inf)
  }

  @Benchmark
  def individual(bh: Blackhole): Unit = {
    val future = Source(0 until 1_000_000)
      .via(new MonitorFlow(registry, "1"))
      .via(new MonitorFlow(registry, "2"))
      .via(new MonitorFlow(registry, "3"))
      .reduce(_ + _)
      .runWith(Sink.head)
    bh.consume(Await.result(future, Duration.Inf))
  }

  @Benchmark
  def batched(bh: Blackhole): Unit = {
    val future = Source(0 until 1_000_000)
      .via(StreamOps.monitorFlow(registry, "1"))
      .via(StreamOps.monitorFlow(registry, "2"))
      .via(StreamOps.monitorFlow(registry, "3"))
      .reduce(_ + _)
      .runWith(Sink.head)
    bh.consume(Await.result(future, Duration.Inf))
  }
}

object MonitorFlowBench {

  private final class MonitorFlow[T](registry: Registry, id: String)
      extends GraphStage[FlowShape[T, T]] {

    private val numEvents = registry.counter("akka.stream.numEvents", "id", id)
    private val upstreamTimer = registry.timer("akka.stream.upstreamDelay", "id", id)
    private val downstreamTimer = registry.timer("akka.stream.downstreamDelay", "id", id)

    private val in = Inlet[T]("MonitorBackpressure.in")
    private val out = Outlet[T]("MonitorBackpressure.out")

    override val shape: FlowShape[T, T] = FlowShape(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {

      new GraphStageLogic(shape) with InHandler with OutHandler {

        private var upstreamStart: Long = -1L
        private var downstreamStart: Long = -1L

        override def onPush(): Unit = {
          upstreamStart = record(upstreamTimer, upstreamStart)
          push(out, grab(in))
          numEvents.increment()
          downstreamStart = registry.clock().monotonicTime()
        }

        override def onPull(): Unit = {
          downstreamStart = record(downstreamTimer, downstreamStart)
          pull(in)
          upstreamStart = registry.clock().monotonicTime()
        }

        private def record(timer: Timer, start: Long): Long = {
          if (start > 0L) {
            val delay = registry.clock().monotonicTime() - start
            timer.record(delay, TimeUnit.NANOSECONDS)
          }
          -1L
        }

        setHandlers(in, out, this)
      }
    }
  }
}
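
With the SbtJmh plugin enabled for atlas-jmh in build.sbt below, the benchmark would typically be launched from the sbt console with something along these lines (the exact task syntax depends on the sbt-jmh version, so treat this as a hedged example rather than a documented command):

  atlas-jmh/jmh:run .*MonitorFlowBench.*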
3 changes: 3 additions & 0 deletions build.sbt
@@ -73,6 +73,9 @@ lazy val `atlas-eval` = project
 lazy val `atlas-jmh` = project
   .configure(BuildSettings.profile)
   .dependsOn(`atlas-chart`, `atlas-core`, `atlas-eval`, `atlas-json`, `atlas-webapi`)
+  .settings(libraryDependencies ++= Seq(
+    Dependencies.spectatorAtlas
+  ))
   .enablePlugins(pl.project13.scala.sbt.SbtJmh)

 lazy val `atlas-json` = project
1 change: 1 addition & 0 deletions project/Dependencies.scala
@@ -70,6 +70,7 @@ object Dependencies {
   val slf4jLog4j = "org.slf4j" % "slf4j-log4j12" % slf4j
   val slf4jSimple = "org.slf4j" % "slf4j-simple" % slf4j
   val spectatorApi = "com.netflix.spectator" % "spectator-api" % spectator
+  val spectatorAtlas = "com.netflix.spectator" % "spectator-reg-atlas" % spectator
   val spectatorIpc = "com.netflix.spectator" % "spectator-ext-ipc" % spectator
   val spectatorLog4j = "com.netflix.spectator" % "spectator-ext-log4j2" % spectator
   val spectatorM2 = "com.netflix.spectator" % "spectator-reg-metrics2" % spectator
