diff --git a/contrib/src/main/scala/akka/stream/contrib/PartitionWith.scala b/contrib/src/main/scala/akka/stream/contrib/PartitionWith.scala index 5e72bc0..b5844a5 100644 --- a/contrib/src/main/scala/akka/stream/contrib/PartitionWith.scala +++ b/contrib/src/main/scala/akka/stream/contrib/PartitionWith.scala @@ -43,12 +43,11 @@ object PartitionWith { * Partition the output of the decorated flow according to the given partition function. */ def partitionWith[Out0, Out1](p: Out => Either[Out0, Out1]): Graph[FanOutShape2[In, Out0, Out1], M] = { - GraphDSL.create(flowGraph, PartitionWith(p))(Keep.left) { implicit builder => - (flow, fanOut) => { - import GraphDSL.Implicits._ - flow.out ~> fanOut.in - new FanOutShape2(flow.in, fanOut.out0, fanOut.out1) - } + GraphDSL.create(flowGraph, PartitionWith(p))(Keep.left) { implicit builder => (flow, fanOut) => { + import GraphDSL.Implicits._ + flow.out ~> fanOut.in + new FanOutShape2(flow.in, fanOut.out0, fanOut.out1) + } } } } diff --git a/contrib/src/test/scala/akka/stream/contrib/PartitionWithSpec.scala b/contrib/src/test/scala/akka/stream/contrib/PartitionWithSpec.scala index bdc4a7d..7660aff 100644 --- a/contrib/src/test/scala/akka/stream/contrib/PartitionWithSpec.scala +++ b/contrib/src/test/scala/akka/stream/contrib/PartitionWithSpec.scala @@ -14,17 +14,16 @@ class PartitionWithSpecAutoFusingOff extends { val autoFusing = false } with Par trait PartitionWithSpec extends BaseStreamSpec { private def mergeFanOut[I, O, M](fanOutGraph: Graph[FanOutShape2[I, O, O], M]): Flow[I, O, M] = { - Flow.fromGraph(GraphDSL.create(fanOutGraph){ implicit builder => - fanOut => { - import GraphDSL.Implicits._ + Flow.fromGraph(GraphDSL.create(fanOutGraph) { implicit builder => fanOut => { + import GraphDSL.Implicits._ - val mrg = builder.add(Merge[O](2)) + val mrg = builder.add(Merge[O](2)) - fanOut.out0 ~> mrg.in(0) - fanOut.out1 ~> mrg.in(1) + fanOut.out0 ~> mrg.in(0) + fanOut.out1 ~> mrg.in(1) - FlowShape(fanOut.in, mrg.out) - } + FlowShape(fanOut.in, mrg.out) + } }) } @@ -103,8 +102,7 @@ trait PartitionWithSpec extends BaseStreamSpec { import PartitionWith.Implicits._ val flow = mergeFanOut( - Flow[Int].partitionWith(i => if (i % 2 == 0) Left(-i) else Right(i)) - ) + Flow[Int].partitionWith(i => if (i % 2 == 0) Left(-i) else Right(i))) val (source, sink) = TestSource.probe[Int] .via(flow)