From 14df4bb6b2422d945f86808a43b233a946c36ccd Mon Sep 17 00:00:00 2001 From: Tomas Mikula Date: Mon, 25 Feb 2019 21:45:58 +0100 Subject: [PATCH] Additional PartitionWith test: Test that values belonging to one partition can flow without the other outlet being pulled. --- .../stream/contrib/PartitionWithSpec.scala | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/contrib/src/test/scala/akka/stream/contrib/PartitionWithSpec.scala b/contrib/src/test/scala/akka/stream/contrib/PartitionWithSpec.scala index d609817..7ea7e47 100644 --- a/contrib/src/test/scala/akka/stream/contrib/PartitionWithSpec.scala +++ b/contrib/src/test/scala/akka/stream/contrib/PartitionWithSpec.scala @@ -5,7 +5,7 @@ package akka.stream.contrib import akka.stream.scaladsl._ -import akka.stream.{ FlowShape } +import akka.stream.{ ClosedShape, FlowShape } import akka.stream.testkit.scaladsl.{ TestSink, TestSource } class PartitionWithSpecAutoFusingOn extends { val autoFusing = true } with PartitionWithSpec @@ -63,5 +63,35 @@ trait PartitionWithSpec extends BaseStreamSpec { source.sendError(new Exception) sink.expectError() } + + "allow flow of values of one partition even when the other outlet was not pulled" in { + val source = TestSource.probe[Int] + val sink0 = TestSink.probe[Int] + val sink1 = TestSink.probe[Int] + + val graph = GraphDSL.create(source, sink0, sink1)(Tuple3.apply) { implicit b => (src, snk0, snk1) => + import GraphDSL.Implicits._ + + val pw = b.add(PartitionWith[Int, Int, Int] { + case i if i % 2 == 0 => Left(i) + case i => Right(i) + }) + + src.out ~> pw.in + pw.out0 ~> snk0.in + pw.out1 ~> snk1.in + + ClosedShape + } + val (pub, sub1, sub2) = RunnableGraph.fromGraph(graph).run() + + sub1.request(10) + (1 to 10).foreach(i => pub.sendNext(2 * i)) + sub1.expectNext(2, 4, 6, 8, 10, 12, 14, 16, 18, 20) + + sub2.request(10) + (1 to 10).foreach(i => pub.sendNext(2 * i + 1)) + sub2.expectNext(3, 5, 7, 9, 11, 13, 15, 17, 19, 21) + } } }