optimize: skip afterStageHasRun no-op finalize check in GraphInterpreter chase hot path #2986
Merged
He-Pin merged 3 commits intoMay 21, 2026
Motivation:
The previous shape `new GraphInterpreterSpecKit { new TestSetup { ... } }` ran
inside @Benchmark, so each invocation built (and never tore down) a fresh
ActorSystem. Long iterations exhausted native threads and JMH reported empty
results once the JVM ran out of resources.
Modification:
Make the benchmark class itself extend GraphInterpreterSpecKit so JMH's
@State(Scope.Benchmark) lifecycle reuses one ActorSystem across all
invocations. Add @TearDown(Level.Trial) to terminate it cleanly.
Result:
The benchmark now runs to completion and produces stable numbers, which is a
prerequisite for measuring follow-up GraphInterpreter optimizations.
Tests:
sbt 'bench-jmh/compile'
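The lifecycle difference described above can be illustrated with a dependency-free model. This is only a sketch, not the benchmark's code: `LiveSystems`, `FakeSystem`, `LeakyBench`, `SharedBench` and `LifecycleDemo` are hypothetical names, with `FakeSystem` standing in for an ActorSystem and the comments marking where JMH's @State(Scope.Benchmark) and @TearDown(Level.Trial) would apply.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter of systems that were created but not yet terminated.
object LiveSystems { val count = new AtomicInteger(0) }

// Hypothetical stand-in for an ActorSystem (the real one also owns threads).
final class FakeSystem {
  LiveSystems.count.incrementAndGet()
  def terminate(): Unit = { LiveSystems.count.decrementAndGet(); () }
}

// Old shape: each invocation builds a system and nobody terminates it.
object LeakyBench {
  def invocation(): FakeSystem = new FakeSystem
}

// New shape: the benchmark state owns one system for the whole trial.
final class SharedBench {
  private val system = new FakeSystem        // built once, as with @State(Scope.Benchmark)
  def invocation(): Unit = ()                // a real benchmark would reuse `system` here
  def tearDown(): Unit = system.terminate()  // as with @TearDown(Level.Trial)
}

object LifecycleDemo {
  // Returns (systems leaked by 100 old-style invocations,
  //          systems alive during 100 new-style invocations,
  //          systems left over after teardown).
  def run(): (Int, Int, Int) = {
    val start = LiveSystems.count.get()
    (1 to 100).foreach(_ => LeakyBench.invocation())
    val leaked = LiveSystems.count.get() - start

    val base = LiveSystems.count.get()
    val bench = new SharedBench
    (1 to 100).foreach(_ => bench.invocation())
    val alive = LiveSystems.count.get() - base
    bench.tearDown()
    (leaked, alive, LiveSystems.count.get() - base)
  }
}
```

In this model `LifecycleDemo.run()` returns `(100, 1, 0)`: the old shape leaves one live system per invocation, while the shared state keeps exactly one alive and releases it at teardown.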
Motivation:
GraphStages.identity is a singleton whose Inlet/Outlet shape is shared across
every reference. Chaining N copies into the assembly (numberOfIds = 5/10)
collapses to a single shape and mis-wires the connections, which surfaced as
a runtime "Cannot pull port twice" error spam during the benchmark and
produced nonsense throughput numbers (5/10 stages reported as faster than 1).
Modification:
Define a local IdentityStage class with its own Inlet/Outlet per instance and
use Vector.fill(numberOfIds)(new IdentityStage[Int]).
Result:
The benchmark wires N distinct stages and produces stable, monotonic numbers
(throughput decreases as numberOfIds grows, as expected).
Tests:
sbt 'bench-jmh/compile'
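The shape-sharing problem can be reproduced in a small model that does not depend on Pekko. It is a sketch under the assumption that ports are told apart by object identity; `Port`, `Stage` and `ShapeSharingDemo` are hypothetical names, and `singleton` merely plays the role of a shared stage such as GraphStages.identity.

```scala
// Simplified model, not Pekko's API: ports compare by reference.
final class Port(val name: String)

// Each Stage instance allocates its own pair of ports.
final class Stage {
  val in = new Port("in")
  val out = new Port("out")
}

object ShapeSharingDemo {
  // Stand-in for a singleton stage whose ports are shared by every reference.
  val singleton = new Stage

  // Number of distinct port objects that a chain of stages exposes.
  def distinctPorts(stages: Vector[Stage]): Int =
    stages.flatMap(s => Vector(s.in, s.out)).distinct.size

  // Five references to the same stage: still only one in/out pair to wire.
  val shared: Vector[Stage] = Vector.fill(5)(singleton)

  // Vector.fill takes its element by name, so `new Stage` runs five times.
  val perInstance: Vector[Stage] = Vector.fill(5)(new Stage)
}
```

Here `distinctPorts(shared)` is 2 while `distinctPorts(perInstance)` is 10, which is why wiring five "copies" of a singleton cannot produce five independent connections.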
Motivation:
GraphInterpreter's chase loops dominate hot-path CPU in steady state: JMH
stack profiling on InterpreterBenchmark attributes ~50% of stream-related
samples to the two while loops at execute:449 / execute:460. Every chase
iteration calls afterStageHasRun(activeStage), which in steady state always
reads shutdownCounter(activeStage.stageId) and the per-stage finalized flag
only to discover the stage has not just completed and skip the body. That is
a per-event array load + null check + branch on the hottest path with no work
to do, which the JIT cannot fold away because the array is mutable shared
state.
Modification:
Track pendingFinalization: Boolean on the interpreter, set when a stage's
shutdownCounter decrements to 0 in completeConnection or transitions to 0
when KeepGoing is cleared in setKeepGoing. Gate the three hot-path
afterStageHasRun calls in execute() (post normal-dispatch and the two chase
loops) on the flag, resetting it before the call so cascaded completions
during finalization re-arm the flag correctly. The slow-frequency callers
(init, runAsyncInput) are left untouched.
Result:
JMH on InterpreterBenchmark (JDK 25, G1, single thread, -i 5 -wi 3 -f 1 -t 1):
  numberOfIds  baseline (ops/ms)  with patch (ops/ms)  delta
  1            45238 ± 3143       50952 ± 4784         +12.6%
  5            10526 ± 151        11242 ± 288          +6.8%  (CIs disjoint)
  10           5350 ± 193         5927 ± 173           +10.8% (CIs disjoint)
Allocation rate stays at ~0.6 B/op (no GC impact). All stream-tests pass.
Tests:
- sbt 'stream/compile'
- sbt 'stream/mimaReportBinaryIssues' - clean
- sbt 'stream-tests/testOnly *fusing*' - 159 tests, all passed
- sbt 'stream-tests/testOnly *Flow*Spec' - 1208 tests, all passed
- sbt 'bench-jmh/Jmh/run -i 5 -wi 3 -f 1 -t 1 .*InterpreterBenchmark.*' - numbers above
References:
Refs apache#2985 - benchmark fix used to obtain trustworthy JMH numbers above.
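The gating scheme in the Modification section can be sketched with a heavily simplified model. This is not Pekko's GraphInterpreter: `MiniInterpreter`, `runEvent`, `onFinalize`, `checks` and `GatingDemo` are hypothetical, stages are plain integer ids, and KeepGoing handling is left out. The sketch only shows the flag being armed on the transition to 0 and being reset before the finalize check, so that a completion triggered during finalization re-arms it.

```scala
// Simplified model: openPorts(i) is the number of open connections of stage i;
// onFinalize lets the finalization of one stage complete a connection of another.
final class MiniInterpreter(openPorts: Array[Int], onFinalize: (MiniInterpreter, Int) => Unit) {
  private val shutdownCounter = openPorts.clone()
  private val finalized = new Array[Boolean](openPorts.length)
  private var pendingFinalization = false
  var checks = 0 // how often the finalize check actually ran

  def isFinalized(stageId: Int): Boolean = finalized(stageId)

  // Arms the flag only when the counter moves from 1 to 0.
  def completeConnection(stageId: Int): Unit = {
    val active = shutdownCounter(stageId)
    if (active > 0) {
      shutdownCounter(stageId) = active - 1
      if (active == 1) pendingFinalization = true
    }
  }

  private def afterStageHasRun(stageId: Int): Unit = {
    checks += 1
    if (shutdownCounter(stageId) == 0 && !finalized(stageId)) {
      finalized(stageId) = true
      onFinalize(this, stageId) // may call completeConnection and re-arm the flag
    }
  }

  // One hot-path step: run the handler, then check for finalization only if armed.
  def runEvent(stageId: Int)(handler: MiniInterpreter => Unit): Unit = {
    handler(this)
    if (pendingFinalization) {
      pendingFinalization = false // reset first, so a cascade can set it again
      afterStageHasRun(stageId)
    }
  }
}

object GatingDemo {
  // Returns (checks after 1000 steady-state events, checks at the end,
  //          stage 0 finalized, stage 1 finalized).
  def run(): (Int, Int, Boolean, Boolean) = {
    // Finalizing stage 0 closes the last open connection of stage 1.
    val interp = new MiniInterpreter(Array(1, 1), (i, id) => if (id == 0) i.completeConnection(1))
    (1 to 1000).foreach(_ => interp.runEvent(0)(_ => ()))
    val steadyChecks = interp.checks
    interp.runEvent(0)(_.completeConnection(0)) // stage 0 completes, cascade re-arms the flag
    interp.runEvent(1)(_ => ())                 // gate fires again and finalizes stage 1
    (steadyChecks, interp.checks, interp.isFinalized(0), interp.isFinalized(1))
  }
}
```

In this model `GatingDemo.run()` returns `(0, 2, true, true)`: the 1000 steady-state events never enter the finalize check, and the two completions are each observed by the next gated call. Resetting the flag after the call instead would discard the cascade and leave stage 1 unfinalized.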
Pull request overview
This PR reduces GraphInterpreter.execute() hot-path overhead by avoiding an unconditional afterStageHasRun(activeStage) check on every dispatch/chase iteration, while also fixing InterpreterBenchmark to provide reliable JMH measurements for interpreter changes.
Changes:
- Add a `pendingFinalization` flag that is set when a stage’s `shutdownCounter` transitions to `0`, and use it to gate `afterStageHasRun` in the main dispatch path and both chase loops.
- Update shutdown bookkeeping (`completeConnection`, `setKeepGoing(false)`) to arm `pendingFinalization` when a stage becomes eligible for finalization.
- Fix `InterpreterBenchmark` lifecycle (reuse a single `ActorSystem` per trial + proper teardown) and replace `GraphStages.identity` usage with a per-instance identity stage to avoid shape reuse/miswiring.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala | Introduces pendingFinalization and gates afterStageHasRun in execute/chase hot paths; arms the flag when stages newly complete. |
| bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterBenchmark.scala | Makes the benchmark state reuse one ActorSystem, adds teardown, and uses a per-instance identity GraphStage to avoid singleton-shape wiring issues. |
Motivation
`GraphInterpreter`'s chase loops dominate hot-path CPU in steady state. JMH stack profiling on `InterpreterBenchmark` (`numberOfIds=10`) attributes ~50% of stream-related samples to the two `while` loops at `execute:449` and `execute:460`, after deep JIT inlining of `processPush` / `onPush` / `push` / `grab` / `chasePush`.
Every chase iteration ends with `afterStageHasRun(activeStage)`, which in steady state always reads `shutdownCounter(activeStage.stageId)` and the per-stage finalized flag, only to discover the stage has not just completed and skip the body. That is a per-event array load + null check + branch on the hottest path with no work to do, which the JIT cannot fold away because `shutdownCounter` is a mutable shared array.
Modification
- Track `pendingFinalization: Boolean` on the interpreter, set when a stage's `shutdownCounter` decrements to 0 in `completeConnection`, or transitions to 0 when `KeepGoing` is cleared in `setKeepGoing`.
- Gate the three hot-path `afterStageHasRun(activeStage)` call sites in `execute()` (post normal-dispatch and the two chase loops) on the flag, resetting it to `false` before invoking `afterStageHasRun` so cascaded completions during finalization re-arm the flag correctly.
- The `afterStageHasRun` callers in `init()` and `runAsyncInput` are intentionally left untouched: they run once per stage / per async event and are not on the hot path.
The semantic invariant is preserved: any path that decrements `shutdownCounter` to 0 sets the flag, so any state where `isStageCompleted(activeStage)` could newly return true is guaranteed to be observed by the next gated call.
Result
JMH on `InterpreterBenchmark` (JDK 25, G1, single thread, `-i 5 -wi 3 -f 1 -t 1`):
| numberOfIds | baseline (ops/ms) | with patch (ops/ms) | delta |
|---|---|---|---|
| 1 | 45238 ± 3143 | 50952 ± 4784 | +12.6% |
| 5 | 10526 ± 151 | 11242 ± 288 | +6.8% |
| 10 | 5350 ± 193 | 5927 ± 173 | +10.8% |
`numberOfIds=5` and `=10` show non-overlapping 99.9% confidence intervals vs the same-tree baseline, so the gain is not noise. Allocation rate stays at ~0.6 B/op (0 GC counts in the measurement window), so there is no GC impact.
Tests
- `sbt 'stream/compile'`
- `sbt 'stream/mimaReportBinaryIssues'`: clean
- `sbt 'stream-tests/testOnly *fusing*'`: 159 tests, all passed (covers `GraphInterpreterSpec`, `GraphInterpreterPortsSpec`, completion / cancel / fail paths)
- `sbt 'stream-tests/testOnly *Flow*Spec'`: 1208 tests, all passed
- `sbt 'bench-jmh/Jmh/run -i 5 -wi 3 -f 1 -t 1 .*InterpreterBenchmark.*'`: numbers above
References
Refs #2985: relies on the `InterpreterBenchmark` correctness fix in that PR to obtain trustworthy JMH numbers. This branch contains the two commits from #2985 plus the optimization commit; if #2985 lands first, this branch will be rebased to drop those.
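The interval claim in the Result section can be checked by hand from the figures reported in the optimization commit message. The sketch below treats each "mean ± error" value as a symmetric interval, which is an assumption about how the numbers were reported; `Measurement` and `ResultCheck` are hypothetical helper names.

```scala
// Mean and half-width as printed in the results ("mean ± halfWidth"), in ops/ms.
final case class Measurement(mean: Double, halfWidth: Double) {
  def low: Double = mean - halfWidth
  def high: Double = mean + halfWidth
  // Two intervals are disjoint when one ends before the other starts.
  def disjointFrom(other: Measurement): Boolean = high < other.low || other.high < low
}

object ResultCheck {
  // (numberOfIds, baseline, with patch), copied from the reported results.
  val rows: Vector[(Int, Measurement, Measurement)] = Vector(
    (1, Measurement(45238, 3143), Measurement(50952, 4784)),
    (5, Measurement(10526, 151), Measurement(11242, 288)),
    (10, Measurement(5350, 193), Measurement(5927, 173))
  )

  // Relative change of the means in percent, rounded to one decimal place.
  def deltaPercent(base: Measurement, patched: Measurement): Double =
    math.round((patched.mean - base.mean) / base.mean * 1000) / 10.0

  // (numberOfIds, delta in percent, intervals disjoint?)
  def summary: Vector[(Int, Double, Boolean)] =
    rows.map { case (n, base, patched) => (n, deltaPercent(base, patched), base.disjointFrom(patched)) }
}
```

`ResultCheck.summary` evaluates to `Vector((1, 12.6, false), (5, 6.8, true), (10, 10.8, true))`. The deltas match the reported percentages, and only the `numberOfIds=1` intervals overlap (42095 to 48381 against 46168 to 55736), consistent with the disjointness claim being made for 5 and 10 only.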