Skip to content
Permalink
Browse files

Fix formatting.

  • Loading branch information...
TomasMikula committed Feb 28, 2019
1 parent a5baa1e commit 608fdb7220e1891cb06a606d132c8dddeae9401b
@@ -43,12 +43,11 @@ object PartitionWith {
* Partition the output of the decorated flow according to the given partition function.
*/
def partitionWith[Out0, Out1](p: Out => Either[Out0, Out1]): Graph[FanOutShape2[In, Out0, Out1], M] = {
GraphDSL.create(flowGraph, PartitionWith(p))(Keep.left) { implicit builder =>
(flow, fanOut) => {
import GraphDSL.Implicits._
flow.out ~> fanOut.in
new FanOutShape2(flow.in, fanOut.out0, fanOut.out1)
}
GraphDSL.create(flowGraph, PartitionWith(p))(Keep.left) { implicit builder => (flow, fanOut) => {
import GraphDSL.Implicits._
flow.out ~> fanOut.in
new FanOutShape2(flow.in, fanOut.out0, fanOut.out1)
}
}
}
}
@@ -14,17 +14,16 @@ class PartitionWithSpecAutoFusingOff extends { val autoFusing = false } with Par
trait PartitionWithSpec extends BaseStreamSpec {

private def mergeFanOut[I, O, M](fanOutGraph: Graph[FanOutShape2[I, O, O], M]): Flow[I, O, M] = {
Flow.fromGraph(GraphDSL.create(fanOutGraph){ implicit builder =>
fanOut => {
import GraphDSL.Implicits._
Flow.fromGraph(GraphDSL.create(fanOutGraph) { implicit builder => fanOut => {
import GraphDSL.Implicits._

val mrg = builder.add(Merge[O](2))
val mrg = builder.add(Merge[O](2))

fanOut.out0 ~> mrg.in(0)
fanOut.out1 ~> mrg.in(1)
fanOut.out0 ~> mrg.in(0)
fanOut.out1 ~> mrg.in(1)

FlowShape(fanOut.in, mrg.out)
}
FlowShape(fanOut.in, mrg.out)
}
})
}

@@ -103,8 +102,7 @@ trait PartitionWithSpec extends BaseStreamSpec {
import PartitionWith.Implicits._

val flow = mergeFanOut(
Flow[Int].partitionWith(i => if (i % 2 == 0) Left(-i) else Right(i))
)
Flow[Int].partitionWith(i => if (i % 2 == 0) Left(-i) else Right(i)))

val (source, sink) = TestSource.probe[Int]
.via(flow)

0 comments on commit 608fdb7

Please sign in to comment.
You can’t perform that action at this time.