Skip to content

Commit

Permalink
+str Add Flow.lazyInit (akka#24427)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosubpl committed Feb 20, 2018
1 parent 6012b93 commit 6f42215
Show file tree
Hide file tree
Showing 6 changed files with 426 additions and 8 deletions.
33 changes: 27 additions & 6 deletions akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java
Expand Up @@ -10,6 +10,7 @@
import akka.japi.Pair;
import akka.japi.function.*;
import akka.stream.*;
import akka.stream.impl.Timers;
import akka.util.ConstantFun;
import akka.stream.javadsl.GraphDSL.Builder;
import akka.stream.stage.*;
Expand Down Expand Up @@ -192,13 +193,13 @@ public void mustBeAbleToUseVia() {

public final Inlet<Integer> in = Inlet.create("in");
public final Outlet<Integer> out = Outlet.create("out");

@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception {
return new GraphStageLogic(shape()) {
int sum = 0;
int count = 0;

{
setHandler(in, new AbstractInHandler() {
@Override
Expand All @@ -211,7 +212,7 @@ public void onPush() throws Exception {
} else {
emitMultiple(out, Arrays.asList(element, element).iterator());
}

}
});
setHandler(out, new AbstractOutHandler() {
Expand All @@ -223,14 +224,14 @@ public void onPull() throws Exception {
}
};
}

@Override
public FlowShape<Integer, Integer> shape() {
return FlowShape.of(in, out);
}
}
);
Source.from(input).via(flow).runForeach((Procedure<Integer>) elem ->
Source.from(input).via(flow).runForeach((Procedure<Integer>) elem ->
probe.getRef().tell(elem, ActorRef.noSender()), materializer);

probe.expectMsgEquals(0);
Expand Down Expand Up @@ -315,7 +316,7 @@ public <T> GraphStage<FlowShape<T, T>> op() {
return new GraphStage<FlowShape<T, T>>() {
public final Inlet<T> in = Inlet.create("in");
public final Outlet<T> out = Outlet.create("out");

@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception {
return new GraphStageLogic(shape()) {
Expand Down Expand Up @@ -950,4 +951,24 @@ public void mustBeAbleToUseDivertTo() {
final Flow<Integer, Integer, NotUsed> f = Flow.of(Integer.class).divertTo(Sink.ignore(), e -> true);
final Flow<Integer, Integer, String> f2 = Flow.of(Integer.class).divertToMat(Sink.ignore(), e -> true, (i, n) -> "foo");
}

@Test
public void mustBeAbleToUseLazyInit() throws Exception {
final CompletionStage<Flow<Integer, Integer, NotUsed>> future = new CompletableFuture<Flow<Integer, Integer, NotUsed>>();
future.toCompletableFuture().complete(Flow.fromFunction((id) -> id));
Creator<NotUsed> ignoreFunction = new Creator<NotUsed>() {
@Override
public NotUsed create() throws Exception {
return NotUsed.getInstance();
}
};

Integer result =
Source.range(1, 10)
.via(Flow.lazyInit((i) -> future, ignoreFunction))
.runWith(Sink.<Integer>head(), materializer)
.toCompletableFuture().get(3, TimeUnit.SECONDS);

assertEquals((Object) 1, result);
}
}
@@ -0,0 +1,184 @@
/**
* Copyright (C) 2018-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl

import java.util.concurrent.TimeoutException

import akka.NotUsed
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.Supervision._
import akka.stream._
import akka.stream.impl.fusing.LazyFlow
import akka.stream.stage.{ GraphStage, GraphStageLogic, GraphStageWithMaterializedValue }
import akka.stream.testkit.{ StreamSpec, TestPublisher }
import akka.stream.testkit.TestSubscriber.Probe
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink

import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._

class LazyFlowSpec extends StreamSpec {

val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 1, maxSize = 1)
implicit val materializer = ActorMaterializer(settings)

val fallback = () NotUsed
val ex = TE("")

"A LazyFlow" must {
def mapF(e: Int): Future[Flow[Int, String, NotUsed]] =
Future.successful(Flow.fromFunction[Int, String](i (i * e).toString))
val flowF = Future.successful(Flow.fromFunction[Int, Int](id id))
"work in happy case" in assertAllStagesStopped {
val probe = Source(2 to 10)
.via(Flow.lazyInit[Int, String, NotUsed](mapF, fallback))
.runWith(TestSink.probe[String])
probe.request(100)
(2 to 10).map(i (i * 2).toString).foreach(probe.expectNext)
}

"work with slow flow init" in assertAllStagesStopped {
val p = Promise[Flow[Int, Int, NotUsed]]()
val sourceProbe = TestPublisher.manualProbe[Int]()
val flowProbe = Source.fromPublisher(sourceProbe)
.via(Flow.lazyInit[Int, Int, NotUsed](_ p.future, fallback))
.runWith(TestSink.probe[Int])

val sourceSub = sourceProbe.expectSubscription()
flowProbe.request(1)
sourceSub.expectRequest(1)
sourceSub.sendNext(0)
sourceSub.expectRequest(1)
sourceProbe.expectNoMsg(200.millis)

p.success(Flow.fromFunction[Int, Int](id id))
flowProbe.request(99)
flowProbe.expectNext(0)
(1 to 10).foreach(i {
sourceSub.sendNext(i)
flowProbe.expectNext(i)
})
sourceSub.sendComplete()
}

"complete when there was no elements in the stream" in assertAllStagesStopped {
def flowMaker(i: Int) = flowF
val probe = Source.empty
.via(Flow.lazyInit(flowMaker, () 0))
.runWith(TestSink.probe[Int])
probe.request(1).expectComplete()
}

"complete normally when upstream is completed" in assertAllStagesStopped {
val probe = Source.single(1)
.via(Flow.lazyInit[Int, Int, NotUsed](_ flowF, fallback))
.runWith(TestSink.probe[Int])
probe.request(1)
.expectNext(1)
.expectComplete()
}

"fail gracefully when flow factory method failed" in assertAllStagesStopped {
val sourceProbe = TestPublisher.manualProbe[Int]()
val probe = Source.fromPublisher(sourceProbe)
.via(Flow.lazyInit[Int, Int, NotUsed](_ throw ex, fallback))
.runWith(TestSink.probe[Int])

val sourceSub = sourceProbe.expectSubscription()
probe.request(1)
sourceSub.expectRequest(1)
sourceSub.sendNext(0)
sourceSub.expectCancellation()
probe.expectError(ex)
}

"fail gracefully when upstream failed" in assertAllStagesStopped {
val sourceProbe = TestPublisher.manualProbe[Int]()
val probe = Source.fromPublisher(sourceProbe)
.via(Flow.lazyInit[Int, Int, NotUsed](_ flowF, fallback))
.runWith(TestSink.probe[Int])

val sourceSub = sourceProbe.expectSubscription()
sourceSub.expectRequest(1)
sourceSub.sendNext(0)
probe.request(1)
.expectNext(0)
sourceSub.sendError(ex)
probe.expectError(ex)
}

"fail gracefully when factory future failed" in assertAllStagesStopped {
val sourceProbe = TestPublisher.manualProbe[Int]()
val flowProbe = Source.fromPublisher(sourceProbe)
.via(Flow.lazyInit[Int, Int, NotUsed](_ Future.failed(ex), fallback))
.withAttributes(supervisionStrategy(stoppingDecider))
.runWith(TestSink.probe[Int])

val sourceSub = sourceProbe.expectSubscription()
sourceSub.expectRequest(1)
sourceSub.sendNext(0)
flowProbe.request(1).expectError(ex)
}

"cancel upstream when the downstream is cancelled" in assertAllStagesStopped {
val sourceProbe = TestPublisher.manualProbe[Int]()
val probe = Source.fromPublisher(sourceProbe)
.via(Flow.lazyInit[Int, Int, NotUsed](_ flowF, fallback))
.withAttributes(supervisionStrategy(stoppingDecider))
.runWith(TestSink.probe[Int])

val sourceSub = sourceProbe.expectSubscription()
probe.request(1)
sourceSub.expectRequest(1)
sourceSub.sendNext(0)
sourceSub.expectRequest(1)
probe.expectNext(0)
probe.cancel()
sourceSub.expectCancellation()
}

"continue if supervision is resume" in assertAllStagesStopped {
val sourceProbe = TestPublisher.manualProbe[Int]()
def flowBuilder(a: Int) = if (a == 0) throw ex else Future.successful(Flow.fromFunction[Int, Int](id id))
val probe = Source.fromPublisher(sourceProbe)
.via(Flow.lazyInit[Int, Int, NotUsed](flowBuilder, fallback))
.withAttributes(supervisionStrategy(resumingDecider))
.runWith(TestSink.probe[Int])

val sourceSub = sourceProbe.expectSubscription()
probe.request(1)
sourceSub.expectRequest(1)
sourceSub.sendNext(0)
sourceSub.expectRequest(1)
sourceSub.sendNext(1)
probe.expectNext(1)
probe.cancel()
}

"fail correctly when materialization of inner sink fails" in assertAllStagesStopped {
val matFail = TE("fail!")
object FailingInnerMat extends GraphStageWithMaterializedValue[FlowShape[String, String], Option[String]] {
val in = Inlet[String]("in")
val out = Outlet[String]("out")
val shape = FlowShape(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) =
(new GraphStageLogic(shape) {
throw matFail
}, Some("fine"))
}

val result = Source.single("whatever")
.viaMat(Flow.lazyInit(
_ Future.successful(Flow.fromGraph(FailingInnerMat)),
() Some("boom")))(Keep.right)
.toMat(Sink.ignore)(Keep.left)
.run()

result should ===(Some("boom"))
}
}

}
1 change: 1 addition & 0 deletions akka-stream/src/main/scala/akka/stream/impl/Stages.scala
Expand Up @@ -129,6 +129,7 @@ import akka.stream._
val actorSubscriberSink = name("actorSubscriberSink")
val queueSink = name("queueSink")
val lazySink = name("lazySink")
val lazyFlow = name("lazyFlow")
val lazySource = name("lazySource")
val outputStreamSink = name("outputStreamSink") and IODispatcher
val inputStreamSink = name("inputStreamSink") and IODispatcher
Expand Down

0 comments on commit 6f42215

Please sign in to comment.