Optimize lazy stage actor dispatch #3035
Pull request overview
This PR optimizes the lazy GraphStageLogic.getStageActor message delivery path by coalescing external tells into an MPSC queue and draining them in bounded batches on the stream interpreter thread, reducing per-message mailbox/envelope overhead under high producer rates.
Changes:
- Add a lazy-path `StageActor` dispatch (`LazyDispatch`) that batches external messages and schedules drains via a single elected producer.
- Introduce `pekko.stream.materializer.stage-actor-drain-batch` (default `16`) to cap per-drain processing for fairness.
- Add/adjust tests to validate lazy vs eager stage-actor attachment semantics; add a JMH benchmark for the lazy tell path.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala | Implements lazy stage-actor MPSC dispatch/drain coalescing; updates getStageActor implementation and related docs. |
| stream/src/main/resources/reference.conf | Adds the stage-actor-drain-batch materializer setting and documentation. |
| stream/src/main/java/org/apache/pekko/stream/stage/AbstractMpscDispatch.java | Adds a small internal Java helper combining AbstractNodeQueue with a VarHandle-driven election state. |
| stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala | Extends tests to assert lazy stage actors attach to the interpreter actor and eager ones to the supervisor; adds a usability regression test for eager stage actors. |
| bench-jmh/src/main/scala/org/apache/pekko/stream/StageActorRefBenchmark.scala | Adds a JMH benchmark for lazy getStageActor tell throughput. |
Motivation:
PR apache#3035 moved lazy stage actor FunctionRefs under the graph interpreter actor. During ActorGraphInterpreter preStart the interpreter context can still be backed by an UnstartedCell, causing CI failures in TCP/TLS stream stages that create stage actors during preStart.
Modification:
Register lazy stage actor FunctionRefs under the stream supervisor again, while keeping the new LazyDispatch path that drains messages through the graph interpreter. Update StageActorRefSpec to assert that the FunctionRef parent remains the supervisor and that message handling still runs in the interpreter.
Result:
TCP/TLS stages can create stage actors during interpreter startup without hitting UnstartedCell, while lazy stage actor messages still execute on the graph interpreter thread.
Tests:
- scalafmt --mode diff-ref=e04e721ab67d3418621ba568601bea2c5483c475 --non-interactive
- scalafmt --list --mode diff-ref=e04e721ab67d3418621ba568601bea2c5483c475 --non-interactive
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.StageActorRefSpec"
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.io.TlsGraphStageSpec"
- git diff --check
References:
Refs apache#3035
Motivation:
Once a lazy stage actor's underlying stage is completed, `GraphInterpreter.runAsyncInput` short-circuits the drain handler. Without a producer-side check the CAS-winning producer left `state=SCHEDULED` forever, subsequent producers skipped the mailbox push, and the MPSC queue could grow unbounded until the stage actor was GC'd.
Modification:
- `LazyDispatch.apply` pre-checks `interpreter.isStageCompleted(logic)` and drops the message, matching the original per-tell behaviour where `runAsyncInput` silently ignored post-completion sends.
- After winning the IDLE -> SCHEDULED CAS, re-check completion before scheduling: if completion landed in between, reset state to IDLE rather than posting an envelope that the interpreter would skip.
- Fix missing space in the PoisonPill / Kill warning so the concatenated message reads "...not a real Actor. Use ..." instead of "...Actor.Use ...".
Result:
- Post-completion sends are silently dropped and the MPSC queue cannot grow unbounded.
- StageActorRefSpec, ActorRefSink/Source, ActorRefBackpressureSink/Source, QueueSink/Source and HubSpec all pass (125/125).
- `sbt stream/mimaReportBinaryIssues` clean.
References:
Refs apache#3035 (Copilot review comments).
…atch
Motivation:
After processing `drainBatchSize` items, `LazyDispatch.drain` unconditionally posted another drain envelope so other BoundaryEvents could interleave. If the last `handler(item)` call in the batch completed the stage (e.g. user code called `completeStage()`), `GraphInterpreter.runAsyncInput` would skip the freshly scheduled envelope, leaving `state=SCHEDULED` forever and retaining any already-enqueued items until the FunctionRef was stopped.
Modification:
Mirror the existing mid-loop completion branch at the post-batch tail: check `interpreter.isStageCompleted(logic)`, and if true, drain the remainder, publish IDLE and return without re-scheduling.
Result:
The MPSC queue is always drained (or cleared) before the dispatch goes idle, even when completion lands on the last handled item of a batch. StageActorRefSpec, stream/compile, scalafmt and stream/mimaReportBinaryIssues all clean.
References:
Refs apache#3035 (Copilot review on da487e3).
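The completion handling described in the two commit messages above can be modelled in a few lines. This is a minimal Java sketch, not the Pekko code: `CompletionAwareDispatch`, its `Executor` mailbox, the `stageCompleted` supplier and the `pending()`/`isIdle()` helpers are all hypothetical stand-ins, and a `ConcurrentLinkedQueue` replaces the Vyukov queue. The posted task skips the drain once the stage is completed, which is how the sketch imitates `runAsyncInput` ignoring a completed stage.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical model of the completion checks; names are illustrative only.
final class CompletionAwareDispatch<T> {
  private static final int IDLE = 0;
  private static final int SCHEDULED = 1;

  private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
  private final AtomicInteger state = new AtomicInteger(IDLE);
  private final Executor mailbox;               // stands in for the interpreter mailbox
  private final BooleanSupplier stageCompleted; // stands in for isStageCompleted(logic)
  private final Consumer<T> handler;
  private final int batchSize;

  CompletionAwareDispatch(Executor mailbox, BooleanSupplier stageCompleted,
                          Consumer<T> handler, int batchSize) {
    this.mailbox = mailbox;
    this.stageCompleted = stageCompleted;
    this.handler = handler;
    this.batchSize = batchSize;
  }

  // Producer side: callable from any thread.
  void tell(T msg) {
    if (stageCompleted.getAsBoolean()) return; // drop post-completion sends
    queue.add(msg);
    if (state.get() == IDLE && state.compareAndSet(IDLE, SCHEDULED)) {
      if (stageCompleted.getAsBoolean()) {
        state.set(IDLE); // completion raced the CAS: do not post a task that would be skipped
      } else {
        schedule();
      }
    }
  }

  private void schedule() {
    // The task is skipped for a completed stage, like the interpreter's short-circuit.
    mailbox.execute(() -> { if (!stageCompleted.getAsBoolean()) drain(); });
  }

  // Consumer side: at most one drain runs at a time.
  private void drain() {
    for (int i = 0; i < batchSize; i++) {
      if (stageCompleted.getAsBoolean()) { discardAndIdle(); return; } // mid-loop branch
      T item = queue.poll();
      if (item == null) {
        state.set(IDLE);
        // Recheck: a producer may have enqueued while the state was still SCHEDULED.
        if (!queue.isEmpty() && state.compareAndSet(IDLE, SCHEDULED)) schedule();
        return;
      }
      handler.accept(item);
    }
    // Post-batch tail: re-schedule only if the last handled item did not complete the stage.
    if (stageCompleted.getAsBoolean()) discardAndIdle(); else schedule();
  }

  private void discardAndIdle() {
    queue.clear();
    state.set(IDLE);
  }

  int pending() { return queue.size(); }
  boolean isIdle() { return state.get() == IDLE; }
}
```

With a batch size of 2 and a handler that completes the stage on the second item, the tail check clears the remaining items and leaves the dispatch idle instead of stuck in `SCHEDULED`.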
… LazyDispatch
Motivation:
Per-instance AtomicInteger wrapper costs ~16 bytes per StageActor. Separate drainCallback lambda is an unnecessary allocation.
Modification:
- Replace AtomicInteger state field with @volatile var Int driven by static VarHandle in companion object (MethodHandles.privateLookupIn), same pattern as AbstractNodeQueue._tailDoNotCallMeDirectly.
- LazyDispatch now extends Any => Unit directly, serving as both producer callback and drain callback, eliminating the drainCallback lambda allocation.
- Remove private[this] (deprecated in Scala 3).
Result:
Each LazyDispatch instance saves ~16 bytes (AtomicInteger) + one lambda allocation (drainCallback). VarHandle lives on the companion object (true JVM static), shared across all instances.
Tests:
- sbt "stream / compile" "stream / mimaReportBinaryIssues"
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.StageActorRefSpec" (11/11)
Refs: apache#3035
Motivation:
The squashed PR #3035 fails Binary Compatibility and Tests (3.3.x) on Scala 3 for two reasons:
(1) `schedStateHandle.getAcquire(this).asInstanceOf[Int]` cannot be resolved: Scala 3's strict inference cannot pick the Int-returning signature-polymorphic overload from a chained `.asInstanceOf` call site, defaulting the return to Object and rejecting the primitive comparison;
(2) `private final class LazyDispatch` is unreachable from `class StageActor`'s aux constructor on Scala 3, which compiles the aux ctor outside the companion object.
Modification:
Read state through a typed local (`val cur: Int = schedStateHandle.getAcquire(this)`) so the call site witnesses the Int-returning overload of the polymorphic-signature method. Drop `private` from `class LazyDispatch` (the enclosing `object StageActor` is itself private, so the class stays internally scoped). Preserves the VarHandle: no per-instance AtomicInteger, no extra wrapper allocation.
Result:
Scala 2.13 + Scala 3.3 compile clean, stream / mimaReportBinaryIssues passes, scalafmt clean, StageActorRefSpec 11/11 green.
Motivation:
The previous commit re-introduced VarHandle in LazyDispatch to avoid per-instance AtomicInteger overhead, but Scala 3's strict inference cannot pick the Int-returning signature-polymorphic overload of VarHandle.get without explicit return-type context, breaking the Binary Compatibility and Tests (3.3.x) jobs on PR apache#3035 with "Values of types Object and Int cannot be compared with == or !=" at GraphStage.scala:391.
Modification:
Read the state through a typed local (val cur: Int = u.get(this)) so the call site witnesses the Int-returning overload. Compiles on both Scala 2.13 and Scala 3.3. Keeps the VarHandle (no per-instance AtomicInteger) and the double-checked plain-read fast path under producer contention.
Result:
Scala 2.13 + Scala 3.3 compile clean, stream/mimaReportBinaryIssues passes, scalafmt unchanged, StageActorRefSpec 11/11 green.
Motivation:
Lazy `getStageActor` refs paid one actor mailbox enqueue per external tell:
sender -> FunctionRef -> ConcurrentAsyncCallback.invokeWithPromise -> interpreter
self ! AsyncInput. Under high tell rate to a single stage actor the bottleneck
is mailbox traffic (envelope alloc, cross-thread wakeup, dequeue), not the
dispatch lambda. Each tell also allocated a Tuple2, an AsyncInput, and a
mailbox Envelope.
Modification:
Lazy `getStageActor` now installs an MPSC dispatch (`LazyDispatch`) that:
- enqueues (sender, msg) into a Vyukov MPSC queue (`AbstractNodeQueue`)
- elects a single drain via IDLE -> SCHEDULED CAS; only the elected
producer pays a mailbox enqueue
- drains on the interpreter thread in a tight loop bounded by
`stage-actor-drain-batch` (default 16), then either publishes IDLE
(with the canonical recheck race fix) or re-schedules another envelope
so other BoundaryEvents interleave naturally via the actor mailbox
- preserves `isStageCompleted` semantics: items added after completion are
dropped exactly as the old per-tell path silently skipped them.
The eager construction path (used before stream demand) is unchanged and
still routes through the materializer supervisor + `AsyncCallback`.
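The enqueue / elect / bounded-drain cycle listed above can be sketched as follows. This is a simplified Java model under stated assumptions, not the `LazyDispatch` source: `CoalescingDispatch`, the `Executor` standing in for the interpreter mailbox and the `wakeups` counter are hypothetical, and `ConcurrentLinkedQueue` is used in place of `AbstractNodeQueue`.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model: many producers, one consumer, one wakeup per burst of tells.
final class CoalescingDispatch<T> {
  private static final int IDLE = 0;
  private static final int SCHEDULED = 1;

  private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
  private final AtomicInteger state = new AtomicInteger(IDLE);
  private final Executor consumer;   // stands in for the interpreter mailbox
  private final Consumer<T> handler; // stands in for the stage actor's receive
  private final int batchSize;       // plays the role of stage-actor-drain-batch
  final AtomicInteger wakeups = new AtomicInteger(); // counts posted drain tasks

  CoalescingDispatch(Executor consumer, Consumer<T> handler, int batchSize) {
    this.consumer = consumer;
    this.handler = handler;
    this.batchSize = batchSize;
  }

  // Producer side: enqueue, then try to become the single elected scheduler.
  void tell(T msg) {
    queue.add(msg);
    // Plain read first so contended producers usually skip the CAS.
    if (state.get() == IDLE && state.compareAndSet(IDLE, SCHEDULED)) schedule();
  }

  private void schedule() {
    wakeups.incrementAndGet();
    consumer.execute(this::drain);
  }

  // Consumer side: handle at most batchSize items per posted task.
  private void drain() {
    for (int i = 0; i < batchSize; i++) {
      T item = queue.poll();
      if (item == null) {
        state.set(IDLE);
        // Recheck: a producer may have enqueued after the poll while still seeing SCHEDULED.
        if (!queue.isEmpty() && state.compareAndSet(IDLE, SCHEDULED)) schedule();
        return;
      }
      handler.accept(item);
    }
    // Batch cap reached: stay SCHEDULED and continue in a fresh task so other work can interleave.
    schedule();
  }
}
```

Driving this model with 40 tells queued before the first drain and a batch size of 16 posts three drain tasks in total, where a per-tell path would post 40.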
JIT/GC notes:
- `LazyDispatch` is a `final class` and extends `AbstractNodeQueue`
directly so it is its own queue (one fewer allocation and field deref).
- `scheduledState` is a plain `@volatile var Int` driven by a static
`VarHandle` (created via `MethodHandles.privateLookupIn`), avoiding the
per-instance `AtomicBoolean` wrapper. Same pattern as
`AbstractNodeQueue` itself.
- The dispatch `apply` is monomorphic per StageActor instance; the drain
callback is allocated once and reused. The FunctionRef lambda is
rewritten as `(sender, msg) =>` to skip the Tuple2 allocation on the
PoisonPill / Kill warning path.
- Per-tell allocation is now 1 Node + 1 Tuple2 (the Tuple2 is forced by
the public `StageActorRef.Receive` type); AsyncInput and Envelope are
amortized across the batch.
Configuration:
`pekko.stream.materializer.stage-actor-drain-batch` (default 16) bounds the
per-envelope drain. The default aligns with `InputBuffer.max` and keeps the
per-actor-wakeup work in the same order of magnitude as the dispatcher
throughput; smaller values trade tell throughput for tighter interleaving
with upstream/downstream events, larger values do the opposite.
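As a rough worked example of that trade-off, the number of drain envelopes for a burst can be estimated. The Java helper below is hypothetical (`DrainBatchEstimate` is not part of the PR) and assumes that every message is already queued when the first drain starts and that a full batch always posts one more envelope.

```java
// Hypothetical estimate of mailbox envelopes needed to drain a queued burst.
final class DrainBatchEstimate {
  // Assumes queued >= 1, all messages queued up front, and a re-schedule after every full batch.
  static long drainEnvelopes(long queued, int batch) {
    return queued / batch + 1;
  }
}
```

Under those assumptions a burst of 10,000 tells needs 626 envelopes with a cap of 16 and 1,251 with a cap of 8, against 10,000 on a one-envelope-per-tell path.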
Binary compatibility:
The original 4-arg `private[pekko] StageActor` constructor
(`materializer, getAsyncCallback, initialReceive, name`) is preserved as
an auxiliary constructor and continues to use the eager
`AsyncCallback` path. A new 5-arg `private[pekko]` constructor
(`materializer, interpreter, logic, initialReceive, name`) is added for the
lazy path. `sbt stream/mimaReportBinaryIssues` passes clean.
Result:
`StageActorRefBenchmark.lazy_stage_actor_ref_tell_10k` (JMH 2 forks x 10
iter x 2s, macOS) - throughput is now bounded by Vyukov enqueue + drain
loop rather than per-tell mailbox traffic:
| Variant | Throughput (ops/s) | vs main |
|----------------------------------|----------------------|---------|
| main | 6,587,561 +- 616,243 | 1.00x |
| MPSC + drain coalescing (cap=16) | 13,044,829 +- 1,525K | 1.98x |
| MPSC + drain coalescing (cap=8) | 13,589,612 +- 2,114K | 2.06x |
BroadcastHubBenchmark is unchanged in this measurement (its bottleneck is
fan-out broadcasting, not stage-actor tell traffic).
Tests:
- sbt "stream / compile" "stream / mimaReportBinaryIssues"
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.StageActorRefSpec"
(11/11)
- sbt "stream-tests / Test / testOnly
org.apache.pekko.stream.scaladsl.ActorRefSinkSpec
org.apache.pekko.stream.scaladsl.ActorRefSourceSpec
org.apache.pekko.stream.scaladsl.ActorRefBackpressureSinkSpec
org.apache.pekko.stream.scaladsl.ActorRefBackpressureSourceSpec" (42/42)
- sbt "stream-tests / Test / testOnly
org.apache.pekko.stream.scaladsl.QueueSinkSpec
org.apache.pekko.stream.scaladsl.QueueSourceSpec
org.apache.pekko.stream.scaladsl.HubSpec" (94/94)
- sbt scalafmt headerCheck
- sbt "bench-jmh / Jmh / run -i 10 -wi 5 -f 2 -r 2s -w 2s
.*StageActorRefBenchmark.*"
References:
Refs akka/akka-core#26857 (public issue only;
clean-room implementation)
    override def apply(msg: Any): Unit = {
      // Drain path: onAsyncInput passes null as the message when scheduling a drain envelope.
      if (msg.asInstanceOf[AnyRef] eq null) { drain(); return }
Why do you need to cast before checking `eq null`? Can you also avoid `;` and just use new lines? Can you avoid `return` here by using an `else` on the `if`?
    object LazyDispatch {
      private val stateHandle: VarHandle = {
        val lookup = MethodHandles.privateLookupIn(classOf[LazyDispatch], MethodHandles.lookup())
This is our own code, so why do we need to use VarHandles here? Can't we just change LazyDispatch to expose methods that work with the `state` field?
Motivation:
PR review feedback from pjfanning:
1. The `apply` method used `asInstanceOf[AnyRef] eq null`, semicolons on one line, and an early `return`, which is non-idiomatic Scala style.
2. The VarHandle was accessed directly via the companion `stateHandle` field from call sites; it should be encapsulated behind methods on LazyDispatch.
Modification:
- Add private `getState()/setState()/casState()` methods on LazyDispatch that encapsulate all VarHandle access. The typed local in `getState()` witnesses the Int-returning signature-polymorphic overload for Scala 3.
- Restructure `apply` to use `if/else` instead of early `return`, `msg == null` instead of `msg.asInstanceOf[AnyRef] eq null`, and no semicolons.
- Replace all `val u = LazyDispatch.stateHandle; u.xxx(this, ...)` patterns in `drain()` with the new accessor methods.
Result:
VarHandle is retained (saves ~16 bytes/instance vs AtomicInteger) but fully encapsulated. Code style aligned with project conventions. Compilation clean on Scala 2.13.
References:
Refs apache#3035 (pjfanning review comments).
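The encapsulation described in that commit can be illustrated in Java, where the same JDK `VarHandle` API applies. This is a hedged sketch only: `StateCell` is a hypothetical class, and the accessor names simply mirror the `getState()/setState()/casState()` names from the commit message. In Java the `(int)` cast at the call site plays the role that the typed local plays in the Scala code, since it fixes the return type of the signature-polymorphic call.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical holder of a scheduling state; all VarHandle use stays behind three methods.
final class StateCell {
  static final int IDLE = 0;
  static final int SCHEDULED = 1;

  // One static handle shared by every instance, instead of an AtomicInteger per instance.
  private static final VarHandle STATE;

  static {
    try {
      STATE = MethodHandles
          .privateLookupIn(StateCell.class, MethodHandles.lookup())
          .findVarHandle(StateCell.class, "state", int.class);
    } catch (ReflectiveOperationException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  @SuppressWarnings("unused") // read and written only through STATE
  private volatile int state = IDLE;

  int getState() {
    // The cast selects the int-returning shape of the polymorphic call.
    return (int) STATE.getAcquire(this);
  }

  void setState(int value) {
    STATE.setRelease(this, value);
  }

  boolean casState(int expected, int updated) {
    return STATE.compareAndSet(this, expected, updated);
  }
}
```

Callers then write `cell.casState(StateCell.IDLE, StateCell.SCHEDULED)` and never touch the handle directly, which is the shape the review comment asked for.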
Motivation
Refs akka/akka-core#26857.
Lazy `getStageActor` refs previously paid one actor mailbox enqueue per external `tell`: each tell allocated a `Tuple2`, a `ConcurrentAsyncCallback` indirection, an `AsyncInput`, and a mailbox `Envelope`. Under a high tell rate to a single stage actor the bottleneck is mailbox traffic and per-tell allocation, not the dispatch lambda.
Modification
Lazy `getStageActor` installs an MPSC dispatch (`LazyDispatch`) that:
- enqueues `(sender, msg)` into a Vyukov MPSC queue (`AbstractNodeQueue`)
- elects a single drain via an IDLE -> SCHEDULED CAS; only the elected producer pays a mailbox enqueue
- drains on the interpreter thread in a tight loop bounded by `stage-actor-drain-batch` (default `16`), then either publishes IDLE with the canonical recheck race fix, or re-schedules another envelope so other `BoundaryEvent`s (pull/push/complete/subscribe) interleave naturally via the actor mailbox
- preserves `isStageCompleted` semantics: items added after completion are dropped exactly as the old per-tell path silently skipped them
The eager construction path (used before stream demand) is unchanged and still routes through the materializer supervisor + `AsyncCallback`.
JIT / GC notes
- `LazyDispatch` is a `final class` extending `AbstractNodeQueue` directly: it is its own queue (one fewer allocation and field deref).
- State is held in an `AtomicInteger` field. An earlier revision used a `@volatile var Int` driven by a static `VarHandle` (via an `AbstractMpscDispatch.java` helper), but that pattern hits cross-Scala polymorphic-signature inference quirks for `VarHandle.getAcquire` between Scala 2 and Scala 3, so the helper was dropped in favour of the simpler `AtomicInteger`. The extra reference per `StageActor` is negligible against the per-tell mailbox traffic the coalescing path already saves.
- The dispatch `apply` is monomorphic per `StageActor` instance; the drain callback is allocated once and reused.
- The `FunctionRef` lambda is `(sender, msg) =>` (explicit, not a pattern-match `Function2` literal) so the `PoisonPill` / `Kill` warning path skips the `Tuple2` allocation.
- Per-tell allocation is now 1 Node + 1 `Tuple2` (the `Tuple2` is forced by the public `StageActorRef.Receive` type); the `AsyncInput` and `Envelope` are amortized across the batch.
Configuration
`pekko.stream.materializer.stage-actor-drain-batch` (default `16`) bounds the per-envelope drain; the default aligns with `InputBuffer.max`.
Concurrency / correctness
- […] (`getAndSet` + release store, no CAS spin), lock-free single-consumer drain.
- `interpreter.isStageCompleted(logic)` is checked per drain iteration: completion mid-batch drops the remainder, matching the old per-tell skip-if-completed semantics.
- `Terminated` arrives via the same `FunctionRef` path and therefore the same MPSC queue, preserving ordering.
Binary compatibility
The original 4-arg `private[pekko] StageActor` constructor (`materializer, getAsyncCallback, initialReceive, name`) is preserved as an auxiliary constructor and continues to use the eager `AsyncCallback` path. A new 5-arg `private[pekko]` constructor (`materializer, interpreter, logic, initialReceive, name`) is added for the lazy path. `sbt stream/mimaReportBinaryIssues` ✅ passes clean.
Benchmark
`StageActorRefBenchmark.lazy_stage_actor_ref_tell_10k` (JMH 2 forks × 10 iter × 2s, macOS):
[Benchmark results table; baseline row `main`]
`BroadcastHubBenchmark` is unchanged: its bottleneck is fan-out broadcasting, not stage-actor tell traffic. The benefit materializes when many producers tell a single stage actor at a high rate (e.g. MergeHub fan-in, TCP IO byte streams, custom GraphStages that use `getStageActor` as an external input port).
Command:
Tests
- `scalafmt` + `headerCheck`
- `sbt "stream / compile" "stream / mimaReportBinaryIssues"` ✅
- `sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.StageActorRefSpec"`: 11/11
- `sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.ActorRefSinkSpec org.apache.pekko.stream.scaladsl.ActorRefSourceSpec org.apache.pekko.stream.scaladsl.ActorRefBackpressureSinkSpec org.apache.pekko.stream.scaladsl.ActorRefBackpressureSourceSpec"`: 42/42
- `sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.QueueSinkSpec org.apache.pekko.stream.scaladsl.QueueSourceSpec org.apache.pekko.stream.scaladsl.HubSpec"`: 94/94
Clean-room note
The implementation was developed against the Pekko codebase and the public issue statement only. No upstream code, tests, comments, or diff content were copied.
References
Refs akka/akka-core#26857