diff --git a/akka-docs/src/main/paradox/stream/stages-overview.md b/akka-docs/src/main/paradox/stream/stages-overview.md index f798ce4e7c4..b2fd70d7d83 100644 --- a/akka-docs/src/main/paradox/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/stream/stages-overview.md @@ -1089,6 +1089,23 @@ Each upstream element will either be diverted to the given sink, or the downstre --------------------------------------------------------------- +### wireTap + +Attaches the given `Sink` to this `Flow` as a wire tap, meaning that elements that pass +through will also be sent to the wire-tap `Sink`, without the latter affecting the mainline flow. +If the wire-tap `Sink` backpressures, elements that would've been sent to it will be dropped instead. + +**emits** element is available and demand exists from the downstream; the element will +also be sent to the wire-tap `Sink` if there is demand. + +**backpressures** downstream backpressures + +**completes** upstream completes + +**cancels** downstream cancels + +--------------------------------------------------------------- +
## Flow stages composed of Sinks and Sources diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala old mode 100644 new mode 100755 index 4463e6b92cf..bd75c111fbc --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -39,7 +39,7 @@ class DslConsistencySpec extends WordSpec with Matchers { Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") - val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "orElseGraph", "divertToGraph") + val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "wireTapGraph", "orElseGraph", "divertToGraph") val allowMissing: Map[Class[_], Set[String]] = Map( jFlowClass → graphHelpers, jSourceClass → graphHelpers, diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala old mode 100644 new mode 100755 index 5611b5593df..c32cee73dd4 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala @@ -115,7 +115,7 @@ class GraphUnzipSpec extends StreamSpec { c1.expectComplete() } - "not loose elements when pull is followed by cancel before other sink has requested" in { + "not lose elements when pull is followed by cancel before other sink has requested" in { val c1 = TestSubscriber.manualProbe[Int]() val c2 = TestSubscriber.manualProbe[String]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphWireTapSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphWireTapSpec.scala new file mode 100755 index 00000000000..160cac83620 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphWireTapSpec.scala @@ -0,0 +1,59 @@ +package akka.stream.scaladsl + +import akka.stream._ +import akka.stream.testkit.Utils._ +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSink + +class GraphWireTapSpec extends StreamSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + + implicit val materializer = ActorMaterializer(settings) + + "A wire tap" must { + + "wireTap must broadcast to the tap" in assertAllStagesStopped { + val tp, mp = TestSink.probe[Int](system) + val (tps, mps) = Source(1 to 2).wireTapMat(tp)(Keep.right).toMat(mp)(Keep.both).run() + tps.request(2) + mps.requestNext(1) + mps.requestNext(2) + tps.expectNext(1, 2) + mps.expectComplete() + tps.expectComplete() + } + + "wireTap must drop elements while the tap has no demand, buffering up to one element" in assertAllStagesStopped { + val tp, mp = TestSink.probe[Int](system) + val (tps, mps) = Source(1 to 6).wireTapMat(tp)(Keep.right).toMat(mp)(Keep.both).run() + mps.request(3) + mps.expectNext(1, 2, 3) + tps.request(4) + mps.requestNext(4) + mps.requestNext(5) + mps.requestNext(6) + tps.expectNext(3, 4, 5, 6) + mps.expectComplete() + tps.expectComplete() + } + + "wireTap must cancel if main sink cancels" in assertAllStagesStopped { + val tp, mp = TestSink.probe[Int](system) + val (tps, mps) = Source(1 to 6).wireTapMat(tp)(Keep.right).toMat(mp)(Keep.both).run() + tps.request(6) + mps.cancel() + tps.expectComplete() + } + + "wireTap must continue if tap sink cancels" in assertAllStagesStopped { + val tp, mp = TestSink.probe[Int](system) + val (tps, mps) = Source(1 to 6).wireTapMat(tp)(Keep.right).toMat(mp)(Keep.both).run() + tps.cancel() + mps.request(6) + mps.expectNext(1, 2, 3, 4, 5, 6) + mps.expectComplete() + } + } +} diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index ec4bc83b95f..eaa5b71f9a0 100644 --- a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -40,3 +40,16 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.JavaFlowAndRsConve ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.divertToMat") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.divertTo") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.divertToGraph") + +# wireTap +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTap") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTapGraph") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.wireTapMat") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Source.wireTap") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubSource.wireTap") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Flow.wireTap") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubFlow.wireTap") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Source.wireTapMat") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubSource.wireTapMat") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Flow.wireTapMat") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubFlow.wireTapMat") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala old mode 100644 new mode 100755 index f3ce424bd25..fada0576ed6 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -74,6 +74,7 @@ import akka.stream._ val flattenMerge = name("flattenMerge") val recoverWith = name("recoverWith") val broadcast = name("broadcast") + val wireTap = name("wireTap") val balance = name("balance") val zip = name("zip") val zipN = name("zipN") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala old mode 100644 new mode 100755 index 2e9c17157f8..171638a7c58 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1722,6 +1722,39 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def divertToMat[M2, M3](that: Graph[SinkShape[Out], M2], when: function.Predicate[Out], matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] = new Flow(delegate.divertToMat(that, when.test)(combinerToScala(matF))) + /** + * Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass + * through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow. + * If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead. + * + * '''Emits when''' element is available and demand exists from the downstream; the element will + * also be sent to the wire-tap Sink if there is demand. + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + + def wireTap(that: Graph[SinkShape[Out], _]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.wireTap(that)) + + /** + * Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass + * through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow. + * If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @see [[#wireTap]] + */ + def wireTapMat[M2, M3]( + that: Graph[SinkShape[Out], M2], + matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] = + new Flow(delegate.wireTapMat(that)(combinerToScala(matF))) + /** * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala old mode 100644 new mode 100755 index 320440a4005..b5be3606ca8 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -778,6 +778,38 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def divertToMat[M2, M3](that: Graph[SinkShape[Out], M2], when: function.Predicate[Out], matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = new Source(delegate.divertToMat(that, when.test)(combinerToScala(matF))) + /** + * Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass + * through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow. + * If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead. + * + * '''Emits when''' element is available and demand exists from the downstream; the element will + * also be sent to the wire-tap Sink if there is demand. + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def wireTap(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] = + new Source(delegate.wireTap(that)) + + /** + * Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass + * through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow. + * If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead. + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @see [[#wireTap]] + */ + def wireTapMat[M2, M3]( + that: Graph[SinkShape[Out], M2], + matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = + new Source(delegate.wireTapMat(that)(combinerToScala(matF))) + /** * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]]. * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala old mode 100644 new mode 100755 index cfc6559cfbf..6c40bfb05bb --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1165,6 +1165,23 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.divertTo(that, when.test)) + /** + * Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass + * through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow. + * If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead. + * + * '''Emits when''' element is available and demand exists from the downstream; the element will + * also be sent to the wire-tap Sink if there is demand. + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def wireTap(that: Graph[SinkShape[Out], _]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.wireTap(that)) + /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking randomly when several elements ready. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala old mode 100644 new mode 100755 index a13e258cedf..7e471738a96 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1157,6 +1157,23 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): SubSource[Out, Mat] = new SubSource(delegate.divertTo(that, when.test)) + /** + * Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass + * through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow. + * If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead. + * + * '''Emits when''' element is available and demand exists from the downstream; the element will + * also be sent to the wire-tap Sink if there is demand. + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def wireTap(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat] = + new SubSource(delegate.wireTap(that)) + /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking randomly when several elements ready. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala old mode 100644 new mode 100755 index 32102a952d9..8d75e058ce0 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -2277,7 +2277,7 @@ trait FlowOps[+Out, +Mat] { def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Closed /** - * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass * through will also be sent to the [[Sink]]. * * '''Emits when''' element is available and demand exists both from the Sink and the downstream. @@ -2286,7 +2286,7 @@ trait FlowOps[+Out, +Mat] { * * '''Completes when''' upstream completes * - * '''Cancels when''' downstream cancels + * '''Cancels when''' downstream and Sink cancel */ def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out] = via(alsoToGraph(that)) @@ -2320,6 +2320,30 @@ trait FlowOps[+Out, +Mat] { FlowShape(partition.in, partition.out(0)) } + /** + * Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass + * through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow. + * If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead. + * + * '''Emits when''' element is available and demand exists from the downstream; the element will + * also be sent to the wire-tap Sink if there is demand. + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out] = via(wireTapGraph(that)) + + protected def wireTapGraph[M](that: Graph[SinkShape[Out], M]): Graph[FlowShape[Out @uncheckedVariance, Out], M] = + GraphDSL.create(that) { implicit b ⇒ r ⇒ + import GraphDSL.Implicits._ + val bcast = b.add(WireTap[Out]()) + bcast.out1 ~> r + FlowShape(bcast.in, bcast.out0) + } + def withAttributes(attr: Attributes): Repr[Out] def addAttributes(attr: Attributes): Repr[Out] @@ -2549,7 +2573,7 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { viaMat(orElseGraph(secondary))(matF) /** - * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass * through will also be sent to the [[Sink]]. * * @see [[#alsoTo]] @@ -2572,6 +2596,19 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { def divertToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2], when: Out ⇒ Boolean)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] = viaMat(divertToGraph(that, when))(matF) + /** + * Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass + * through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow. + * If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead. + * + * @see [[#wireTap]] + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + */ + def wireTapMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] = + viaMat(wireTapGraph(that))(matF) + /** * Materializes to `Future[Done]` that completes on getting termination message. * The Future completes with success when received complete message from upstream or cancel diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala old mode 100644 new mode 100755 index a37aadce934..df136e8625c --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -6,16 +6,17 @@ package akka.stream.scaladsl import java.util.SplittableRandom import akka.NotUsed +import akka.annotation.InternalApi import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl._ import akka.stream.impl.fusing.GraphStages -import akka.stream.impl.Stages.DefaultAttributes import akka.stream.scaladsl.Partition.PartitionOutOfBoundsException import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.util.ConstantFun -import scala.annotation.unchecked.uncheckedVariance import scala.annotation.tailrec +import scala.annotation.unchecked.uncheckedVariance import scala.collection.{ immutable, mutable } import scala.concurrent.Promise import scala.util.control.{ NoStackTrace, NonFatal } @@ -627,6 +628,88 @@ final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends } +object WireTap { + private val singleton = new WireTap[Nothing] + + /** + * @see [[WireTap]] + */ + def apply[T](): WireTap[T] = singleton.asInstanceOf[WireTap[T]] +} + +/** + * Fan-out the stream to two output streams - a 'main' and a 'tap' one. Each incoming element is emitted + * to the 'main' output; elements are also emitted to the 'tap' output if there is demand; + * otherwise they are dropped. + * + * '''Emits when''' element is available and demand exists from the 'main' output; the element will + * also be sent to the 'tap' output if there is demand. + * + * '''Backpressures when''' the 'main' output backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' the 'main' output cancels + * + */ +@InternalApi +private[stream] final class WireTap[T] extends GraphStage[FanOutShape2[T, T, T]] { + val in: Inlet[T] = Inlet[T]("WireTap.in") + val outMain: Outlet[T] = Outlet[T]("WireTap.outMain") + val outTap: Outlet[T] = Outlet[T]("WireTap.outTap") + override def initialAttributes = DefaultAttributes.wireTap + override val shape: FanOutShape2[T, T, T] = new FanOutShape2(in, outMain, outTap) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + private var pendingTap: Option[T] = None + + setHandler(in, new InHandler { + override def onPush() = { + val elem = grab(in) + push(outMain, elem) + if (isAvailable(outTap)) { + push(outTap, elem) + } else { + pendingTap = Some(elem) + } + } + }) + + setHandler(outMain, new OutHandler { + override def onPull() = { + pull(in) + } + + override def onDownstreamFinish(): Unit = { + completeStage() + } + }) + + // The 'tap' output can neither backpressure, nor cancel, the stage. + setHandler(outTap, new OutHandler { + override def onPull() = { + pendingTap match { + case Some(elem) ⇒ + push(outTap, elem) + pendingTap = None + case None ⇒ // no pending element to emit + } + } + + override def onDownstreamFinish(): Unit = { + setHandler(in, new InHandler { + override def onPush() = { + push(outMain, grab(in)) + } + }) + // Allow any outstanding element to be garbage-collected + pendingTap = None + } + }) + } + override def toString = "WireTap" +} + object Partition { // FIXME make `PartitionOutOfBoundsException` a `final` class when possible case class PartitionOutOfBoundsException(msg: String) extends IndexOutOfBoundsException(msg) with NoStackTrace @@ -1090,6 +1173,10 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[ object OrElse { private val singleton = new OrElse[Nothing] + + /** + * @see [[OrElse]] + */ def apply[T]() = singleton.asInstanceOf[OrElse[T]] } @@ -1111,6 +1198,7 @@ object OrElse { * * '''Cancels when''' downstream cancels */ +@InternalApi private[stream] final class OrElse[T] extends GraphStage[UniformFanInShape[T, T]] { val primary = Inlet[T]("OrElse.primary") val secondary = Inlet[T]("OrElse.secondary")