From 9d939ef3922e7d8537702bbfdb1b474e595884a7 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sun, 19 Apr 2026 07:11:18 +0800 Subject: [PATCH] chore(stream): remove internal FanOut Unzip Motivation: Public Unzip is already GraphStage-based, while stream.impl.FanOut still carried an unused internal actor-backed Unzip implementation. Keeping that dead code makes issue #2860 harder to reason about and still leaves binary-visible legacy symbols behind on the 2.0.x line. Modification: Remove the internal impl.Unzip object/class from FanOut.scala and add a 2.0.x MiMa exclude file for the deleted binary-visible symbols. Result: The dead internal Unzip cleanup now stands alone as a small, reviewable slice with unchanged public Unzip behavior and green compile/MiMa/spec validation. References: apache/pekko#2860 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../remove-internal-unzip.excludes | 20 +++++++++++ .../org/apache/pekko/stream/impl/FanOut.scala | 34 ------------------- 2 files changed, 20 insertions(+), 34 deletions(-) create mode 100644 stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-internal-unzip.excludes diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-internal-unzip.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-internal-unzip.excludes new file mode 100644 index 00000000000..c3d8231dece --- /dev/null +++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-internal-unzip.excludes @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Remove internal legacy Unzip implementation +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.Unzip") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.Unzip$") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/FanOut.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/FanOut.scala index f3265f76ad1..c8b2a12e481 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/FanOut.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FanOut.scala @@ -313,37 +313,3 @@ import org.reactivestreams.Subscription def receive = primaryInputs.subreceive.orElse[Any, Unit](outputBunch.subreceive) } - -/** - * INTERNAL API - */ -@InternalApi private[pekko] object Unzip { - def props(attributes: Attributes): Props = - Props(new Unzip(attributes)).withDeploy(Deploy.local) -} - -/** - * INTERNAL API - */ -@InternalApi private[pekko] class Unzip(attributes: Attributes) extends FanOut(attributes, outputCount = 2) { - outputBunch.markAllOutputs() - - initialPhase( - 1, - TransferPhase(primaryInputs.NeedsInput && outputBunch.AllOfMarkedOutputs) { () => - primaryInputs.dequeueInputElement() match { - case (a, b) => - outputBunch.enqueue(0, a) - outputBunch.enqueue(1, b) - - case t: pekko.japi.Pair[_, _] => - outputBunch.enqueue(0, t.first) - outputBunch.enqueue(1, t.second) - - case t => - throw new IllegalArgumentException( - s"Unable to unzip elements of type ${t.getClass.getName}, " + - s"can only handle Tuple2 and org.apache.pekko.japi.Pair!") - } - }) -}