Skip to content

Commit

Permalink
Add wireTap combinator (#15077) (#23824)
Browse files Browse the repository at this point in the history
  • Loading branch information
catalin-ursachi authored and ktoso committed Feb 13, 2018
1 parent f2e39c7 commit ccf5d46
Show file tree
Hide file tree
Showing 12 changed files with 321 additions and 7 deletions.
17 changes: 17 additions & 0 deletions akka-docs/src/main/paradox/stream/stages-overview.md
Expand Up @@ -1089,6 +1089,23 @@ Each upstream element will either be diverted to the given sink, or the downstre

---------------------------------------------------------------

### wireTap

Attaches the given `Sink` to this `Flow` as a wire tap, meaning that elements that pass
through will also be sent to the wire-tap `Sink`, without the latter affecting the mainline flow.
If the wire-tap `Sink` backpressures, elements that would've been sent to it will be dropped instead.

**emits** element is available and demand exists from the downstream; the element will
also be sent to the wire-tap `Sink` if there is demand.

**backpressures** downstream backpressures

**completes** upstream completes

**cancels** downstream cancels

---------------------------------------------------------------

<br/>

## Flow stages composed of Sinks and Sources
Expand Down
2 changes: 1 addition & 1 deletion akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala 100644 → 100755
Expand Up @@ -39,7 +39,7 @@ class DslConsistencySpec extends WordSpec with Matchers {
Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat")

val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "orElseGraph", "divertToGraph")
val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "wireTapGraph", "orElseGraph", "divertToGraph")
val allowMissing: Map[Class[_], Set[String]] = Map(
jFlowClass graphHelpers,
jSourceClass graphHelpers,
Expand Down
2 changes: 1 addition & 1 deletion akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipSpec.scala 100644 → 100755
Expand Up @@ -115,7 +115,7 @@ class GraphUnzipSpec extends StreamSpec {
c1.expectComplete()
}

"not loose elements when pull is followed by cancel before other sink has requested" in {
"not lose elements when pull is followed by cancel before other sink has requested" in {
val c1 = TestSubscriber.manualProbe[Int]()
val c2 = TestSubscriber.manualProbe[String]()

Expand Down
@@ -0,0 +1,59 @@
package akka.stream.scaladsl

import akka.stream._
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink

class GraphWireTapSpec extends StreamSpec {

val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)

implicit val materializer = ActorMaterializer(settings)

"A wire tap" must {

"wireTap must broadcast to the tap" in assertAllStagesStopped {
val tp, mp = TestSink.probe[Int](system)
val (tps, mps) = Source(1 to 2).wireTapMat(tp)(Keep.right).toMat(mp)(Keep.both).run()
tps.request(2)
mps.requestNext(1)
mps.requestNext(2)
tps.expectNext(1, 2)
mps.expectComplete()
tps.expectComplete()
}

"wireTap must drop elements while the tap has no demand, buffering up to one element" in assertAllStagesStopped {
val tp, mp = TestSink.probe[Int](system)
val (tps, mps) = Source(1 to 6).wireTapMat(tp)(Keep.right).toMat(mp)(Keep.both).run()
mps.request(3)
mps.expectNext(1, 2, 3)
tps.request(4)
mps.requestNext(4)
mps.requestNext(5)
mps.requestNext(6)
tps.expectNext(3, 4, 5, 6)
mps.expectComplete()
tps.expectComplete()
}

"wireTap must cancel if main sink cancels" in assertAllStagesStopped {
val tp, mp = TestSink.probe[Int](system)
val (tps, mps) = Source(1 to 6).wireTapMat(tp)(Keep.right).toMat(mp)(Keep.both).run()
tps.request(6)
mps.cancel()
tps.expectComplete()
}

"wireTap must continue if tap sink cancels" in assertAllStagesStopped {
val tp, mp = TestSink.probe[Int](system)
val (tps, mps) = Source(1 to 6).wireTapMat(tp)(Keep.right).toMat(mp)(Keep.both).run()
tps.cancel()
mps.request(6)
mps.expectNext(1, 2, 3, 4, 5, 6)
mps.expectComplete()
}
}
}
13 changes: 13 additions & 0 deletions akka-stream/src/main/mima-filters/2.5.x.backwards.excludes
Expand Up @@ -40,3 +40,16 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.JavaFlowAndRsConve
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.divertToMat")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.divertTo")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.divertToGraph")

# wireTap
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTap")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTapGraph")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.wireTapMat")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Source.wireTap")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubSource.wireTap")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Flow.wireTap")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubFlow.wireTap")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Source.wireTapMat")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubSource.wireTapMat")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Flow.wireTapMat")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.SubFlow.wireTapMat")
1 change: 1 addition & 0 deletions akka-stream/src/main/scala/akka/stream/impl/Stages.scala 100644 → 100755
Expand Up @@ -74,6 +74,7 @@ import akka.stream._
val flattenMerge = name("flattenMerge")
val recoverWith = name("recoverWith")
val broadcast = name("broadcast")
val wireTap = name("wireTap")
val balance = name("balance")
val zip = name("zip")
val zipN = name("zipN")
Expand Down
33 changes: 33 additions & 0 deletions akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala 100644 → 100755
Expand Up @@ -1722,6 +1722,39 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def divertToMat[M2, M3](that: Graph[SinkShape[Out], M2], when: function.Predicate[Out], matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] =
new Flow(delegate.divertToMat(that, when.test)(combinerToScala(matF)))

