diff --git a/atlas-akka/src/main/scala/com/netflix/atlas/akka/StreamOps.scala b/atlas-akka/src/main/scala/com/netflix/atlas/akka/StreamOps.scala index c68b757e4..4922e283d 100644 --- a/atlas-akka/src/main/scala/com/netflix/atlas/akka/StreamOps.scala +++ b/atlas-akka/src/main/scala/com/netflix/atlas/akka/StreamOps.scala @@ -37,7 +37,6 @@ import akka.stream.stage.InHandler import akka.stream.stage.OutHandler import com.netflix.spectator.api.Clock import com.netflix.spectator.api.Registry -import com.netflix.spectator.api.Timer import com.typesafe.scalalogging.StrictLogging /** @@ -148,6 +147,9 @@ object StreamOps extends StrictLogging { * - `akka.stream.upstreamDelay`: a timer measuring the delay for the upstream to push * a new element after one has been requested. * + * Updates to meters will be batched so the updates may be delayed if used for a low + * throughput stream. + * * @param registry * Spectator registry to manage metrics. * @param id @@ -173,28 +175,57 @@ object StreamOps extends StrictLogging { new GraphStageLogic(shape) with InHandler with OutHandler { - private var upstreamStart: Long = -1L - private var downstreamStart: Long = -1L + import MonitorFlow._ + + private var lastUpdate = registry.clock().monotonicTime() + private var upstreamIdx = 0 + private val upstreamTimes = new Array[Long](MeterBatchSize) + private var downstreamIdx = 0 + private val downstreamTimes = new Array[Long](MeterBatchSize) + + private var upstreamStart = -1L + private var downstreamStart = -1L override def onPush(): Unit = { - upstreamStart = record(upstreamTimer, upstreamStart) + val now = registry.clock().monotonicTime() + if (upstreamStart != -1L) { + upstreamTimes(upstreamIdx) = now - upstreamStart + upstreamIdx += 1 + upstreamStart = -1L + } push(out, grab(in)) - numEvents.increment() - downstreamStart = registry.clock().monotonicTime() + downstreamStart = now + if (upstreamIdx == MeterBatchSize || now - lastUpdate > MeterUpdateInterval) { + updateMeters(now) + } } override def onPull(): Unit = { - downstreamStart = record(downstreamTimer, downstreamStart) + val now = registry.clock().monotonicTime() + if (downstreamStart != -1L) { + downstreamTimes(downstreamIdx) = now - downstreamStart + downstreamIdx += 1 + downstreamStart = -1L + } pull(in) - upstreamStart = registry.clock().monotonicTime() + upstreamStart = now + if (downstreamIdx == MeterBatchSize) { + updateMeters(now) + } } - private def record(timer: Timer, start: Long): Long = { - if (start > 0L) { - val delay = registry.clock().monotonicTime() - start - timer.record(delay, TimeUnit.NANOSECONDS) - } - -1L + override def onUpstreamFinish(): Unit = { + updateMeters(registry.clock().monotonicTime()) + super.onUpstreamFinish() + } + + private def updateMeters(now: Long): Unit = { + numEvents.increment(upstreamIdx) + upstreamTimer.record(upstreamTimes, upstreamIdx, TimeUnit.NANOSECONDS) + upstreamIdx = 0 + downstreamTimer.record(downstreamTimes, downstreamIdx, TimeUnit.NANOSECONDS) + downstreamIdx = 0 + lastUpdate = now } setHandlers(in, out, this) @@ -202,6 +233,12 @@ object StreamOps extends StrictLogging { } } + private object MonitorFlow { + + private val MeterBatchSize = 10000 + private val MeterUpdateInterval = TimeUnit.SECONDS.toNanos(1L) + } + /** * Map operation that passes in the materializer for the graph stage. * @@ -278,13 +315,13 @@ object StreamOps extends StrictLogging { private var previous: V = _ private var lastPushedAt: Long = 0 - private def isExpired(): Boolean = { + private def isExpired: Boolean = { lastPushedAt == 0 || clock.wallTime() - lastPushedAt > timeout } override def onPush(): Unit = { val v = grab(in) - if (v == previous && !isExpired()) { + if (v == previous && !isExpired) { pull(in) } else { previous = v diff --git a/atlas-akka/src/test/scala/com/netflix/atlas/akka/StreamOpsSuite.scala b/atlas-akka/src/test/scala/com/netflix/atlas/akka/StreamOpsSuite.scala index d47087e99..ea11700fe 100644 --- a/atlas-akka/src/test/scala/com/netflix/atlas/akka/StreamOpsSuite.scala +++ b/atlas-akka/src/test/scala/com/netflix/atlas/akka/StreamOpsSuite.scala @@ -165,11 +165,11 @@ class StreamOpsSuite extends FunSuite { } test("monitor flow: downstream delay") { - testMonitorFlow("downstreamDelay", Map("count" -> 8.0, "totalTime" -> 24.0)) + testMonitorFlow("downstreamDelay", Map("count" -> 10.0, "totalTime" -> 27.0)) } test("monitor flow: upstream delay") { - testMonitorFlow("upstreamDelay", Map("count" -> 8.0, "totalTime" -> 0.0)) + testMonitorFlow("upstreamDelay", Map("count" -> 10.0, "totalTime" -> 0.0)) } private class Message(latch: CountDownLatch, val data: Source[Int, NotUsed]) { diff --git a/atlas-jmh/src/main/scala/com/netflix/atlas/akka/MonitorFlowBench.scala b/atlas-jmh/src/main/scala/com/netflix/atlas/akka/MonitorFlowBench.scala new file mode 100644 index 000000000..4322bee14 --- /dev/null +++ b/atlas-jmh/src/main/scala/com/netflix/atlas/akka/MonitorFlowBench.scala @@ -0,0 +1,132 @@ +/* + * Copyright 2014-2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.akka + +import akka.actor.ActorSystem +import akka.stream.Attributes +import akka.stream.FlowShape +import akka.stream.Inlet +import akka.stream.Outlet +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.stage.GraphStage +import akka.stream.stage.GraphStageLogic +import akka.stream.stage.InHandler +import akka.stream.stage.OutHandler +import com.netflix.spectator.api.Clock +import com.netflix.spectator.api.Registry +import com.netflix.spectator.api.Timer +import com.netflix.spectator.atlas.AtlasRegistry +import org.openjdk.jmh.annotations.Benchmark +import org.openjdk.jmh.annotations.Scope +import org.openjdk.jmh.annotations.Setup +import org.openjdk.jmh.annotations.State +import org.openjdk.jmh.annotations.TearDown +import org.openjdk.jmh.infra.Blackhole + +import java.util.concurrent.TimeUnit +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +@State(Scope.Thread) +class MonitorFlowBench { + + import MonitorFlowBench._ + + private implicit var system: ActorSystem = _ + private var registry: Registry = _ + + @Setup + def setup(): Unit = { + system = ActorSystem(getClass.getSimpleName) + registry = new AtlasRegistry(Clock.SYSTEM, k => System.getProperty(k)) + } + + @TearDown + def tearDown(): Unit = { + Await.result(system.terminate(), Duration.Inf) + } + + @Benchmark + def individual(bh: Blackhole): Unit = { + val future = Source(0 until 1_000_000) + .via(new MonitorFlow(registry, "1")) + .via(new MonitorFlow(registry, "2")) + .via(new MonitorFlow(registry, "3")) + .reduce(_ + _) + .runWith(Sink.head) + bh.consume(Await.result(future, Duration.Inf)) + } + + @Benchmark + def batched(bh: Blackhole): Unit = { + val future = Source(0 until 1_000_000) + .via(StreamOps.monitorFlow(registry, "1")) + .via(StreamOps.monitorFlow(registry, "2")) + .via(StreamOps.monitorFlow(registry, "3")) + .reduce(_ + _) + .runWith(Sink.head) + bh.consume(Await.result(future, Duration.Inf)) + } +} + +object MonitorFlowBench { + + private final class MonitorFlow[T](registry: Registry, id: String) + extends GraphStage[FlowShape[T, T]] { + + private val numEvents = registry.counter("akka.stream.numEvents", "id", id) + private val upstreamTimer = registry.timer("akka.stream.upstreamDelay", "id", id) + private val downstreamTimer = registry.timer("akka.stream.downstreamDelay", "id", id) + + private val in = Inlet[T]("MonitorBackpressure.in") + private val out = Outlet[T]("MonitorBackpressure.out") + + override val shape: FlowShape[T, T] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + + new GraphStageLogic(shape) with InHandler with OutHandler { + + private var upstreamStart: Long = -1L + private var downstreamStart: Long = -1L + + override def onPush(): Unit = { + upstreamStart = record(upstreamTimer, upstreamStart) + push(out, grab(in)) + numEvents.increment() + downstreamStart = registry.clock().monotonicTime() + } + + override def onPull(): Unit = { + downstreamStart = record(downstreamTimer, downstreamStart) + pull(in) + upstreamStart = registry.clock().monotonicTime() + } + + private def record(timer: Timer, start: Long): Long = { + if (start > 0L) { + val delay = registry.clock().monotonicTime() - start + timer.record(delay, TimeUnit.NANOSECONDS) + } + -1L + } + + setHandlers(in, out, this) + } + } + } +} diff --git a/build.sbt b/build.sbt index 7780fda34..8113beacd 100644 --- a/build.sbt +++ b/build.sbt @@ -73,6 +73,9 @@ lazy val `atlas-eval` = project lazy val `atlas-jmh` = project .configure(BuildSettings.profile) .dependsOn(`atlas-chart`, `atlas-core`, `atlas-eval`, `atlas-json`, `atlas-webapi`) + .settings(libraryDependencies ++= Seq( + Dependencies.spectatorAtlas + )) .enablePlugins(pl.project13.scala.sbt.SbtJmh) lazy val `atlas-json` = project diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 53244d84a..5136eaf74 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -70,6 +70,7 @@ object Dependencies { val slf4jLog4j = "org.slf4j" % "slf4j-log4j12" % slf4j val slf4jSimple = "org.slf4j" % "slf4j-simple" % slf4j val spectatorApi = "com.netflix.spectator" % "spectator-api" % spectator + val spectatorAtlas = "com.netflix.spectator" % "spectator-reg-atlas" % spectator val spectatorIpc = "com.netflix.spectator" % "spectator-ext-ipc" % spectator val spectatorLog4j = "com.netflix.spectator" % "spectator-ext-log4j2" % spectator val spectatorM2 = "com.netflix.spectator" % "spectator-reg-metrics2" % spectator