Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@ package org.apache.pekko.stream

import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration._

import org.openjdk.jmh.annotations._

import org.apache.pekko
import pekko.event._
import pekko.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic }
import pekko.stream.impl.fusing.GraphInterpreterSpecKit
import pekko.stream.impl.fusing.GraphStages
import pekko.stream.stage._

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class InterpreterBenchmark {
class InterpreterBenchmark extends GraphInterpreterSpecKit {
import InterpreterBenchmark._

// manual, and not via @Param, because we want @OperationsPerInvocation on our tests
Expand All @@ -36,32 +37,59 @@ class InterpreterBenchmark {
@Param(Array("1", "5", "10"))
var numberOfIds: Int = 0

// Earlier this benchmark instantiated `new GraphInterpreterSpecKit` inside @Benchmark, which
// created (and leaked) a fresh ActorSystem on every invocation and would exhaust native threads
// on long runs. Extending the SpecKit means JMH's @State(Scope.Benchmark) lifecycle reuses a
// single ActorSystem across all invocations.

@TearDown(Level.Trial)
def shutdown(): Unit = {
Await.result(system.terminate(), 10.seconds)
}

@Benchmark
@OperationsPerInvocation(100000)
def graph_interpreter_100k_elements(): Unit = {
new GraphInterpreterSpecKit {
new TestSetup {
val identities = Vector.fill(numberOfIds)(GraphStages.identity[Int])
val source = new GraphDataSource("source", data100k)
val sink = new GraphDataSink[Int]("sink", data100k.size)

val b = builder(identities: _*).connect(source, identities.head.in).connect(identities.last.out, sink)
new TestSetup {
val identities = Vector.fill(numberOfIds)(new IdentityStage[Int])
val source = new GraphDataSource("source", data100k)
val sink = new GraphDataSink[Int]("sink", data100k.size)

// FIXME: This should not be here, this is pure setup overhead
for (i <- 0 until identities.size - 1) {
b.connect(identities(i).out, identities(i + 1).in)
}
val b = builder(identities: _*).connect(source, identities.head.in).connect(identities.last.out, sink)

b.init()
sink.requestOne()
interpreter.execute(Int.MaxValue)
// FIXME: This should not be here, this is pure setup overhead
for (i <- 0 until identities.size - 1) {
b.connect(identities(i).out, identities(i + 1).in)
}

b.init()
sink.requestOne()
interpreter.execute(Int.MaxValue)
}
}
}

object InterpreterBenchmark {

/**
* Per-instance identity stage. Cannot reuse [[GraphStages.identity]] because it is a singleton
* whose Inlet/Outlet shape is shared across all references — chaining N copies of the singleton
* collapses to a single shape and mis-wires the assembly (manifests as `Cannot pull port twice`).
*/
final class IdentityStage[T] extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("Identity.in")
val out = Outlet[T]("Identity.out")
override val shape: FlowShape[T, T] = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = push(out, grab(in))
override def onPull(): Unit = pull(in)
setHandler(in, this)
setHandler(out, this)
}
}

case class GraphDataSource[T](override val toString: String, data: Vector[T]) extends UpstreamBoundaryStageLogic[T] {
var idx: Int = 0
override val out: pekko.stream.Outlet[T] = Outlet[T]("out")
Expand Down Expand Up @@ -98,11 +126,4 @@ object InterpreterBenchmark {

def requestOne(): Unit = pull(in)
}

val NoopBus = new LoggingBus {
override def subscribe(subscriber: Subscriber, to: Classifier): Boolean = true
override def publish(event: Event): Unit = ()
override def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = true
override def unsubscribe(subscriber: Subscriber): Unit = ()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ import pekko.stream.stage._
private[this] var chaseCounter = 0 // the first events in preStart blocks should be not chased
private[this] var chasedPush: Connection = NoEvent
private[this] var chasedPull: Connection = NoEvent
// Set whenever a stage's shutdownCounter transitions to 0 (i.e. the stage just became completed and
// needs finalization). Lets the chase / dispatch loops skip the per-iteration shutdownCounter array
// load in afterStageHasRun when no stage has completed since the last finalization pass.
private[this] var pendingFinalization: Boolean = false

private def queueStatus: String = {
val contents = (queueHead until queueTail).map(idx => {
Expand Down Expand Up @@ -420,7 +424,10 @@ import pekko.stream.stage._
catch {
case NonFatal(e) => reportStageError(e)
}
afterStageHasRun(activeStage)
if (pendingFinalization) {
pendingFinalization = false
afterStageHasRun(activeStage)
}

/*
* "Event chasing" optimization follows from here. This optimization works under the assumption that a Push or
Expand Down Expand Up @@ -453,7 +460,10 @@ import pekko.stream.stage._
catch {
case NonFatal(e) => reportStageError(e)
}
afterStageHasRun(activeStage)
if (pendingFinalization) {
pendingFinalization = false
afterStageHasRun(activeStage)
}
}

// Chasing PULL events
Expand All @@ -464,7 +474,10 @@ import pekko.stream.stage._
catch {
case NonFatal(e) => reportStageError(e)
}
afterStageHasRun(activeStage)
if (pendingFinalization) {
pendingFinalization = false
afterStageHasRun(activeStage)
}
}

if (chasedPush != NoEvent) {
Expand Down Expand Up @@ -627,12 +640,20 @@ import pekko.stream.stage._
// itself might stop, too.
private def completeConnection(stageId: Int): Unit = {
val activeConnections = shutdownCounter(stageId)
if (activeConnections > 0) shutdownCounter(stageId) = activeConnections - 1
if (activeConnections > 0) {
val next = activeConnections - 1
shutdownCounter(stageId) = next
if (next == 0) pendingFinalization = true
}
}

private[stream] def setKeepGoing(logic: GraphStageLogic, enabled: Boolean): Unit =
if (enabled) shutdownCounter(logic.stageId) |= KeepGoingFlag
else shutdownCounter(logic.stageId) &= KeepGoingMask
else {
val next = shutdownCounter(logic.stageId) & KeepGoingMask
shutdownCounter(logic.stageId) = next
if (next == 0) pendingFinalization = true
}

@InternalStableApi
private[stream] def finalizeStage(logic: GraphStageLogic): Unit = {
Expand Down