Skip to content

Commit

Permalink
Merge pull request #148 from TomasMikula/partitionWith
Browse files Browse the repository at this point in the history
Additional PartitionWith test
  • Loading branch information
2m committed Feb 26, 2019
2 parents f195033 + 14df4bb commit 9fcb381
Showing 1 changed file with 31 additions and 1 deletion.
Expand Up @@ -5,7 +5,7 @@
package akka.stream.contrib

import akka.stream.scaladsl._
import akka.stream.{ FlowShape }
import akka.stream.{ ClosedShape, FlowShape }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }

class PartitionWithSpecAutoFusingOn extends { val autoFusing = true } with PartitionWithSpec
Expand Down Expand Up @@ -63,5 +63,35 @@ trait PartitionWithSpec extends BaseStreamSpec {
source.sendError(new Exception)
sink.expectError()
}

"allow flow of values of one partition even when the other outlet was not pulled" in {
val source = TestSource.probe[Int]
val sink0 = TestSink.probe[Int]
val sink1 = TestSink.probe[Int]

val graph = GraphDSL.create(source, sink0, sink1)(Tuple3.apply) { implicit b => (src, snk0, snk1) =>
import GraphDSL.Implicits._

val pw = b.add(PartitionWith[Int, Int, Int] {
case i if i % 2 == 0 => Left(i)
case i => Right(i)
})

src.out ~> pw.in
pw.out0 ~> snk0.in
pw.out1 ~> snk1.in

ClosedShape
}
val (pub, sub1, sub2) = RunnableGraph.fromGraph(graph).run()

sub1.request(10)
(1 to 10).foreach(i => pub.sendNext(2 * i))
sub1.expectNext(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

sub2.request(10)
(1 to 10).foreach(i => pub.sendNext(2 * i + 1))
sub2.expectNext(3, 5, 7, 9, 11, 13, 15, 17, 19, 21)
}
}
}

0 comments on commit 9fcb381

Please sign in to comment.