diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterBenchmark.scala index 30724ab7751..2045852a691 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterBenchmark.scala @@ -15,19 +15,20 @@ package org.apache.pekko.stream import java.util.concurrent.TimeUnit +import scala.concurrent.Await +import scala.concurrent.duration._ + import org.openjdk.jmh.annotations._ import org.apache.pekko -import pekko.event._ import pekko.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic } import pekko.stream.impl.fusing.GraphInterpreterSpecKit -import pekko.stream.impl.fusing.GraphStages import pekko.stream.stage._ @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) @BenchmarkMode(Array(Mode.Throughput)) -class InterpreterBenchmark { +class InterpreterBenchmark extends GraphInterpreterSpecKit { import InterpreterBenchmark._ // manual, and not via @Param, because we want @OperationsPerInvocation on our tests @@ -36,32 +37,59 @@ class InterpreterBenchmark { @Param(Array("1", "5", "10")) var numberOfIds: Int = 0 + // Earlier this benchmark instantiated `new GraphInterpreterSpecKit` inside @Benchmark, which + // created (and leaked) a fresh ActorSystem on every invocation and would exhaust native threads + // on long runs. Extending the SpecKit means JMH's @State(Scope.Benchmark) lifecycle reuses a + // single ActorSystem across all invocations. + + @TearDown(Level.Trial) + def shutdown(): Unit = { + Await.result(system.terminate(), 10.seconds) + } + @Benchmark @OperationsPerInvocation(100000) def graph_interpreter_100k_elements(): Unit = { - new GraphInterpreterSpecKit { - new TestSetup { - val identities = Vector.fill(numberOfIds)(GraphStages.identity[Int]) - val source = new GraphDataSource("source", data100k) - val sink = new GraphDataSink[Int]("sink", data100k.size) - - val b = builder(identities: _*).connect(source, identities.head.in).connect(identities.last.out, sink) + new TestSetup { + val identities = Vector.fill(numberOfIds)(new IdentityStage[Int]) + val source = new GraphDataSource("source", data100k) + val sink = new GraphDataSink[Int]("sink", data100k.size) - // FIXME: This should not be here, this is pure setup overhead - for (i <- 0 until identities.size - 1) { - b.connect(identities(i).out, identities(i + 1).in) - } + val b = builder(identities: _*).connect(source, identities.head.in).connect(identities.last.out, sink) - b.init() - sink.requestOne() - interpreter.execute(Int.MaxValue) + // FIXME: This should not be here, this is pure setup overhead + for (i <- 0 until identities.size - 1) { + b.connect(identities(i).out, identities(i + 1).in) } + + b.init() + sink.requestOne() + interpreter.execute(Int.MaxValue) } } } object InterpreterBenchmark { + /** + * Per-instance identity stage. Cannot reuse [[GraphStages.identity]] because it is a singleton + * whose Inlet/Outlet shape is shared across all references — chaining N copies of the singleton + * collapses to a single shape and mis-wires the assembly (manifests as `Cannot pull port twice`). + */ + final class IdentityStage[T] extends GraphStage[FlowShape[T, T]] { + val in = Inlet[T]("Identity.in") + val out = Outlet[T]("Identity.out") + override val shape: FlowShape[T, T] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = push(out, grab(in)) + override def onPull(): Unit = pull(in) + setHandler(in, this) + setHandler(out, this) + } + } + case class GraphDataSource[T](override val toString: String, data: Vector[T]) extends UpstreamBoundaryStageLogic[T] { var idx: Int = 0 override val out: pekko.stream.Outlet[T] = Outlet[T]("out") @@ -98,11 +126,4 @@ object InterpreterBenchmark { def requestOne(): Unit = pull(in) } - - val NoopBus = new LoggingBus { - override def subscribe(subscriber: Subscriber, to: Classifier): Boolean = true - override def publish(event: Event): Unit = () - override def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = true - override def unsubscribe(subscriber: Subscriber): Unit = () - } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala index b774cfb6ab4..aedcd0b301a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala @@ -259,6 +259,10 @@ import pekko.stream.stage._ private[this] var chaseCounter = 0 // the first events in preStart blocks should be not chased private[this] var chasedPush: Connection = NoEvent private[this] var chasedPull: Connection = NoEvent + // Set whenever a stage's shutdownCounter transitions to 0 (i.e. the stage just became completed and + // needs finalization). Lets the chase / dispatch loops skip the per-iteration shutdownCounter array + // load in afterStageHasRun when no stage has completed since the last finalization pass. + private[this] var pendingFinalization: Boolean = false private def queueStatus: String = { val contents = (queueHead until queueTail).map(idx => { @@ -420,7 +424,10 @@ import pekko.stream.stage._ catch { case NonFatal(e) => reportStageError(e) } - afterStageHasRun(activeStage) + if (pendingFinalization) { + pendingFinalization = false + afterStageHasRun(activeStage) + } /* * "Event chasing" optimization follows from here. This optimization works under the assumption that a Push or @@ -453,7 +460,10 @@ import pekko.stream.stage._ catch { case NonFatal(e) => reportStageError(e) } - afterStageHasRun(activeStage) + if (pendingFinalization) { + pendingFinalization = false + afterStageHasRun(activeStage) + } } // Chasing PULL events @@ -464,7 +474,10 @@ import pekko.stream.stage._ catch { case NonFatal(e) => reportStageError(e) } - afterStageHasRun(activeStage) + if (pendingFinalization) { + pendingFinalization = false + afterStageHasRun(activeStage) + } } if (chasedPush != NoEvent) { @@ -627,12 +640,20 @@ import pekko.stream.stage._ // itself might stop, too. private def completeConnection(stageId: Int): Unit = { val activeConnections = shutdownCounter(stageId) - if (activeConnections > 0) shutdownCounter(stageId) = activeConnections - 1 + if (activeConnections > 0) { + val next = activeConnections - 1 + shutdownCounter(stageId) = next + if (next == 0) pendingFinalization = true + } } private[stream] def setKeepGoing(logic: GraphStageLogic, enabled: Boolean): Unit = if (enabled) shutdownCounter(logic.stageId) |= KeepGoingFlag - else shutdownCounter(logic.stageId) &= KeepGoingMask + else { + val next = shutdownCounter(logic.stageId) & KeepGoingMask + shutdownCounter(logic.stageId) = next + if (next == 0) pendingFinalization = true + } @InternalStableApi private[stream] def finalizeStage(logic: GraphStageLogic): Unit = {