/**
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
*
* '''Emits when''' element is available and demand exists from the downstream; the element will
* also be sent to the wire-tap Sink if there is demand.
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/

def wireTap(that: Graph[SinkShape[Out], _]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.wireTap(that))

/**
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#wireTap]]
*/
def wireTapMat[M2, M3](
that: Graph[SinkShape[Out], M2],
matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] =
new Flow(delegate.wireTapMat(that)(combinerToScala(matF)))

/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
Expand Down
32 changes: 32 additions & 0 deletions akka-stream/src/main/scala/akka/stream/javadsl/Source.scala 100644 → 100755
Expand Up @@ -778,6 +778,38 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def divertToMat[M2, M3](that: Graph[SinkShape[Out], M2], when: function.Predicate[Out], matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] =
new Source(delegate.divertToMat(that, when.test)(combinerToScala(matF)))

/**
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
*
* '''Emits when''' element is available and demand exists from the downstream; the element will
* also be sent to the wire-tap Sink if there is demand.
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def wireTap(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] =
new Source(delegate.wireTap(that))

/**
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#wireTap]]
*/
def wireTapMat[M2, M3](
that: Graph[SinkShape[Out], M2],
matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] =
new Source(delegate.wireTapMat(that)(combinerToScala(matF)))

/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]].
* It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source,
Expand Down
17 changes: 17 additions & 0 deletions akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala 100644 → 100755
Expand Up @@ -1165,6 +1165,23 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.divertTo(that, when.test))

/**
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
*
* '''Emits when''' element is available and demand exists from the downstream; the element will
* also be sent to the wire-tap Sink if there is demand.
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def wireTap(that: Graph[SinkShape[Out], _]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.wireTap(that))

/**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
Expand Down
17 changes: 17 additions & 0 deletions akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala 100644 → 100755
Expand Up @@ -1157,6 +1157,23 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.divertTo(that, when.test))

/**
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
*
* '''Emits when''' element is available and demand exists from the downstream; the element will
* also be sent to the wire-tap Sink if there is demand.
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def wireTap(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat] =
new SubSource(delegate.wireTap(that))

/**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
Expand Down
43 changes: 40 additions & 3 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala 100644 → 100755
Expand Up @@ -2277,7 +2277,7 @@ trait FlowOps[+Out, +Mat] {
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Closed

/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass
* through will also be sent to the [[Sink]].
*
* '''Emits when''' element is available and demand exists both from the Sink and the downstream.
Expand All @@ -2286,7 +2286,7 @@ trait FlowOps[+Out, +Mat] {
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
* '''Cancels when''' downstream and Sink cancel
*/
def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out] = via(alsoToGraph(that))

Expand Down Expand Up @@ -2320,6 +2320,30 @@ trait FlowOps[+Out, +Mat] {
FlowShape(partition.in, partition.out(0))
}

/**
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
*
* '''Emits when''' element is available and demand exists from the downstream; the element will
* also be sent to the wire-tap Sink if there is demand.
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out] = via(wireTapGraph(that))

protected def wireTapGraph[M](that: Graph[SinkShape[Out], M]): Graph[FlowShape[Out @uncheckedVariance, Out], M] =
GraphDSL.create(that) { implicit b r
import GraphDSL.Implicits._
val bcast = b.add(WireTap[Out]())
bcast.out1 ~> r
FlowShape(bcast.in, bcast.out0)
}

def withAttributes(attr: Attributes): Repr[Out]

def addAttributes(attr: Attributes): Repr[Out]
Expand Down Expand Up @@ -2549,7 +2573,7 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
viaMat(orElseGraph(secondary))(matF)

/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass
* through will also be sent to the [[Sink]].
*
* @see [[#alsoTo]]
Expand All @@ -2572,6 +2596,19 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
def divertToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2], when: Out Boolean)(matF: (Mat, Mat2) Mat3): ReprMat[Out, Mat3] =
viaMat(divertToGraph(that, when))(matF)

/**
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
*
* @see [[#wireTap]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def wireTapMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[Out, Mat3] =
viaMat(wireTapGraph(that))(matF)

/**
* Materializes to `Future[Done]` that completes on getting termination message.
* The Future completes with success when received complete message from upstream or cancel
Expand Down

0 comments on commit ccf5d46

Please sign in to comment.