diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/StageActorRefBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/StageActorRefBenchmark.scala new file mode 100644 index 00000000000..007253e275f --- /dev/null +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/StageActorRefBenchmark.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.stream + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +import scala.concurrent.Await +import scala.concurrent.Promise +import scala.concurrent.duration._ + +import org.openjdk.jmh.annotations._ + +import org.apache.pekko +import pekko.actor.ActorRef +import pekko.actor.ActorSystem +import pekko.actor.NoSerializationVerificationNeeded +import pekko.stream.scaladsl.Keep +import pekko.stream.scaladsl.Sink +import pekko.stream.scaladsl.Source +import pekko.stream.stage.GraphStageLogic +import pekko.stream.stage.GraphStageWithMaterializedValue +import pekko.stream.stage.InHandler + +object StageActorRefBenchmark { + final val OperationsPerInvocation = 10000 + private case object CountDown extends NoSerializationVerificationNeeded + + private final class Control { + private val ready = new CountDownLatch(1) + @volatile private var ref: ActorRef = _ + @volatile private var latch: CountDownLatch = _ + + def init(ref: ActorRef): Unit = { + this.ref = ref + ready.countDown() + } + + def stageActorRef: ActorRef = { + if (!ready.await(10, TimeUnit.SECONDS)) + throw new RuntimeException("Stage actor ref was not initialized") + ref + } + + def reset(expectedMessages: Int): Unit = + latch = new CountDownLatch(expectedMessages) + + def countDown(): Unit = + latch.countDown() + + def awaitDone(): Unit = + if (!latch.await(10, TimeUnit.SECONDS)) + throw new RuntimeException("Stage actor ref benchmark messages timed out") + } + + private final class StageActorSink extends GraphStageWithMaterializedValue[SinkShape[Any], Control] { + val in: Inlet[Any] = Inlet("StageActorSink.in") + override val shape: SinkShape[Any] = SinkShape(in) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Control) = { + val control = new Control + + val logic = new GraphStageLogic(shape) { + override def preStart(): Unit = { + control.init(getStageActor { + case (_, CountDown) => 
control.countDown() + }.ref) + pull(in) + } + + setHandler( + in, + new InHandler { + override def onPush(): Unit = pull(in) + }) + } + + logic -> control + } + } +} + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.SECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class StageActorRefBenchmark { + import StageActorRefBenchmark._ + + implicit val system: ActorSystem = ActorSystem("StageActorRefBenchmark") + + private var completion: Promise[Option[Any]] = _ + private var control: Control = _ + private var stageActorRef: ActorRef = _ + + @Setup + def setup(): Unit = { + SystemMaterializer(system).materializer + val materialized = Source.maybe[Any].toMat(Sink.fromGraph(new StageActorSink))(Keep.both).run() + completion = materialized._1 + control = materialized._2 + stageActorRef = control.stageActorRef + } + + @TearDown + def shutdown(): Unit = { + completion.trySuccess(None) + Await.result(system.terminate(), 5.seconds) + } + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def lazy_stage_actor_ref_tell_10k(): Unit = { + control.reset(OperationsPerInvocation) + var remaining = OperationsPerInvocation + while (remaining > 0) { + stageActorRef ! 
CountDown + remaining -= 1 + } + control.awaitDone() + } +} diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala index c57068397de..3c7f2cd265e 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala @@ -18,16 +18,19 @@ import scala.concurrent.Promise import scala.concurrent.duration._ import org.apache.pekko +import pekko.actor.ActorPath import pekko.actor.ActorRef import pekko.actor.Kill import pekko.actor.NoSerializationVerificationNeeded import pekko.actor.PoisonPill import pekko.event.Logging import pekko.stream._ +import pekko.stream.impl.fusing.GraphInterpreter import pekko.stream.stage.GraphStageLogic import pekko.stream.stage.GraphStageWithMaterializedValue import pekko.stream.stage.InHandler import pekko.stream.testkit.StreamSpec +import pekko.stream.testkit.scaladsl.TestSink import pekko.testkit.EventFilter import pekko.testkit.ImplicitSender import pekko.testkit.TestEvent @@ -181,6 +184,51 @@ class StageActorRefSpec extends StreamSpec with ImplicitSender { res.futureValue should ===(42) } + "run non-eager stage actor messages in the graph interpreter" in { + val (_, res) = Source.maybe[Int].toMat(sumStage(testActor))(Keep.both).run() + + val stageRef = expectMsgType[ActorRef] + stageRef ! AddAndTell(1) + expectMsg(1) + + stageRef ! ReportStageActorInterpreter + val location = expectMsgType[StageActorLocation] + + location.stageActorParent should ===(location.supervisor) + location.interpreter should !==(location.supervisor) + + stageRef ! 
StopNow + res.futureValue should ===(1) + } + + "keep eagerly materialized stage actors usable before stream demand" in { + val (ref, probe) = Source + .actorRef[Int]({ + case CompleteNow => CompletionStrategy.Immediately + }, PartialFunction.empty, bufferSize = 8, OverflowStrategy.fail) + .toMat(TestSink[Int]())(Keep.both) + .run() + + ref ! 1 + probe.request(1).expectNext(1) + ref ! CompleteNow + probe.expectComplete() + } + + "keep eagerly materialized stage actors attached to the stream supervisor" in { + val (source, res) = Source.maybe[Int].toMat(eagerLocationStage(testActor))(Keep.both).run() + + val stageRef = expectMsgType[ActorRef] + stageRef ! ReportEagerStageActorInterpreter + val location = expectMsgType[EagerStageActorLocation] + + location.stageActorParent should ===(location.supervisor) + location.stageActorParent should !==(location.interpreter) + + source.success(None) + res.futureValue should ===(0) + } + } } @@ -194,10 +242,19 @@ object StageActorRefSpec { case object BecomeStringEcho extends NoSerializationVerificationNeeded case object PullNow extends NoSerializationVerificationNeeded case object StopNow extends NoSerializationVerificationNeeded + case object ReportStageActorInterpreter extends NoSerializationVerificationNeeded + case object ReportEagerStageActorInterpreter extends NoSerializationVerificationNeeded + case object CompleteNow extends NoSerializationVerificationNeeded + final case class StageActorLocation(stageActorParent: ActorPath, supervisor: ActorPath, interpreter: ActorPath) + extends NoSerializationVerificationNeeded + final case class EagerStageActorLocation(stageActorParent: ActorPath, supervisor: ActorPath, interpreter: ActorPath) + extends NoSerializationVerificationNeeded } import ControlProtocol._ + def eagerLocationStage(probe: ActorRef) = EagerLocationStage(probe) + case class SumTestStage(probe: ActorRef) extends GraphStageWithMaterializedValue[SinkShape[Int], Future[Int]] { val in = Inlet[Int]("IntSum.in") 
override val shape: SinkShape[Int] = SinkShape.of(in) @@ -216,10 +273,15 @@ object StageActorRefSpec { def behavior(m: (ActorRef, Any)): Unit = { m match { - case (_, Add(n)) => sum += n - case (_, PullNow) => pull(in) - case (sender, CallInitStageActorRef) => sender ! getStageActor(behavior).ref - case (_, BecomeStringEcho) => + case (_, Add(n)) => sum += n + case (_, PullNow) => pull(in) + case (sender, CallInitStageActorRef) => sender ! getStageActor(behavior).ref + case (sender, ReportStageActorInterpreter) => + sender ! StageActorLocation( + stageActor.ref.path.parent, + interpreter.materializer.supervisor.path, + GraphInterpreter.currentInterpreter.context.path) + case (_, BecomeStringEcho) => getStageActor { case (theSender, msg) => theSender ! msg.toString } @@ -258,4 +320,52 @@ object StageActorRefSpec { } } + case class EagerLocationStage(probe: ActorRef) extends GraphStageWithMaterializedValue[SinkShape[Int], Future[Int]] { + val in = Inlet[Int]("EagerLocation.in") + override val shape: SinkShape[Int] = SinkShape.of(in) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = { + val p: Promise[Int] = Promise() + + val logic = new GraphStageLogic(shape) { + var stageRef: ActorRef = _ + var interpreterPath: ActorPath = _ + var supervisorPath: ActorPath = _ + + override def preStart(): Unit = { + interpreterPath = interpreter.context.path + supervisorPath = interpreter.materializer.supervisor.path + stageRef = getEagerStageActor(interpreter.materializer) { + case (sender, ReportEagerStageActorInterpreter) => + sender ! EagerStageActorLocation(stageRef.path.parent, supervisorPath, interpreterPath) + case _ => throw new RuntimeException("unexpected message") + }.ref + pull(in) + probe ! 
+ stageRef + } + + setHandler( + in, + new InHandler { + override def onPush(): Unit = { + p.trySuccess(grab(in)) + completeStage() + } + + override def onUpstreamFinish(): Unit = { + p.trySuccess(0) + completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + p.tryFailure(ex) + failStage(ex) + } + }) + } + + logic -> p.future + } + } + } diff --git a/stream/src/main/resources/reference.conf b/stream/src/main/resources/reference.conf index f4e4bd31934..059d28079ae 100644 --- a/stream/src/main/resources/reference.conf +++ b/stream/src/main/resources/reference.conf @@ -83,6 +83,13 @@ pekko { # Allows to accelerate message processing that happening within same actor but keep system responsive. sync-processing-limit = 1000 + # Upper bound on stage-actor messages drained per envelope for non-eager `getStageActor` refs. Lazy + # stage actors batch external `tell` deliveries into an MPSC queue and elect a single drain envelope; + # this cap bounds the burst so that other BoundaryEvents (pull/push/complete) can still interleave + # naturally via the actor mailbox. Smaller = better fairness for upstream/downstream events; + # larger = better tell throughput. Must be >= 1; smaller values are clamped to 1.
+ stage-actor-drain-batch = 16 + debug { # Enables the fuzzing mode which increases the chance of race conditions # by aggressively reordering events and making certain operations more diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala index 20cd0d36152..ee406b85416 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala @@ -14,6 +14,7 @@ package org.apache.pekko.stream.stage import java.util.Spliterator +import java.lang.invoke.{ MethodHandles, VarHandle } import java.util.concurrent.{ CompletionStage, ConcurrentHashMap } import java.util.concurrent.atomic.AtomicReference @@ -27,6 +28,7 @@ import org.apache.pekko import pekko.{ Done, NotUsed } import pekko.actor._ import pekko.annotation.InternalApi +import pekko.dispatch.AbstractNodeQueue import pekko.japi.function.{ Effect, Procedure } import pekko.stream._ import pekko.stream.Attributes.SourceLocation @@ -206,29 +208,61 @@ object GraphStageLogic { * * Not for user instantiation, use [[GraphStageLogic.getStageActor]]. 
*/
-  final class StageActor @InternalApi() private[pekko] (
+  final class StageActor @InternalApi() private (
       materializer: Materializer,
-      getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, Any)],
       initialReceive: StageActorRef.Receive,
-      name: String) {
+      name: String,
+      cell: ActorCell,
+      buildDispatch: StageActorRef.Receive => ((ActorRef, Any)) => Unit) {
+
+    @InternalApi private[pekko] def this(
+        materializer: Materializer,
+        getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, Any)],
+        initialReceive: StageActorRef.Receive,
+        name: String) =
+      this(
+        materializer,
+        initialReceive,
+        name,
+        StageActor.localCell(materializer.supervisor, "Stream supervisor"),
+        receive => getAsyncCallback(receive).invoke)
+
+    @InternalApi private[pekko] def this(
+        materializer: Materializer,
+        interpreter: GraphInterpreter,
+        logic: GraphStageLogic,
+        initialReceive: StageActorRef.Receive,
+        name: String) =
+      this(
+        materializer,
+        initialReceive,
+        name,
+        StageActor.localCell(materializer.supervisor, "Stream supervisor"),
+        // Coalesce per-tell mailbox traffic: N tells produce 1 AsyncInput envelope (amortized).
+        receive =>
+          new StageActor.LazyDispatch(
+            interpreter,
+            logic,
+            receive.asInstanceOf[Any => Unit],
+            StageActor.drainBatchSize(materializer)))
+
+    // Monomorphic Function1 captured once; JIT can inline the apply at the FunctionRef call site.
+    private val dispatch: ((ActorRef, Any)) => Unit = buildDispatch(internalReceive)
-    private val callback = getAsyncCallback(internalReceive)
-
-    private def cell = materializer.supervisor match {
-      case ref: LocalActorRef => ref.underlying
-      case unknown =>
-        throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]")
-    }
 
     private val functionRef: FunctionRef = {
-      val f: (ActorRef, Any) => Unit = {
-        case (_, m @ (PoisonPill | Kill)) =>
-          materializer.logger.warning(
-            "{} message sent to StageActor({}) will be ignored, since it is not a real Actor. " +
-            "Use a custom message type to communicate with it instead.",
-            m,
-            functionRef.path)
-        case pair => callback.invoke(pair)
-      }
+      // Explicit (sender, msg) lambda (not a pattern-match Function2 literal) so the PoisonPill / Kill
+      // branch matches on `msg` directly and does not allocate a Tuple2. The regular branch still
+      // constructs one tuple per tell, as required by the `((ActorRef, Any)) => Unit` public Receive type.
+      val f: (ActorRef, Any) => Unit = (sender, msg) =>
+        msg match {
+          case PoisonPill | Kill =>
+            materializer.logger.warning(
+              "{} message sent to StageActor({}) will be ignored, since it is not a real Actor. " +
+              "Use a custom message type to communicate with it instead.",
+              msg,
+              functionRef.path)
+          case _ => dispatch((sender, msg))
+        }
       cell.addFunctionRef(f, name)
     }
 
@@ -275,6 +309,135 @@ object GraphStageLogic {
     type Receive = ((ActorRef, Any)) => Unit
   }
 
+  private object StageActor {
+    def localCell(ref: ActorRef, description: String): ActorCell =
+      ref match {
+        case ref: LocalActorRef => ref.underlying
+        case ref: RepointableActorRef =>
+          ref.underlying match {
+            case cell: ActorCell => cell
+            case unknown =>
+              throw new IllegalStateException(s"$description must be a local actor, was [${unknown.getClass.getName}]")
+          }
+        case unknown =>
+          throw new IllegalStateException(s"$description must be a local actor, was [${unknown.getClass.getName}]")
+      }
+
+    /**
+     * Reads `pekko.stream.materializer.stage-actor-drain-batch` from the materializer's ActorSystem config.
+     * Called once per lazy StageActor construction (never on the hot path). Bounded to `>= 1`.
+     */
+    def drainBatchSize(materializer: Materializer): Int =
+      Math.max(1, materializer.system.settings.config.getInt("pekko.stream.materializer.stage-actor-drain-batch"))
+
+    private final val SchedStateIdle: Int = 0
+    private final val SchedStateScheduled: Int = 1
+
+    /**
+     * Lazy-path dispatch: producers enqueue into a Vyukov MPSC queue and elect a single drain via
+     * IDLE -> SCHEDULED CAS; only the elected producer pays a mailbox enqueue. The drain runs on the
+     * interpreter thread, polls in a tight loop bounded by `drainBatchSize`, then either publishes IDLE
+     * (with a recheck for the publish-window race) or re-schedules another envelope to yield to other
+     * BoundaryEvents.
+     *
+     * JIT/GC notes:
+     *  - `final class` + monomorphic per-StageActor instance → JIT devirtualizes the apply at the
+     *    FunctionRef call site.
+     *  - Extends `AbstractNodeQueue` directly so the queue head atomic and the dispatch function share one
+     *    object (one allocation per StageActor, one fewer field deref on the producer hot path).
+     *  - Implements `Any => Unit` directly — serves as both the producer callback and the drain callback,
+     *    eliminating the separate `drainCallback` lambda allocation.
+     *  - `state` is a plain `@volatile var Int` driven by a static `VarHandle` in the companion object
+     *    (via `MethodHandles.privateLookupIn`), same pattern as `AbstractNodeQueue` itself;
+     *    avoids per-instance `AtomicInteger`.
+     *  - `drainBatchSize` is read once into a stack-local at the top of `drain` so the JIT can treat the loop
+     *    bound as a constant.
+     *  - Per-tell allocation = 1 Node (`AbstractNodeQueue.Node`, ~24 bytes) + 1 Tuple2 (~24 bytes). The
+     *    Tuple2 is forced by the public `StageActorRef.Receive` type. No AsyncInput / Envelope per tell —
+     *    those are amortized across the batch.
+     */
+    // Not marked `private` so that `class StageActor`'s aux constructor (compiled outside of the companion
+    // object on Scala 3) can reference it; the enclosing `object StageActor` is itself private.
+    final class LazyDispatch(
+        interpreter: GraphInterpreter,
+        logic: GraphStageLogic,
+        handler: Any => Unit,
+        drainBatchSize: Int)
+        extends AbstractNodeQueue[(ActorRef, Any)]
+        with (Any => Unit) {
+
+      // IDLE/SCHEDULED election state. VarHandle avoids per-instance AtomicInteger;
+      // the handle lives in the companion object (true JVM static), same pattern as
+      // AbstractNodeQueue._tailDoNotCallMeDirectly.
+      @volatile var state: Int = SchedStateIdle
+
+      // Volatile access modes are mandatory: VarHandle.get/set are plain accesses even on a @volatile field,
+      // which would break the IDLE-publish/recheck handshake. Typed local keeps the Int return for Scala 3.
+      private def getState(): Int = {
+        val v: Int = LazyDispatch.stateHandle.getVolatile(this)
+        v
+      }
+      private def setState(v: Int): Unit = LazyDispatch.stateHandle.setVolatile(this, v)
+      private def casState(expect: Int, update: Int): Boolean =
+        LazyDispatch.stateHandle.compareAndSet(this, expect, update)
+
+      // null msg = drain signal from onAsyncInput; non-null = (ActorRef, Any) tuple from FunctionRef.
+      override def apply(msg: Any): Unit =
+        if (msg == null) drain()
+        else {
+          val pair = msg.asInstanceOf[(ActorRef, Any)]
+          if (!interpreter.isStageCompleted(logic)) {
+            add(pair)
+            if (getState() == SchedStateIdle && casState(SchedStateIdle, SchedStateScheduled)) {
+              if (interpreter.isStageCompleted(logic)) setState(SchedStateIdle)
+              else scheduleDrain()
+            }
+          }
+        }
+
+      private def scheduleDrain(): Unit =
+        // 1 AsyncInput + 1 Envelope per drain batch (amortized across up to drainBatchSize tells).
+        // `this` serves as the drain callback (Any => Unit); onAsyncInput calls apply(null).
+        interpreter.onAsyncInput(logic, null, NoPromise, this)
+
+      private def drain(): Unit = {
+        val limit = drainBatchSize
+        var processed = 0
+        while (processed < limit) {
+          if (interpreter.isStageCompleted(logic)) {
+            while (poll() ne null) ()
+            setState(SchedStateIdle)
+            return
+          }
+          val item = poll()
+          if (item eq null) {
+            setState(SchedStateIdle)
+            // Recheck race: a producer may have added between `poll == null` and the IDLE publish above.
+            if (!isEmpty && casState(SchedStateIdle, SchedStateScheduled))
+              scheduleDrain()
+            return
+          }
+          handler(item)
+          processed += 1
+        }
+        // The last handler(item) may have completed the stage; check before re-scheduling.
+        if (interpreter.isStageCompleted(logic)) {
+          while (poll() ne null) ()
+          setState(SchedStateIdle)
+          return
+        }
+        scheduleDrain()
+      }
+    }
+
+    object LazyDispatch {
+      private val stateHandle: VarHandle = {
+        val lookup = MethodHandles.privateLookupIn(classOf[LazyDispatch], MethodHandles.lookup())
+        lookup.findVarHandle(classOf[LazyDispatch], "state", Integer.TYPE)
+      }
+    }
+  }
+
   /**
    * Internal API
    *
@@ -1339,8 +1502,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
 
   /**
    * Initialize a [[GraphStageLogic.StageActorRef]] which can be used to interact with from the outside world "as-if" a [[pekko.actor.Actor]].
-   * The messages are looped through the [[getAsyncCallback]] mechanism of [[GraphStage]] so they are safe to modify
-   * internal state of this operator.
+   * The messages are delivered through the owning stream interpreter so they are safe to modify internal state of this
+   * operator.
    *
    * This method must (the earliest) be called after the [[GraphStageLogic]] constructor has finished running,
    * for example from the [[preStart]] callback the graph operator logic provides.
@@ -1358,7 +1521,20 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
    * @return minimal actor with watch method
    */
   final protected def getStageActor(receive: ((ActorRef, Any)) => Unit): StageActor =
-    getEagerStageActor(interpreter.materializer)(receive)
+    _stageActor match {
+      case null =>
+        val currentInterpreter = interpreter
+        _stageActor = new StageActor(
+          currentInterpreter.materializer,
+          currentInterpreter,
+          this,
+          receive,
+          stageActorName)
+        _stageActor
+      case existing =>
+        existing.become(receive)
+        existing
+    }
 
   /**
    * INTERNAL API
@@ -1382,7 +1558,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
    * Override and return a name to be given to the StageActor of this operator.
    *
    * This method will be only invoked and used once, during the first [[getStageActor]]
-   * invocation whichc reates the actor, since subsequent `getStageActors` calls function
+   * invocation which creates the actor, since subsequent `getStageActor` calls function
    * like `become`, rather than creating new actors.
    *
-   * Returns an empty string by default, which means that the name will a unique generated String (e.g. "$$a").
+   * Returns an empty string by default, which means that the name will be a unique generated String (e.g. "$$a").