feat(stream): replace fanout publisher runtime with GraphStage bridge #2874
Motivation: Sink.asPublisher(fanout = true) still depended on the legacy actor-backed FanoutProcessorImpl runtime, which kept issue apache#2860 blocked on old processor infrastructure and implementation-bound tests.

Modification: Route FanoutPublisherSink through a new FanoutPublisherBridgeStage, delete the legacy FanoutProcessor implementation, replace the old actor-bound spec with FanoutPublisherBehaviorSpec, and add the matching 2.0.x MiMa excludes for the removed binary-visible classes.

Result: The fanout publisher path now runs on a dedicated GraphStage bridge with the existing terminal-signal contract preserved, broader behavior coverage added, and compile/MiMa/TCK validation passing.

References: apache#2860

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Motivation: GitHub's scalafmt diff-ref job flagged FanoutPublisherBridgeStage.scala on PR apache#2874.

Modification: Applied scalafmt formatting to FanoutPublisherBridgeStage.scala without changing behavior.

Result: The fanout bridge file now matches the repository's CI formatting expectations.

References: apache#2860 apache#2874

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Pull request overview
This PR migrates the runtime behind Sink.asPublisher(fanout = true) from the legacy actor-based fanout processor to a GraphStage-based bridge (FanoutPublisherBridgeStage), while keeping the existing fanout publisher behavior contract intact and updating tests and MiMa filters accordingly.
Changes:
- Rewires `FanoutPublisherSink` materialization to use a GraphStage bridge instead of `FanoutProcessorImpl`/`ActorPublisher`.
- Introduces `FanoutPublisherBridgeStage` (GraphStage + RS `Publisher`) implementing the fanout subscription/buffering/timeout behavior.
- Replaces the implementation-bound actor spec with a black-box behavior spec and adds MiMa exclusions for removed internal classes.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala | Switches FanoutPublisherSink.create to materialize a Source.asSubscriber → FanoutPublisherBridgeStage bridge. |
| stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala | Adds the new GraphStage-based fanout publisher bridge implementation. |
| stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala | Removes the legacy actor-backed fanout processor implementation. |
| stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-processor.excludes | Adds MiMa filters for the removed internal fanout processor classes. |
| stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala | Adds black-box behavior tests for Sink.asPublisher(fanout = true) (including timeout + multi-subscriber behavior). |
| stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala | Removes the actor-implementation-specific spec. |
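
For orientation, the public API whose runtime this PR swaps can be exercised roughly as below. This is a minimal usage sketch rather than code from the PR. It assumes pekko-stream is on the classpath, and the object name `FanoutPublisherUsage` and the system name are illustrative. Because both subscriptions are established asynchronously, the sketch only prints whatever each subscriber ends up observing and does not assume a particular outcome for either one.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.reactivestreams.Publisher

import scala.concurrent.Await
import scala.concurrent.duration._

// Illustrative object name; not part of the PR.
object FanoutPublisherUsage {
  def main(args: Array[String]): Unit = {
    // The implicit ActorSystem supplies the materializer.
    implicit val system: ActorSystem = ActorSystem("fanout-demo")

    // fanout = true yields a Publisher that accepts more than one subscriber.
    val publisher: Publisher[Int] =
      Source(1 to 5).runWith(Sink.asPublisher[Int](fanout = true))

    // Each materialization of Source.fromPublisher subscribes once.
    val a = Source.fromPublisher(publisher).runWith(Sink.seq)
    val b = Source.fromPublisher(publisher).runWith(Sink.seq)

    // Wait for both to terminate (successfully or not) and show the outcome.
    Await.ready(a, 3.seconds)
    Await.ready(b, 3.seconds)
    println(a.value)
    println(b.value)

    system.terminate()
  }
}
```

A subscriber that attaches late may see only the remaining elements or just the terminal signal, which is why the multi-subscriber and timeout cases are covered by a dedicated behavior spec.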
```scala
override def subscribe(subscriber: Subscriber[_ >: T]): Unit = {
  requireNonNullSubscriber(subscriber)

  @tailrec def doSubscribe(): Unit = {
```
Long-term safety risk: `FanoutPublisherBridgePublisher` holds a reference to `registerPendingSubscribers: AsyncCallback[Unit]`, which internally holds a reference to the `GraphStageLogic`. If the `Publisher` is retained by user code after the stage terminates, this creates a reference chain that prevents GC of the entire stage logic and all its state (buffer, subscriptions, callbacks). Consider using a weak reference pattern or clearing the callback reference in `shutdown()` to break this chain for long-running applications.
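
The "clear the callback reference in `shutdown()`" option could look roughly like the sketch below. It is a dependency-free model and not the bridge's code: `Wakeup` and `PublisherHandle` are hypothetical stand-ins (the real publisher holds a Pekko `AsyncCallback[Unit]`), and the only point shown is that the publisher reaches the stage through a single reference that can be nulled out.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for the stage-side callback; the real bridge uses
// Pekko's AsyncCallback[Unit], which is not modelled here.
trait Wakeup { def invoke(): Unit }

// Hypothetical publisher-side handle: it reaches the stage only through
// an AtomicReference that shutdown() clears.
final class PublisherHandle(initial: Wakeup) {
  private val wakeup = new AtomicReference[Wakeup](initial)

  /** Returns true if the stage was notified, false once it has shut down. */
  def notifyStage(): Boolean =
    Option(wakeup.get()) match {
      case Some(cb) => cb.invoke(); true
      case None     => false
    }

  /** Drops the only strong reference from the publisher to the stage. */
  def shutdown(): Unit = wakeup.set(null)
}

object ReferenceClearingDemo {
  def main(args: Array[String]): Unit = {
    var invocations = 0
    val handle = new PublisherHandle(new Wakeup { def invoke(): Unit = invocations += 1 })

    println(handle.notifyStage()) // true
    handle.shutdown()
    println(handle.notifyStage()) // false
    println(invocations)          // 1
  }
}
```

After `shutdown()`, a retained `PublisherHandle` no longer keeps the callback (and whatever it captures) reachable.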
would this be possible to add?
I think there is no need to fix this.
Motivation: FanoutPublisherBridgeStage.postStop always completed the exposed publisher with ActorPublisher.NormalShutdownReason, even when the stage was stopped abruptly without an upstream or subscriber terminal signal. That made active and late subscribers observe a normal shutdown instead of the GraphStage abrupt termination failure.

Modification: Track whether the bridge is stopping from an intentional terminal path. Only synthesize AbruptStageTerminationException from postStop when no terminal path was already signalled, and add a bridge-level regression test for active and late subscribers.

Result: Controlled completion, failure, timeout, and last-subscriber cancellation keep their existing reasons, while abrupt bridge shutdown is now reported as AbruptStageTerminationException.

References: #2874
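
The decision described in this commit can be modelled without any Pekko dependency, as in the sketch below. It is an illustration under assumptions, not the stage's implementation: `BridgeShutdownModel`, `ShutdownReason`, and its cases are hypothetical names, and the timeout and last-subscriber cancellation paths are left out for brevity. It shows only the rule that `postStop` reports an abrupt termination when no intentional terminal path was recorded first.

```scala
// Hypothetical, dependency-free model of the shutdown decision; the names
// below are illustrative and do not mirror the actual stage internals.
sealed trait ShutdownReason
case object Completed extends ShutdownReason
final case class Failed(cause: Throwable) extends ShutdownReason
case object AbruptTermination extends ShutdownReason

final class BridgeShutdownModel {
  // Set by any intentional terminal path before the stage stops.
  private var terminal: Option[ShutdownReason] = None

  def onUpstreamFinish(): Unit = signal(Completed)
  def onUpstreamFailure(ex: Throwable): Unit = signal(Failed(ex))

  // The first terminal signal wins; later ones are ignored.
  private def signal(reason: ShutdownReason): Unit =
    if (terminal.isEmpty) terminal = Some(reason)

  /** What subscribers observe once the stage has stopped. */
  def postStop(): ShutdownReason = terminal.getOrElse(AbruptTermination)
}

object BridgeShutdownDemo {
  def main(args: Array[String]): Unit = {
    val controlled = new BridgeShutdownModel
    controlled.onUpstreamFinish()
    println(controlled.postStop()) // Completed

    val abrupt = new BridgeShutdownModel
    println(abrupt.postStop()) // AbruptTermination
  }
}
```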
Summary
This is the second #2860 slice.
It replaces the actor-backed runtime behind `Sink.asPublisher(fanout = true)` with `FanoutPublisherBridgeStage`, removes the legacy `FanoutProcessor.scala`, replaces the old implementation-bound spec with `FanoutPublisherBehaviorSpec`, and adds the matching `2.0.x` MiMa exclusions.

The runtime stays on a dedicated internal fanout bridge and preserves the existing terminal-signal contract already exercised by `FlowSpec`.

Motivation

- Keep `Sink.asPublisher(fanout = true)` behavior-compatible while moving the runtime to GraphStage-based stream infrastructure

Modification

- Rewire `FanoutPublisherSink.create` in `stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala`
- Add `stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala`
- Delete `stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala`
- Replace `stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala` with `stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutPublisherBehaviorSpec.scala`
- Add `stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-processor.excludes`
- Format `FanoutPublisherBridgeStage.scala` to satisfy the GitHub `Code is formatted` check

Result

- `Sink.asPublisher(fanout = true)` no longer depends on `FanoutProcessorImpl`

Validation

- `sbt scalafmtAll`
- `sbt "stream/test:compile"`
- `sbt "stream/mimaReportBinaryIssues"`
- `sbt "stream-tests/testOnly org.apache.pekko.stream.impl.FanoutPublisherBehaviorSpec org.apache.pekko.stream.impl.TimeoutsSpec org.apache.pekko.stream.scaladsl.FlowSpec org.apache.pekko.stream.scaladsl.SinkSpec"`
- `sbt "stream-tests/testOnly org.apache.pekko.stream.javadsl.SinkTest"`
- `sbt "stream-tests-tck/testOnly org.apache.pekko.stream.tck.FanoutPublisherTest"`
- `sbt "scalafmtOnly stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala"`

Upstream / references