From e8488f2255276b4959400728a00b12006b4459ce Mon Sep 17 00:00:00 2001 From: He-Pin Date: Mon, 1 Jun 2026 13:11:36 +0800 Subject: [PATCH 1/7] fix: optimize lazy stage actor dispatch via MPSC drain coalescing Motivation: Lazy `getStageActor` refs paid one actor mailbox enqueue per external tell: sender -> FunctionRef -> ConcurrentAsyncCallback.invokeWithPromise -> interpreter self ! AsyncInput. Under high tell rate to a single stage actor the bottleneck is mailbox traffic (envelope alloc, cross-thread wakeup, dequeue), not the dispatch lambda. Each tell also allocated a Tuple2, an AsyncInput, and a mailbox Envelope. Modification: Lazy `getStageActor` now installs an MPSC dispatch (`LazyDispatch`) that: - enqueues (sender, msg) into a Vyukov MPSC queue (`AbstractNodeQueue`) - elects a single drain via IDLE -> SCHEDULED CAS; only the elected producer pays a mailbox enqueue - drains on the interpreter thread in a tight loop bounded by `stage-actor-drain-batch` (default 16), then either publishes IDLE (with the canonical recheck race fix) or re-schedules another envelope so other BoundaryEvents interleave naturally via the actor mailbox - preserves `isStageCompleted` semantics: items added after completion are dropped exactly as the old per-tell path silently skipped them. The eager construction path (used before stream demand) is unchanged and still routes through the materializer supervisor + `AsyncCallback`. JIT/GC notes: - `LazyDispatch` is a `final class` and extends `AbstractNodeQueue` directly so it is its own queue (one fewer allocation and field deref). - `scheduledState` is a plain `@volatile var Int` driven by a static `VarHandle` (created via `MethodHandles.privateLookupIn`), avoiding the per-instance `AtomicBoolean` wrapper. Same pattern as `AbstractNodeQueue` itself. - The dispatch `apply` is monomorphic per StageActor instance; the drain callback is allocated once and reused. The FunctionRef lambda is rewritten as `(sender, msg) =>` to skip the Tuple2 allocation on the PoisonPill / Kill warning path. - Per-tell allocation is now 1 Node + 1 Tuple2 (the Tuple2 is forced by the public `StageActorRef.Receive` type); AsyncInput and Envelope are amortized across the batch. Configuration: `pekko.stream.materializer.stage-actor-drain-batch` (default 16) bounds the per-envelope drain. The default aligns with `InputBuffer.max` and keeps the per-actor-wakeup work in the same order of magnitude as the dispatcher throughput; smaller values trade tell throughput for tighter interleaving with upstream/downstream events, larger values do the opposite. Binary compatibility: The original 4-arg `private[pekko] StageActor` constructor (`materializer, getAsyncCallback, initialReceive, name`) is preserved as an auxiliary constructor and continues to use the eager `AsyncCallback` path. A new 5-arg `private[pekko]` constructor (`materializer, interpreter, logic, initialReceive, name`) is added for the lazy path. `sbt stream/mimaReportBinaryIssues` passes clean. Result: `StageActorRefBenchmark.lazy_stage_actor_ref_tell_10k` (JMH 2 forks x 10 iter x 2s, macOS) - throughput is now bounded by Vyukov enqueue + drain loop rather than per-tell mailbox traffic: | Variant | Throughput (ops/s) | vs main | |----------------------------------|----------------------|---------| | main | 6,587,561 +- 616,243 | 1.00x | | MPSC + drain coalescing (cap=16) | 13,044,829 +- 1,525K | 1.98x | | MPSC + drain coalescing (cap=8) | 13,589,612 +- 2,114K | 2.06x | BroadcastHubBenchmark is unchanged in this measurement (its bottleneck is fan-out broadcasting, not stage-actor tell traffic). Tests: - sbt "stream / compile" "stream / mimaReportBinaryIssues" - sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.StageActorRefSpec" (11/11) - sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.ActorRefSinkSpec org.apache.pekko.stream.scaladsl.ActorRefSourceSpec org.apache.pekko.stream.scaladsl.ActorRefBackpressureSinkSpec org.apache.pekko.stream.scaladsl.ActorRefBackpressureSourceSpec" (42/42) - sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.QueueSinkSpec org.apache.pekko.stream.scaladsl.QueueSourceSpec org.apache.pekko.stream.scaladsl.HubSpec" (94/94) - sbt scalafmt headerCheck - sbt "bench-jmh / Jmh / run -i 10 -wi 5 -f 2 -r 2s -w 2s .*StageActorRefBenchmark.*" References: Refs https://github.com/akka/akka-core/issues/26857 (public issue only; clean-room implementation) --- .../pekko/stream/StageActorRefBenchmark.scala | 136 ++++++++++++ .../stream/scaladsl/StageActorRefSpec.scala | 114 +++++++++- stream/src/main/resources/reference.conf | 7 + .../pekko/stream/stage/GraphStage.scala | 198 +++++++++++++++--- 4 files changed, 427 insertions(+), 28 deletions(-) create mode 100644 bench-jmh/src/main/scala/org/apache/pekko/stream/StageActorRefBenchmark.scala diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/StageActorRefBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/StageActorRefBenchmark.scala new file mode 100644 index 00000000000..007253e275f --- /dev/null +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/StageActorRefBenchmark.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +import scala.concurrent.Await +import scala.concurrent.Promise +import scala.concurrent.duration._ + +import org.openjdk.jmh.annotations._ + +import org.apache.pekko +import pekko.actor.ActorRef +import pekko.actor.ActorSystem +import pekko.actor.NoSerializationVerificationNeeded +import pekko.stream.scaladsl.Keep +import pekko.stream.scaladsl.Sink +import pekko.stream.scaladsl.Source +import pekko.stream.stage.GraphStageLogic +import pekko.stream.stage.GraphStageWithMaterializedValue +import pekko.stream.stage.InHandler + +object StageActorRefBenchmark { + final val OperationsPerInvocation = 10000 + private case object CountDown extends NoSerializationVerificationNeeded + + private final class Control { + private val ready = new CountDownLatch(1) + @volatile private var ref: ActorRef = _ + @volatile private var latch: CountDownLatch = _ + + def init(ref: ActorRef): Unit = { + this.ref = ref + ready.countDown() + } + + def stageActorRef: ActorRef = { + if (!ready.await(10, TimeUnit.SECONDS)) + throw new RuntimeException("Stage actor ref was not initialized") + ref + } + + def reset(expectedMessages: Int): Unit = + latch = new CountDownLatch(expectedMessages) + + def countDown(): Unit = + latch.countDown() + + def awaitDone(): Unit = + if (!latch.await(10, TimeUnit.SECONDS)) + throw new RuntimeException("Stage actor ref benchmark messages timed out") + } + + private final class StageActorSink extends GraphStageWithMaterializedValue[SinkShape[Any], Control] { + val in: Inlet[Any] = Inlet("StageActorSink.in") + override val shape: SinkShape[Any] = SinkShape(in) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Control) = { + val control = new Control + + val logic = new GraphStageLogic(shape) { + override def preStart(): Unit = { + control.init(getStageActor { + case (_, CountDown) => control.countDown() + }.ref) + pull(in) + } + + setHandler( + in, + new InHandler { + override def onPush(): Unit = pull(in) + }) + } + + logic -> control + } + } +} + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.SECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class StageActorRefBenchmark { + import StageActorRefBenchmark._ + + implicit val system: ActorSystem = ActorSystem("StageActorRefBenchmark") + + private var completion: Promise[Option[Any]] = _ + private var control: Control = _ + private var stageActorRef: ActorRef = _ + + @Setup + def setup(): Unit = { + SystemMaterializer(system).materializer + val materialized = Source.maybe[Any].toMat(Sink.fromGraph(new StageActorSink))(Keep.both).run() + completion = materialized._1 + control = materialized._2 + stageActorRef = control.stageActorRef + } + + @TearDown + def shutdown(): Unit = { + completion.trySuccess(None) + Await.result(system.terminate(), 5.seconds) + } + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def lazy_stage_actor_ref_tell_10k(): Unit = { + control.reset(OperationsPerInvocation) + var remaining = OperationsPerInvocation + while (remaining > 0) { + stageActorRef ! CountDown + remaining -= 1 + } + control.awaitDone() + } +} diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala index c57068397de..bf364bd4e67 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala @@ -18,16 +18,19 @@ import scala.concurrent.Promise import scala.concurrent.duration._ import org.apache.pekko +import pekko.actor.ActorPath import pekko.actor.ActorRef import pekko.actor.Kill import pekko.actor.NoSerializationVerificationNeeded import pekko.actor.PoisonPill import pekko.event.Logging import pekko.stream._ +import pekko.stream.impl.fusing.GraphInterpreter import pekko.stream.stage.GraphStageLogic import pekko.stream.stage.GraphStageWithMaterializedValue import pekko.stream.stage.InHandler import pekko.stream.testkit.StreamSpec +import pekko.stream.testkit.scaladsl.TestSink import pekko.testkit.EventFilter import pekko.testkit.ImplicitSender import pekko.testkit.TestEvent @@ -181,6 +184,50 @@ class StageActorRefSpec extends StreamSpec with ImplicitSender { res.futureValue should ===(42) } + "run non-eager stage actor messages in the graph interpreter actor" in { + val (_, res) = Source.maybe[Int].toMat(sumStage(testActor))(Keep.both).run() + + val stageRef = expectMsgType[ActorRef] + stageRef ! AddAndTell(1) + expectMsg(1) + + stageRef ! ReportStageActorInterpreter + val location = expectMsgType[StageActorLocation] + + location.stageActorParent should ===(location.interpreter) + + stageRef ! StopNow + res.futureValue should ===(1) + } + + "keep eagerly materialized stage actors usable before stream demand" in { + val (ref, probe) = Source + .actorRef[Int]({ + case CompleteNow => CompletionStrategy.Immediately + }, PartialFunction.empty, bufferSize = 8, OverflowStrategy.fail) + .toMat(TestSink[Int]())(Keep.both) + .run() + + ref ! 1 + probe.request(1).expectNext(1) + ref ! CompleteNow + probe.expectComplete() + } + + "keep eagerly materialized stage actors attached to the stream supervisor" in { + val (source, res) = Source.maybe[Int].toMat(eagerLocationStage(testActor))(Keep.both).run() + + val stageRef = expectMsgType[ActorRef] + stageRef ! ReportEagerStageActorInterpreter + val location = expectMsgType[EagerStageActorLocation] + + location.stageActorParent should ===(location.supervisor) + location.stageActorParent should !==(location.interpreter) + + source.success(None) + res.futureValue should ===(0) + } + } } @@ -194,10 +241,19 @@ object StageActorRefSpec { case object BecomeStringEcho extends NoSerializationVerificationNeeded case object PullNow extends NoSerializationVerificationNeeded case object StopNow extends NoSerializationVerificationNeeded + case object ReportStageActorInterpreter extends NoSerializationVerificationNeeded + case object ReportEagerStageActorInterpreter extends NoSerializationVerificationNeeded + case object CompleteNow extends NoSerializationVerificationNeeded + final case class StageActorLocation(stageActorParent: ActorPath, interpreter: ActorPath) + extends NoSerializationVerificationNeeded + final case class EagerStageActorLocation(stageActorParent: ActorPath, supervisor: ActorPath, interpreter: ActorPath) + extends NoSerializationVerificationNeeded } import ControlProtocol._ + def eagerLocationStage(probe: ActorRef) = EagerLocationStage(probe) + case class SumTestStage(probe: ActorRef) extends GraphStageWithMaterializedValue[SinkShape[Int], Future[Int]] { val in = Inlet[Int]("IntSum.in") override val shape: SinkShape[Int] = SinkShape.of(in) @@ -216,10 +272,12 @@ object StageActorRefSpec { def behavior(m: (ActorRef, Any)): Unit = { m match { - case (_, Add(n)) => sum += n - case (_, PullNow) => pull(in) - case (sender, CallInitStageActorRef) => sender ! getStageActor(behavior).ref - case (_, BecomeStringEcho) => + case (_, Add(n)) => sum += n + case (_, PullNow) => pull(in) + case (sender, CallInitStageActorRef) => sender ! getStageActor(behavior).ref + case (sender, ReportStageActorInterpreter) => + sender ! StageActorLocation(stageActor.ref.path.parent, GraphInterpreter.currentInterpreter.context.path) + case (_, BecomeStringEcho) => getStageActor { case (theSender, msg) => theSender ! msg.toString } @@ -258,4 +316,52 @@ object StageActorRefSpec { } } + case class EagerLocationStage(probe: ActorRef) extends GraphStageWithMaterializedValue[SinkShape[Int], Future[Int]] { + val in = Inlet[Int]("EagerLocation.in") + override val shape: SinkShape[Int] = SinkShape.of(in) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = { + val p: Promise[Int] = Promise() + + val logic = new GraphStageLogic(shape) { + var stageRef: ActorRef = _ + var interpreterPath: ActorPath = _ + var supervisorPath: ActorPath = _ + + override def preStart(): Unit = { + interpreterPath = interpreter.context.path + supervisorPath = interpreter.materializer.supervisor.path + stageRef = getEagerStageActor(interpreter.materializer) { + case (sender, ReportEagerStageActorInterpreter) => + sender ! EagerStageActorLocation(stageRef.path.parent, supervisorPath, interpreterPath) + case _ => throw new RuntimeException("unexpected message") + }.ref + pull(in) + probe ! stageRef + } + + setHandler( + in, + new InHandler { + override def onPush(): Unit = { + p.trySuccess(grab(in)) + completeStage() + } + + override def onUpstreamFinish(): Unit = { + p.trySuccess(0) + completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + p.tryFailure(ex) + failStage(ex) + } + }) + } + + logic -> p.future + } + } + } diff --git a/stream/src/main/resources/reference.conf b/stream/src/main/resources/reference.conf index f4e4bd31934..059d28079ae 100644 --- a/stream/src/main/resources/reference.conf +++ b/stream/src/main/resources/reference.conf @@ -83,6 +83,13 @@ pekko { # Allows to accelerate message processing that happening within same actor but keep system responsive. sync-processing-limit = 1000 + # Upper bound on stage-actor messages drained per envelope for non-eager `getStageActor` refs. Lazy + # stage actors batch external `tell` deliveries into a MPSC queue and elect a single drain envelope; + # this cap bounds the burst so that other BoundaryEvents (pull/push/complete) can still interleave + # naturally via the actor mailbox. Smaller = better fairness for upstream/downstream events; + # larger = better tell throughput. Must be >= 1. + stage-actor-drain-batch = 16 + debug { # Enables the fuzzing mode which increases the chance of race conditions # by aggressively reordering events and making certain operations more diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala index 20cd0d36152..fc7774bbe96 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala @@ -15,7 +15,7 @@ package org.apache.pekko.stream.stage import java.util.Spliterator import java.util.concurrent.{ CompletionStage, ConcurrentHashMap } -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference } import scala.annotation.nowarn import scala.annotation.tailrec @@ -27,6 +27,7 @@ import org.apache.pekko import pekko.{ Done, NotUsed } import pekko.actor._ import pekko.annotation.InternalApi +import pekko.dispatch.AbstractNodeQueue import pekko.japi.function.{ Effect, Procedure } import pekko.stream._ import pekko.stream.Attributes.SourceLocation @@ -206,29 +207,61 @@ object GraphStageLogic { * * Not for user instantiation, use [[GraphStageLogic.getStageActor]]. */ - final class StageActor @InternalApi() private[pekko] ( + final class StageActor @InternalApi() private ( materializer: Materializer, - getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, Any)], initialReceive: StageActorRef.Receive, - name: String) { + name: String, + cell: ActorCell, + buildDispatch: StageActorRef.Receive => ((ActorRef, Any)) => Unit) { + + @InternalApi private[pekko] def this( + materializer: Materializer, + getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, Any)], + initialReceive: StageActorRef.Receive, + name: String) = + this( + materializer, + initialReceive, + name, + StageActor.localCell(materializer.supervisor, "Stream supervisor"), + receive => getAsyncCallback(receive).invoke) + + @InternalApi private[pekko] def this( + materializer: Materializer, + interpreter: GraphInterpreter, + logic: GraphStageLogic, + initialReceive: StageActorRef.Receive, + name: String) = + this( + materializer, + initialReceive, + name, + StageActor.localCell(interpreter.context, "Graph interpreter"), + // Coalesce per-tell mailbox traffic: N tells produce 1 AsyncInput envelope (amortized). + receive => + new StageActor.LazyDispatch( + interpreter, + logic, + receive.asInstanceOf[Any => Unit], + StageActor.drainBatchSize(materializer))) + + // Monomorphic Function1 captured once; JIT can inline the apply at the FunctionRef call site. + private val dispatch: ((ActorRef, Any)) => Unit = buildDispatch(internalReceive) - private val callback = getAsyncCallback(internalReceive) - - private def cell = materializer.supervisor match { - case ref: LocalActorRef => ref.underlying - case unknown => - throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]") - } private val functionRef: FunctionRef = { - val f: (ActorRef, Any) => Unit = { - case (_, m @ (PoisonPill | Kill)) => - materializer.logger.warning( - "{} message sent to StageActor({}) will be ignored, since it is not a real Actor. " + - "Use a custom message type to communicate with it instead.", - m, - functionRef.path) - case pair => callback.invoke(pair) - } + // Explicit (sender, msg) lambda (not a pattern-match Function2 literal) so the PoisonPill / Kill + // branch matches on `msg` directly and does not allocate a Tuple2. The regular branch still + // constructs one tuple per tell, as required by the `((ActorRef, Any)) => Unit` public Receive type. + val f: (ActorRef, Any) => Unit = (sender, msg) => + msg match { + case PoisonPill | Kill => + materializer.logger.warning( + "{} message sent to StageActor({}) will be ignored, since it is not a real Actor. " + + "Use a custom message type to communicate with it instead.", + msg, + functionRef.path) + case _ => dispatch((sender, msg)) + } cell.addFunctionRef(f, name) } @@ -275,6 +308,110 @@ object GraphStageLogic { type Receive = ((ActorRef, Any)) => Unit } + private object StageActor { + def localCell(ref: ActorRef, description: String): ActorCell = + ref match { + case ref: LocalActorRef => ref.underlying + case ref: RepointableActorRef => + ref.underlying match { + case cell: ActorCell => cell + case unknown => + throw new IllegalStateException(s"$description must be a local actor, was [${unknown.getClass.getName}]") + } + case unknown => + throw new IllegalStateException(s"$description must be a local actor, was [${unknown.getClass.getName}]") + } + + /** + * Reads `pekko.stream.materializer.stage-actor-drain-batch` from the materializer's ActorSystem config. + * Called once per lazy StageActor construction (never on the hot path). Bounded to `>= 1`. + */ + def drainBatchSize(materializer: Materializer): Int = + Math.max(1, materializer.system.settings.config.getInt("pekko.stream.materializer.stage-actor-drain-batch")) + + private final val SchedStateIdle: Int = 0 + private final val SchedStateScheduled: Int = 1 + + /** + * Lazy-path dispatch: producers enqueue into a Vyukov MPSC queue and elect a single drain via + * IDLE -> SCHEDULED CAS; only the elected producer pays a mailbox enqueue. The drain runs on the + * interpreter thread, polls in a tight loop bounded by `drainBatchSize`, then either publishes IDLE + * (with a recheck for the publish-window race) or re-schedules another envelope to yield to other + * BoundaryEvents. + * + * JIT/GC notes: + * - `final class` + monomorphic per-StageActor instance → JIT devirtualizes the apply at the + * FunctionRef call site. + * - Extends `AbstractNodeQueue` directly so the queue head atomic and the dispatch function share one + * object (one allocation per StageActor, one fewer field deref on the producer hot path). + * - All hot-path state is `private[this]` → direct field access, no accessor methods. + * - `drainBatchSize` is read once into a stack-local at the top of `drain` so the JIT can treat the loop + * bound as a constant. + * - Per-tell allocation = 1 Node (`AbstractNodeQueue.Node`, ~24 bytes) + 1 Tuple2 (~24 bytes). The + * Tuple2 is forced by the public `StageActorRef.Receive` type. No AsyncInput / Envelope per tell — + * those are amortized across the batch. + */ + // Not marked `private` so that `class StageActor`'s aux constructor (compiled outside of the companion + // object on Scala 3) can reference it; the enclosing `object StageActor` is itself private. + final class LazyDispatch( + interpreter: GraphInterpreter, + logic: GraphStageLogic, + handler: Any => Unit, + drainBatchSize: Int) + extends AbstractNodeQueue[(ActorRef, Any)] + with (((ActorRef, Any)) => Unit) { + + // IDLE/SCHEDULED election state. AtomicInteger gives us volatile read + CAS without the cross-Scala + // VarHandle / field-updater access fuss; the wrapper costs one extra reference per StageActor, which + // is negligible against the per-tell mailbox traffic we are saving. + private[this] val state = new AtomicInteger(SchedStateIdle) + + // Reused across all drain batches; allocated once at construction. + private[this] val drainCallback: Any => Unit = (_: Any) => drain() + + override def apply(pair: (ActorRef, Any)): Unit = { + add(pair) // Vyukov producer path: getAndSet + release-store, no CAS spin + // Double-checked CAS: uncontended fast path is one volatile read; only the IDLE->SCHEDULED winner + // pays a CAS + mailbox push. + if (state.get() == SchedStateIdle && state.compareAndSet(SchedStateIdle, SchedStateScheduled)) + scheduleDrain() + } + + private def scheduleDrain(): Unit = + // 1 AsyncInput + 1 Envelope per drain batch (amortized across up to drainBatchSize tells). + interpreter.onAsyncInput(logic, null, NoPromise, drainCallback) + + private def drain(): Unit = { + val limit = drainBatchSize // hoisted to a local so JIT treats it as a loop-invariant constant + var processed = 0 + while (processed < limit) { + if (interpreter.isStageCompleted(logic)) { + // Stage completed mid-drain; drop the remainder (matches the original per-tell behaviour where + // runAsyncInput silently skipped completed stages). Don't reschedule — no future drain will run. + while (poll() ne null) () + state.set(SchedStateIdle) + return + } + val item = poll() + if (item eq null) { + state.set(SchedStateIdle) + // Recheck race: a producer may have added between `poll == null` and the IDLE publish above. + // That producer saw state=SCHEDULED and skipped the mailbox send, so we must re-elect. + if (!isEmpty && state.compareAndSet(SchedStateIdle, SchedStateScheduled)) + scheduleDrain() + return + } + handler(item) + processed += 1 + } + // Hit batch cap with items potentially still queued. Re-schedule another envelope so other + // BoundaryEvents (pull/push/complete) can interleave via the actor mailbox. `scheduledState` stays + // SCHEDULED: concurrent producers correctly observe SCHEDULED and skip; the new envelope will drain. + scheduleDrain() + } + } + } + /** * Internal API * @@ -1339,8 +1476,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: /** * Initialize a [[GraphStageLogic.StageActorRef]] which can be used to interact with from the outside world "as-if" a [[pekko.actor.Actor]]. - * The messages are looped through the [[getAsyncCallback]] mechanism of [[GraphStage]] so they are safe to modify - * internal state of this operator. + * The messages are delivered through the owning stream interpreter so they are safe to modify internal state of this + * operator. * * This method must (the earliest) be called after the [[GraphStageLogic]] constructor has finished running, * for example from the [[preStart]] callback the graph operator logic provides. @@ -1358,7 +1495,20 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * @return minimal actor with watch method */ final protected def getStageActor(receive: ((ActorRef, Any)) => Unit): StageActor = - getEagerStageActor(interpreter.materializer)(receive) + _stageActor match { + case null => + val currentInterpreter = interpreter + _stageActor = new StageActor( + currentInterpreter.materializer, + currentInterpreter, + this, + receive, + stageActorName) + _stageActor + case existing => + existing.become(receive) + existing + } /** * INTERNAL API @@ -1382,7 +1532,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * Override and return a name to be given to the StageActor of this operator. * * This method will be only invoked and used once, during the first [[getStageActor]] - * invocation whichc reates the actor, since subsequent `getStageActors` calls function + * invocation which creates the actor, since subsequent `getStageActor` calls function * like `become`, rather than creating new actors. * * Returns an empty string by default, which means that the name will a unique generated String (e.g. "$$a"). From aeee30548473765005901240985fe29a3f1cb0f1 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Tue, 2 Jun 2026 14:53:28 +0800 Subject: [PATCH 2/7] fix: keep lazy stage actor refs under supervisor Motivation: PR #3035 moved lazy stage actor FunctionRefs under the graph interpreter actor. During ActorGraphInterpreter preStart the interpreter context can still be backed by an UnstartedCell, causing CI failures in TCP/TLS stream stages that create stage actors during preStart. Modification: Register lazy stage actor FunctionRefs under the stream supervisor again, while keeping the new LazyDispatch path that drains messages through the graph interpreter. Update StageActorRefSpec to assert that the FunctionRef parent remains the supervisor and that message handling still runs in the interpreter. Result: TCP/TLS stages can create stage actors during interpreter startup without hitting UnstartedCell, while lazy stage actor messages still execute on the graph interpreter thread. Tests: - scalafmt --mode diff-ref=e04e721ab67d3418621ba568601bea2c5483c475 --non-interactive - scalafmt --list --mode diff-ref=e04e721ab67d3418621ba568601bea2c5483c475 --non-interactive - sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.StageActorRefSpec" - sbt "stream-tests / Test / testOnly org.apache.pekko.stream.io.TlsGraphStageSpec" - git diff --check References: Refs #3035 --- .../pekko/stream/scaladsl/StageActorRefSpec.scala | 12 ++++++++---- .../org/apache/pekko/stream/stage/GraphStage.scala | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala index bf364bd4e67..3c7f2cd265e 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala @@ -184,7 +184,7 @@ class StageActorRefSpec extends StreamSpec with ImplicitSender { res.futureValue should ===(42) } - "run non-eager stage actor messages in the graph interpreter actor" in { + "run non-eager stage actor messages in the graph interpreter" in { val (_, res) = Source.maybe[Int].toMat(sumStage(testActor))(Keep.both).run() val stageRef = expectMsgType[ActorRef] @@ -194,7 +194,8 @@ class StageActorRefSpec extends StreamSpec with ImplicitSender { stageRef ! ReportStageActorInterpreter val location = expectMsgType[StageActorLocation] - location.stageActorParent should ===(location.interpreter) + location.stageActorParent should ===(location.supervisor) + location.interpreter should !==(location.supervisor) stageRef ! StopNow res.futureValue should ===(1) @@ -244,7 +245,7 @@ object StageActorRefSpec { case object ReportStageActorInterpreter extends NoSerializationVerificationNeeded case object ReportEagerStageActorInterpreter extends NoSerializationVerificationNeeded case object CompleteNow extends NoSerializationVerificationNeeded - final case class StageActorLocation(stageActorParent: ActorPath, interpreter: ActorPath) + final case class StageActorLocation(stageActorParent: ActorPath, supervisor: ActorPath, interpreter: ActorPath) extends NoSerializationVerificationNeeded final case class EagerStageActorLocation(stageActorParent: ActorPath, supervisor: ActorPath, interpreter: ActorPath) extends NoSerializationVerificationNeeded @@ -276,7 +277,10 @@ object StageActorRefSpec { case (_, PullNow) => pull(in) case (sender, CallInitStageActorRef) => sender ! getStageActor(behavior).ref case (sender, ReportStageActorInterpreter) => - sender ! StageActorLocation(stageActor.ref.path.parent, GraphInterpreter.currentInterpreter.context.path) + sender ! StageActorLocation( + stageActor.ref.path.parent, + interpreter.materializer.supervisor.path, + GraphInterpreter.currentInterpreter.context.path) case (_, BecomeStringEcho) => getStageActor { case (theSender, msg) => theSender ! msg.toString diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala index fc7774bbe96..6bd5b90abf2 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala @@ -236,7 +236,7 @@ object GraphStageLogic { materializer, initialReceive, name, - StageActor.localCell(interpreter.context, "Graph interpreter"), + StageActor.localCell(materializer.supervisor, "Stream supervisor"), // Coalesce per-tell mailbox traffic: N tells produce 1 AsyncInput envelope (amortized). receive => new StageActor.LazyDispatch( From 8bb84797823b237cc49732c45c6bfabd9c319b95 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Wed, 3 Jun 2026 02:25:02 +0800 Subject: [PATCH 3/7] fix: guard LazyDispatch producer path against post-completion enqueue Motivation: Once a lazy stage actor's underlying stage is completed, `GraphInterpreter.runAsyncInput` short-circuits the drain handler. Without a producer-side check the CAS-winning producer left `state=SCHEDULED` forever, subsequent producers skipped the mailbox push, and the MPSC queue could grow unbounded until the stage actor was GC'd. Modification: - `LazyDispatch.apply` pre-checks `interpreter.isStageCompleted(logic)` and drops the message, matching the original per-tell behaviour where `runAsyncInput` silently ignored post-completion sends. - After winning the IDLE -> SCHEDULED CAS, re-check completion before scheduling: if completion landed in between, reset state to IDLE rather than posting an envelope that the interpreter would skip. - Fix missing space in the PoisonPill / Kill warning so the concatenated message reads "...not a real Actor. Use ..." instead of "...Actor.Use ...". Result: - Post-completion sends are silently dropped and the MPSC queue cannot grow unbounded. - StageActorRefSpec, ActorRefSink/Source, ActorRefBackpressureSink/Source, QueueSink/ Source and HubSpec all pass (125/125). - `sbt stream/mimaReportBinaryIssues` clean. References: Refs #3035 (Copilot review comments). --- .../apache/pekko/stream/stage/GraphStage.scala | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala index 6bd5b90abf2..1e219f86a78 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala @@ -370,11 +370,24 @@ object GraphStageLogic { private[this] val drainCallback: Any => Unit = (_: Any) => drain() override def apply(pair: (ActorRef, Any)): Unit = { + // Producer-side completion guard. `runAsyncInput` short-circuits the drain handler when the stage + // is completed, so without this check a CAS-winning producer would leave state=SCHEDULED forever + // and subsequent producers would skip the mailbox push, growing the queue unbounded. Dropping + // post-completion sends here matches the original per-tell behaviour where `runAsyncInput` + // silently ignored them. The read is racy against the interpreter thread that flips + // `shutdownCounter`, but eventually visible — enough to bound the leak to messages enqueued + // before completion becomes visible to this thread. + if (interpreter.isStageCompleted(logic)) return add(pair) // Vyukov producer path: getAndSet + release-store, no CAS spin // Double-checked CAS: uncontended fast path is one volatile read; only the IDLE->SCHEDULED winner // pays a CAS + mailbox push. - if (state.get() == SchedStateIdle && state.compareAndSet(SchedStateIdle, SchedStateScheduled)) - scheduleDrain() + if (state.get() == SchedStateIdle && state.compareAndSet(SchedStateIdle, SchedStateScheduled)) { + // Re-check after winning election: if completion landed between the initial guard and the CAS, + // `scheduleDrain` would post an envelope that `runAsyncInput` skips, leaving state=SCHEDULED. + // Reset to IDLE and skip the schedule. + if (interpreter.isStageCompleted(logic)) state.set(SchedStateIdle) + else scheduleDrain() + } } private def scheduleDrain(): Unit = From 23501c1c876f0e5518a62918e84e0aaa2a3c2dc9 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Wed, 3 Jun 2026 11:09:55 +0800 Subject: [PATCH 4/7] fix: drop queue and skip drain re-schedule when stage completes mid-batch Motivation: After processing `drainBatchSize` items, `LazyDispatch.drain` unconditionally posted another drain envelope so other BoundaryEvents could interleave. If the last `handler(item)` call in the batch completed the stage (e.g. user code called `completeStage()`), `GraphInterpreter.runAsyncInput` would skip the freshly scheduled envelope, leaving `state=SCHEDULED` forever and retaining any already-enqueued items until the FunctionRef was stopped. Modification: Mirror the existing mid-loop completion branch at the post-batch tail: check `interpreter.isStageCompleted(logic)`, and if true, drain the remainder, publish IDLE and return without re-scheduling. Result: The MPSC queue is always drained (or cleared) before the dispatch goes idle, even when completion lands on the last handled item of a batch. StageActorRefSpec, stream/compile, scalafmt and stream/mimaReportBinaryIssues all clean. References: Refs #3035 (Copilot review on da487e3c74). --- .../apache/pekko/stream/stage/GraphStage.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala index 1e219f86a78..3ccdf570b9d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala @@ -417,9 +417,18 @@ object GraphStageLogic { handler(item) processed += 1 } - // Hit batch cap with items potentially still queued. Re-schedule another envelope so other - // BoundaryEvents (pull/push/complete) can interleave via the actor mailbox. `scheduledState` stays - // SCHEDULED: concurrent producers correctly observe SCHEDULED and skip; the new envelope will drain. + // Hit batch cap with items potentially still queued. The last `handler(item)` call may have + // completed the stage (e.g. user code called `completeStage()`); a fresh `scheduleDrain` would + // post an envelope that `runAsyncInput` skips, leaving state=SCHEDULED forever. Mirror the + // mid-loop branch: drain remainder, publish IDLE, do not reschedule. + if (interpreter.isStageCompleted(logic)) { + while (poll() ne null) () + state.set(SchedStateIdle) + return + } + // Re-schedule another envelope so other BoundaryEvents (pull/push/complete) can interleave via + // the actor mailbox. `state` stays SCHEDULED: concurrent producers observe SCHEDULED and skip; + // the new envelope will drain. scheduleDrain() } } From a2acf5dde063128749c944ffb4baa3f8e0ee73ca Mon Sep 17 00:00:00 2001 From: He-Pin Date: Wed, 3 Jun 2026 12:38:17 +0800 Subject: [PATCH 5/7] fix: replace AtomicInteger with VarHandle and remove drainCallback in LazyDispatch Motivation: Per-instance AtomicInteger wrapper costs ~16 bytes per StageActor. Separate drainCallback lambda is an unnecessary allocation. Modification: - Replace AtomicInteger state field with @volatile var Int driven by static VarHandle in companion object (MethodHandles.privateLookupIn), same pattern as AbstractNodeQueue._tailDoNotCallMeDirectly. - LazyDispatch now extends Any => Unit directly, serving as both producer callback and drain callback, eliminating the drainCallback lambda allocation. - Remove private[this] (deprecated in Scala 3). Result: Each LazyDispatch instance saves ~16 bytes (AtomicInteger) + one lambda allocation (drainCallback). VarHandle lives on the companion object (true JVM static), shared across all instances. Tests: - sbt "stream / compile" "stream / mimaReportBinaryIssues" - sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.StageActorRefSpec" (11/11) Refs: #3035 --- .../pekko/stream/stage/GraphStage.scala | 56 ++++++++++++------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala index 3ccdf570b9d..e2237270862 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala @@ -14,8 +14,9 @@ package org.apache.pekko.stream.stage import java.util.Spliterator +import java.lang.invoke.{ MethodHandles, VarHandle } import java.util.concurrent.{ CompletionStage, ConcurrentHashMap } -import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference } +import java.util.concurrent.atomic.AtomicReference import scala.annotation.nowarn import scala.annotation.tailrec @@ -344,7 +345,11 @@ object GraphStageLogic { * FunctionRef call site. * - Extends `AbstractNodeQueue` directly so the queue head atomic and the dispatch function share one * object (one allocation per StageActor, one fewer field deref on the producer hot path). - * - All hot-path state is `private[this]` → direct field access, no accessor methods. + * - Implements `Any => Unit` directly — serves as both the producer callback and the drain callback, + * eliminating the separate `drainCallback` lambda allocation. + * - `state` is a plain `@volatile var Int` driven by a static `VarHandle` in the companion object + * (via `MethodHandles.privateLookupIn`), same pattern as `AbstractNodeQueue` itself; + * avoids per-instance `AtomicInteger`. * - `drainBatchSize` is read once into a stack-local at the top of `drain` so the JIT can treat the loop * bound as a constant. * - Per-tell allocation = 1 Node (`AbstractNodeQueue.Node`, ~24 bytes) + 1 Tuple2 (~24 bytes). The @@ -359,17 +364,18 @@ object GraphStageLogic { handler: Any => Unit, drainBatchSize: Int) extends AbstractNodeQueue[(ActorRef, Any)] - with (((ActorRef, Any)) => Unit) { - - // IDLE/SCHEDULED election state. AtomicInteger gives us volatile read + CAS without the cross-Scala - // VarHandle / field-updater access fuss; the wrapper costs one extra reference per StageActor, which - // is negligible against the per-tell mailbox traffic we are saving. - private[this] val state = new AtomicInteger(SchedStateIdle) - - // Reused across all drain batches; allocated once at construction. - private[this] val drainCallback: Any => Unit = (_: Any) => drain() - - override def apply(pair: (ActorRef, Any)): Unit = { + with (Any => Unit) { + + // IDLE/SCHEDULED election state. VarHandle avoids per-instance AtomicInteger; + // the handle lives in the companion object (true JVM static), same pattern as + // AbstractNodeQueue._tailDoNotCallMeDirectly. + @volatile var state: Int = SchedStateIdle + + override def apply(msg: Any): Unit = { + // Drain path: onAsyncInput passes null as the message when scheduling a drain envelope. + if (msg.asInstanceOf[AnyRef] eq null) { drain(); return } + // Producer path: msg is an (ActorRef, Any) tuple from the FunctionRef. + val pair = msg.asInstanceOf[(ActorRef, Any)] // Producer-side completion guard. `runAsyncInput` short-circuits the drain handler when the stage // is completed, so without this check a CAS-winning producer would leave state=SCHEDULED forever // and subsequent producers would skip the mailbox push, growing the queue unbounded. Dropping @@ -381,20 +387,23 @@ object GraphStageLogic { add(pair) // Vyukov producer path: getAndSet + release-store, no CAS spin // Double-checked CAS: uncontended fast path is one volatile read; only the IDLE->SCHEDULED winner // pays a CAS + mailbox push. - if (state.get() == SchedStateIdle && state.compareAndSet(SchedStateIdle, SchedStateScheduled)) { + val u = LazyDispatch.stateHandle + if (u.get(this) == SchedStateIdle && u.compareAndSet(this, SchedStateIdle, SchedStateScheduled)) { // Re-check after winning election: if completion landed between the initial guard and the CAS, // `scheduleDrain` would post an envelope that `runAsyncInput` skips, leaving state=SCHEDULED. // Reset to IDLE and skip the schedule. - if (interpreter.isStageCompleted(logic)) state.set(SchedStateIdle) + if (interpreter.isStageCompleted(logic)) u.set(this, SchedStateIdle) else scheduleDrain() } } private def scheduleDrain(): Unit = // 1 AsyncInput + 1 Envelope per drain batch (amortized across up to drainBatchSize tells). - interpreter.onAsyncInput(logic, null, NoPromise, drainCallback) + // `this` serves as the drain callback (Any => Unit); onAsyncInput calls apply(null). + interpreter.onAsyncInput(logic, null, NoPromise, this) private def drain(): Unit = { + val u = LazyDispatch.stateHandle val limit = drainBatchSize // hoisted to a local so JIT treats it as a loop-invariant constant var processed = 0 while (processed < limit) { @@ -402,15 +411,15 @@ object GraphStageLogic { // Stage completed mid-drain; drop the remainder (matches the original per-tell behaviour where // runAsyncInput silently skipped completed stages). Don't reschedule — no future drain will run. while (poll() ne null) () - state.set(SchedStateIdle) + u.set(this, SchedStateIdle) return } val item = poll() if (item eq null) { - state.set(SchedStateIdle) + u.set(this, SchedStateIdle) // Recheck race: a producer may have added between `poll == null` and the IDLE publish above. // That producer saw state=SCHEDULED and skipped the mailbox send, so we must re-elect. - if (!isEmpty && state.compareAndSet(SchedStateIdle, SchedStateScheduled)) + if (!isEmpty && u.compareAndSet(this, SchedStateIdle, SchedStateScheduled)) scheduleDrain() return } @@ -423,7 +432,7 @@ object GraphStageLogic { // mid-loop branch: drain remainder, publish IDLE, do not reschedule. if (interpreter.isStageCompleted(logic)) { while (poll() ne null) () - state.set(SchedStateIdle) + u.set(this, SchedStateIdle) return } // Re-schedule another envelope so other BoundaryEvents (pull/push/complete) can interleave via @@ -432,6 +441,13 @@ object GraphStageLogic { scheduleDrain() } } + + object LazyDispatch { + private val stateHandle: VarHandle = { + val lookup = MethodHandles.privateLookupIn(classOf[LazyDispatch], MethodHandles.lookup()) + lookup.findVarHandle(classOf[LazyDispatch], "state", Integer.TYPE) + } + } } /** From c76b9d6641c6a95f1f458d319d761a2930514e16 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Wed, 3 Jun 2026 13:32:54 +0800 Subject: [PATCH 6/7] fix: type-witness VarHandle.get for Scala 3 cross-compile Motivation: The previous commit re-introduced VarHandle in LazyDispatch to avoid per-instance AtomicInteger overhead, but Scala 3's strict inference cannot pick the Int-returning signature-polymorphic overload of VarHandle.get without explicit return-type context, breaking the Binary Compatibility and Tests (3.3.x) jobs on PR #3035 with "Values of types Object and Int cannot be compared with == or !=" at GraphStage.scala:391. Modification: Read the state through a typed local (val cur: Int = u.get(this)) so the call site witnesses the Int-returning overload. Compiles on both Scala 2.13 and Scala 3.3. Keeps the VarHandle (no per-instance AtomicInteger) and the double-checked plain-read fast path under producer contention. Result: Scala 2.13 + Scala 3.3 compile clean, stream/mimaReportBinaryIssues passes, scalafmt unchanged, StageActorRefSpec 11/11 green. --- .../scala/org/apache/pekko/stream/stage/GraphStage.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala index e2237270862..b3a7fed4ef6 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala @@ -388,7 +388,12 @@ object GraphStageLogic { // Double-checked CAS: uncontended fast path is one volatile read; only the IDLE->SCHEDULED winner // pays a CAS + mailbox push. val u = LazyDispatch.stateHandle - if (u.get(this) == SchedStateIdle && u.compareAndSet(this, SchedStateIdle, SchedStateScheduled)) { + // Scala 3's strict inference cannot pick the Int-returning signature-polymorphic overload of + // `VarHandle.get` without explicit return-type context — a typed local witnesses it. On Scala 2 + // this is a no-op. Keep the double-checked plain read fast path: under producer contention + // only the IDLE→SCHEDULED winner pays a CAS, the rest just read. + val cur: Int = u.get(this) + if (cur == SchedStateIdle && u.compareAndSet(this, SchedStateIdle, SchedStateScheduled)) { // Re-check after winning election: if completion landed between the initial guard and the CAS, // `scheduleDrain` would post an envelope that `runAsyncInput` skips, leaving state=SCHEDULED. // Reset to IDLE and skip the schedule. From bec6527f7504d2d53d89ab9e1cd69f18c5c4f740 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sun, 7 Jun 2026 15:14:33 +0800 Subject: [PATCH 7/7] fix: encapsulate VarHandle access and clean up LazyDispatch style MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: PR review feedback from pjfanning: 1. `apply` method used `asInstanceOf[AnyRef] eq null`, semicolons on one line, and early `return` — non-idiomatic Scala style. 2. VarHandle accessed directly via companion `stateHandle` field from call sites — should be encapsulated behind methods on LazyDispatch. Modification: - Add private `getState()/setState()/casState()` methods on LazyDispatch that encapsulate all VarHandle access. The typed local in `getState()` witnesses the Int-returning signature-polymorphic overload for Scala 3. - Restructure `apply` to use `if/else` instead of early `return`, `msg == null` instead of `msg.asInstanceOf[AnyRef] eq null`, and no semicolons. - Replace all `val u = LazyDispatch.stateHandle; u.xxx(this, ...)` patterns in `drain()` with the new accessor methods. Result: VarHandle is retained (saves ~16 bytes/instance vs AtomicInteger) but fully encapsulated. Code style aligned with project conventions. Compilation clean on Scala 2.13. References: Refs #3035 (pjfanning review comments). --- .../pekko/stream/stage/GraphStage.scala | 73 +++++++------------ 1 file changed, 28 insertions(+), 45 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala index b3a7fed4ef6..ee406b85416 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala @@ -371,36 +371,29 @@ object GraphStageLogic { // AbstractNodeQueue._tailDoNotCallMeDirectly. @volatile var state: Int = SchedStateIdle - override def apply(msg: Any): Unit = { - // Drain path: onAsyncInput passes null as the message when scheduling a drain envelope. - if (msg.asInstanceOf[AnyRef] eq null) { drain(); return } - // Producer path: msg is an (ActorRef, Any) tuple from the FunctionRef. - val pair = msg.asInstanceOf[(ActorRef, Any)] - // Producer-side completion guard. `runAsyncInput` short-circuits the drain handler when the stage - // is completed, so without this check a CAS-winning producer would leave state=SCHEDULED forever - // and subsequent producers would skip the mailbox push, growing the queue unbounded. Dropping - // post-completion sends here matches the original per-tell behaviour where `runAsyncInput` - // silently ignored them. The read is racy against the interpreter thread that flips - // `shutdownCounter`, but eventually visible — enough to bound the leak to messages enqueued - // before completion becomes visible to this thread. - if (interpreter.isStageCompleted(logic)) return - add(pair) // Vyukov producer path: getAndSet + release-store, no CAS spin - // Double-checked CAS: uncontended fast path is one volatile read; only the IDLE->SCHEDULED winner - // pays a CAS + mailbox push. - val u = LazyDispatch.stateHandle - // Scala 3's strict inference cannot pick the Int-returning signature-polymorphic overload of - // `VarHandle.get` without explicit return-type context — a typed local witnesses it. On Scala 2 - // this is a no-op. Keep the double-checked plain read fast path: under producer contention - // only the IDLE→SCHEDULED winner pays a CAS, the rest just read. - val cur: Int = u.get(this) - if (cur == SchedStateIdle && u.compareAndSet(this, SchedStateIdle, SchedStateScheduled)) { - // Re-check after winning election: if completion landed between the initial guard and the CAS, - // `scheduleDrain` would post an envelope that `runAsyncInput` skips, leaving state=SCHEDULED. - // Reset to IDLE and skip the schedule. - if (interpreter.isStageCompleted(logic)) u.set(this, SchedStateIdle) - else scheduleDrain() - } + // Typed local witnesses the Int-returning signature-polymorphic VarHandle.get overload + // (required for Scala 3 cross-compile; on Scala 2 this is a no-op). + private def getState(): Int = { + val v: Int = LazyDispatch.stateHandle.get(this) + v } + private def setState(v: Int): Unit = LazyDispatch.stateHandle.set(this, v) + private def casState(expect: Int, update: Int): Boolean = + LazyDispatch.stateHandle.compareAndSet(this, expect, update) + + // null msg = drain signal from onAsyncInput; non-null = (ActorRef, Any) tuple from FunctionRef. + override def apply(msg: Any): Unit = + if (msg == null) drain() + else { + val pair = msg.asInstanceOf[(ActorRef, Any)] + if (!interpreter.isStageCompleted(logic)) { + add(pair) + if (getState() == SchedStateIdle && casState(SchedStateIdle, SchedStateScheduled)) { + if (interpreter.isStageCompleted(logic)) setState(SchedStateIdle) + else scheduleDrain() + } + } + } private def scheduleDrain(): Unit = // 1 AsyncInput + 1 Envelope per drain batch (amortized across up to drainBatchSize tells). @@ -408,41 +401,31 @@ object GraphStageLogic { interpreter.onAsyncInput(logic, null, NoPromise, this) private def drain(): Unit = { - val u = LazyDispatch.stateHandle - val limit = drainBatchSize // hoisted to a local so JIT treats it as a loop-invariant constant + val limit = drainBatchSize var processed = 0 while (processed < limit) { if (interpreter.isStageCompleted(logic)) { - // Stage completed mid-drain; drop the remainder (matches the original per-tell behaviour where - // runAsyncInput silently skipped completed stages). Don't reschedule — no future drain will run. while (poll() ne null) () - u.set(this, SchedStateIdle) + setState(SchedStateIdle) return } val item = poll() if (item eq null) { - u.set(this, SchedStateIdle) + setState(SchedStateIdle) // Recheck race: a producer may have added between `poll == null` and the IDLE publish above. - // That producer saw state=SCHEDULED and skipped the mailbox send, so we must re-elect. - if (!isEmpty && u.compareAndSet(this, SchedStateIdle, SchedStateScheduled)) + if (!isEmpty && casState(SchedStateIdle, SchedStateScheduled)) scheduleDrain() return } handler(item) processed += 1 } - // Hit batch cap with items potentially still queued. The last `handler(item)` call may have - // completed the stage (e.g. user code called `completeStage()`); a fresh `scheduleDrain` would - // post an envelope that `runAsyncInput` skips, leaving state=SCHEDULED forever. Mirror the - // mid-loop branch: drain remainder, publish IDLE, do not reschedule. + // The last handler(item) may have completed the stage; check before re-scheduling. if (interpreter.isStageCompleted(logic)) { while (poll() ne null) () - u.set(this, SchedStateIdle) + setState(SchedStateIdle) return } - // Re-schedule another envelope so other BoundaryEvents (pull/push/complete) can interleave via - // the actor mailbox. `state` stays SCHEDULED: concurrent producers observe SCHEDULED and skip; - // the new envelope will drain. scheduleDrain() } }