Skip to content
Permalink
Browse files

Merge pull request #149 from TomasMikula/partitionWith

Add partitionWith extension method to Flow-shaped graphs.
  • Loading branch information...
2m committed Mar 1, 2019
2 parents 9fcb381 + acc1602 commit 8285f74e1d242ff017c8ede093877161239598e6
@@ -5,7 +5,8 @@
package akka.stream.contrib

import akka.japi.function
import akka.stream.{ Attributes, FanOutShape2 }
import akka.stream.scaladsl.{ GraphDSL, Keep }
import akka.stream.{ Attributes, FanOutShape2, FlowShape, Graph }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

/**
@@ -34,6 +35,23 @@ object PartitionWith {
* @return [[PartitionWith]] instance
*/
def create[In, Out0, Out1](p: function.Function[In, Either[Out0, Out1]]): PartitionWith[In, Out0, Out1] = new PartitionWith(p.apply)

object Implicits {
implicit final class FlowGraphOps[In, Out, M](val flowGraph: Graph[FlowShape[In, Out], M]) extends AnyVal {

/**
* Partition the output of the decorated flow according to the given partition function.
*/
def partitionWith[Out0, Out1](p: Out => Either[Out0, Out1]): Graph[FanOutShape2[In, Out0, Out1], M] = {
GraphDSL.create(flowGraph, PartitionWith(p))(Keep.left) { implicit builder => (flow, fanOut) => {
import GraphDSL.Implicits._
flow.out ~> fanOut.in
new FanOutShape2(flow.in, fanOut.out0, fanOut.out1)
}
}
}
}
}
}

/**
@@ -4,30 +4,48 @@

package akka.stream.contrib

import akka.NotUsed
import akka.stream.scaladsl._
import akka.stream.{ ClosedShape, FlowShape }
import akka.stream.{ ClosedShape, FanInShape2, FanOutShape2, FlowShape, Graph }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }

class PartitionWithSpecAutoFusingOn extends { val autoFusing = true } with PartitionWithSpec
class PartitionWithSpecAutoFusingOff extends { val autoFusing = false } with PartitionWithSpec

trait PartitionWithSpec extends BaseStreamSpec {

val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
private def fanOutAndIn[I, X, Y, O, M](
fanOutGraph: Graph[FanOutShape2[I, X, Y], M],
fanInGraph: Graph[FanInShape2[X, Y, O], NotUsed]): Flow[I, O, M] = {
Flow.fromGraph(GraphDSL.create(fanOutGraph, fanInGraph)(Keep.left) { implicit builder => (fanOut, fanIn) =>
import GraphDSL.Implicits._

import GraphDSL.Implicits._
fanOut.out0 ~> fanIn.in0
fanOut.out1 ~> fanIn.in1

val pw = b.add(PartitionWith[Int, Int, Int] {
case i if i % 2 == 0 => Left(i / 2)
case i => Right(i * 3 - 1)
FlowShape(fanOut.in, fanIn.out)
})
}

private def zipFanOut[I, O1, O2, M](fanOutGraph: Graph[FanOutShape2[I, O1, O2], M]): Flow[I, (O1, O2), M] =
fanOutAndIn(fanOutGraph, Zip[O1, O2])

private def mergeFanOut[I, O, M](fanOutGraph: Graph[FanOutShape2[I, O, O], M]): Flow[I, O, M] = {
Flow.fromGraph(GraphDSL.create(fanOutGraph) { implicit builder => fanOut =>
import GraphDSL.Implicits._

val mrg = builder.add(Merge[O](2))

val mrg = b.add(Merge[Int](2))
fanOut.out0 ~> mrg.in(0)
fanOut.out1 ~> mrg.in(1)

pw.out0 ~> mrg.in(0)
pw.out1 ~> mrg.in(1)
FlowShape(fanOut.in, mrg.out)
})
}

FlowShape(pw.in, mrg.out)
val flow = mergeFanOut(PartitionWith[Int, Int, Int] {
case i if i % 2 == 0 => Left(i / 2)
case i => Right(i * 3 - 1)
})

"PartitionWith" should {
@@ -94,4 +112,22 @@ trait PartitionWithSpec extends BaseStreamSpec {
sub2.expectNext(3, 5, 7, 9, 11, 13, 15, 17, 19, 21)
}
}

"partitionWith extension method" should {
"be callable on Flow and partition its output" in {
import PartitionWith.Implicits._

val flow = zipFanOut(
Flow[Int].partitionWith(i => if (i % 2 == 0) Left(-i) else Right(i)))

val (source, sink) = TestSource.probe[Int]
.via(flow)
.toMat(TestSink.probe)(Keep.both)
.run()

sink.request(5)
(1 to 10).foreach(source.sendNext)
sink.expectNextN(List((-2, 1), (-4, 3), (-6, 5), (-8, 7), (-10, 9)))
}
}
}

0 comments on commit 8285f74

Please sign in to comment.
You can’t perform that action at this time.