From 6f42215e5e593fc10939a108c7a04588f8ba0054 Mon Sep 17 00:00:00 2001 From: gosubpl Date: Sun, 11 Feb 2018 17:01:54 +0100 Subject: [PATCH] +str Add Flow.lazyInit (#24427) --- .../java/akka/stream/javadsl/FlowTest.java | 33 +++- .../akka/stream/scaladsl/LazyFlowSpec.scala | 184 ++++++++++++++++++ .../main/scala/akka/stream/impl/Stages.scala | 1 + .../scala/akka/stream/impl/fusing/Ops.scala | 154 ++++++++++++++- .../main/scala/akka/stream/javadsl/Flow.scala | 34 ++++ .../scala/akka/stream/scaladsl/Flow.scala | 28 +++ 6 files changed, 426 insertions(+), 8 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index ba9a2c28b34a..d79abc5bb1e4 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -10,6 +10,7 @@ import akka.japi.Pair; import akka.japi.function.*; import akka.stream.*; +import akka.stream.impl.Timers; import akka.util.ConstantFun; import akka.stream.javadsl.GraphDSL.Builder; import akka.stream.stage.*; @@ -192,13 +193,13 @@ public void mustBeAbleToUseVia() { public final Inlet in = Inlet.create("in"); public final Outlet out = Outlet.create("out"); - + @Override public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception { return new GraphStageLogic(shape()) { int sum = 0; int count = 0; - + { setHandler(in, new AbstractInHandler() { @Override @@ -211,7 +212,7 @@ public void onPush() throws Exception { } else { emitMultiple(out, Arrays.asList(element, element).iterator()); } - + } }); setHandler(out, new AbstractOutHandler() { @@ -223,14 +224,14 @@ public void onPull() throws Exception { } }; } - + @Override public FlowShape shape() { return FlowShape.of(in, out); } } ); - Source.from(input).via(flow).runForeach((Procedure) elem -> + Source.from(input).via(flow).runForeach((Procedure) elem -> probe.getRef().tell(elem, ActorRef.noSender()), materializer); probe.expectMsgEquals(0); @@ -315,7 +316,7 @@ public GraphStage> op() { return new GraphStage>() { public final Inlet in = Inlet.create("in"); public final Outlet out = Outlet.create("out"); - + @Override public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception { return new GraphStageLogic(shape()) { @@ -950,4 +951,24 @@ public void mustBeAbleToUseDivertTo() { final Flow f = Flow.of(Integer.class).divertTo(Sink.ignore(), e -> true); final Flow f2 = Flow.of(Integer.class).divertToMat(Sink.ignore(), e -> true, (i, n) -> "foo"); } + + @Test + public void mustBeAbleToUseLazyInit() throws Exception { + final CompletionStage> future = new CompletableFuture>(); + future.toCompletableFuture().complete(Flow.fromFunction((id) -> id)); + Creator ignoreFunction = new Creator() { + @Override + public NotUsed create() throws Exception { + return NotUsed.getInstance(); + } + }; + + Integer result = + Source.range(1, 10) + .via(Flow.lazyInit((i) -> future, ignoreFunction)) + .runWith(Sink.head(), materializer) + .toCompletableFuture().get(3, TimeUnit.SECONDS); + + assertEquals((Object) 1, result); + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala new file mode 100644 index 000000000000..f21cd6323634 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala @@ -0,0 +1,184 @@ +/** + * Copyright (C) 2018-2018 Lightbend Inc. + */ +package akka.stream.scaladsl + +import java.util.concurrent.TimeoutException + +import akka.NotUsed +import akka.stream.ActorAttributes.supervisionStrategy +import akka.stream.Supervision._ +import akka.stream._ +import akka.stream.impl.fusing.LazyFlow +import akka.stream.stage.{ GraphStage, GraphStageLogic, GraphStageWithMaterializedValue } +import akka.stream.testkit.{ StreamSpec, TestPublisher } +import akka.stream.testkit.TestSubscriber.Probe +import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.TestSink + +import scala.concurrent.{ Await, Future, Promise } +import scala.concurrent.duration._ + +class LazyFlowSpec extends StreamSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 1, maxSize = 1) + implicit val materializer = ActorMaterializer(settings) + + val fallback = () ⇒ NotUsed + val ex = TE("") + + "A LazyFlow" must { + def mapF(e: Int): Future[Flow[Int, String, NotUsed]] = + Future.successful(Flow.fromFunction[Int, String](i ⇒ (i * e).toString)) + val flowF = Future.successful(Flow.fromFunction[Int, Int](id ⇒ id)) + "work in happy case" in assertAllStagesStopped { + val probe = Source(2 to 10) + .via(Flow.lazyInit[Int, String, NotUsed](mapF, fallback)) + .runWith(TestSink.probe[String]) + probe.request(100) + (2 to 10).map(i ⇒ (i * 2).toString).foreach(probe.expectNext) + } + + "work with slow flow init" in assertAllStagesStopped { + val p = Promise[Flow[Int, Int, NotUsed]]() + val sourceProbe = TestPublisher.manualProbe[Int]() + val flowProbe = Source.fromPublisher(sourceProbe) + .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ p.future, fallback)) + .runWith(TestSink.probe[Int]) + + val sourceSub = sourceProbe.expectSubscription() + flowProbe.request(1) + sourceSub.expectRequest(1) + sourceSub.sendNext(0) + sourceSub.expectRequest(1) + sourceProbe.expectNoMsg(200.millis) + + p.success(Flow.fromFunction[Int, Int](id ⇒ id)) + flowProbe.request(99) + flowProbe.expectNext(0) + (1 to 10).foreach(i ⇒ { + sourceSub.sendNext(i) + flowProbe.expectNext(i) + }) + sourceSub.sendComplete() + } + + "complete when there was no elements in the stream" in assertAllStagesStopped { + def flowMaker(i: Int) = flowF + val probe = Source.empty + .via(Flow.lazyInit(flowMaker, () ⇒ 0)) + .runWith(TestSink.probe[Int]) + probe.request(1).expectComplete() + } + + "complete normally when upstream is completed" in assertAllStagesStopped { + val probe = Source.single(1) + .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ flowF, fallback)) + .runWith(TestSink.probe[Int]) + probe.request(1) + .expectNext(1) + .expectComplete() + } + + "fail gracefully when flow factory method failed" in assertAllStagesStopped { + val sourceProbe = TestPublisher.manualProbe[Int]() + val probe = Source.fromPublisher(sourceProbe) + .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ throw ex, fallback)) + .runWith(TestSink.probe[Int]) + + val sourceSub = sourceProbe.expectSubscription() + probe.request(1) + sourceSub.expectRequest(1) + sourceSub.sendNext(0) + sourceSub.expectCancellation() + probe.expectError(ex) + } + + "fail gracefully when upstream failed" in assertAllStagesStopped { + val sourceProbe = TestPublisher.manualProbe[Int]() + val probe = Source.fromPublisher(sourceProbe) + .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ flowF, fallback)) + .runWith(TestSink.probe[Int]) + + val sourceSub = sourceProbe.expectSubscription() + sourceSub.expectRequest(1) + sourceSub.sendNext(0) + probe.request(1) + .expectNext(0) + sourceSub.sendError(ex) + probe.expectError(ex) + } + + "fail gracefully when factory future failed" in assertAllStagesStopped { + val sourceProbe = TestPublisher.manualProbe[Int]() + val flowProbe = Source.fromPublisher(sourceProbe) + .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ Future.failed(ex), fallback)) + .withAttributes(supervisionStrategy(stoppingDecider)) + .runWith(TestSink.probe[Int]) + + val sourceSub = sourceProbe.expectSubscription() + sourceSub.expectRequest(1) + sourceSub.sendNext(0) + flowProbe.request(1).expectError(ex) + } + + "cancel upstream when the downstream is cancelled" in assertAllStagesStopped { + val sourceProbe = TestPublisher.manualProbe[Int]() + val probe = Source.fromPublisher(sourceProbe) + .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ flowF, fallback)) + .withAttributes(supervisionStrategy(stoppingDecider)) + .runWith(TestSink.probe[Int]) + + val sourceSub = sourceProbe.expectSubscription() + probe.request(1) + sourceSub.expectRequest(1) + sourceSub.sendNext(0) + sourceSub.expectRequest(1) + probe.expectNext(0) + probe.cancel() + sourceSub.expectCancellation() + } + + "continue if supervision is resume" in assertAllStagesStopped { + val sourceProbe = TestPublisher.manualProbe[Int]() + def flowBuilder(a: Int) = if (a == 0) throw ex else Future.successful(Flow.fromFunction[Int, Int](id ⇒ id)) + val probe = Source.fromPublisher(sourceProbe) + .via(Flow.lazyInit[Int, Int, NotUsed](flowBuilder, fallback)) + .withAttributes(supervisionStrategy(resumingDecider)) + .runWith(TestSink.probe[Int]) + + val sourceSub = sourceProbe.expectSubscription() + probe.request(1) + sourceSub.expectRequest(1) + sourceSub.sendNext(0) + sourceSub.expectRequest(1) + sourceSub.sendNext(1) + probe.expectNext(1) + probe.cancel() + } + + "fail correctly when materialization of inner sink fails" in assertAllStagesStopped { + val matFail = TE("fail!") + object FailingInnerMat extends GraphStageWithMaterializedValue[FlowShape[String, String], Option[String]] { + val in = Inlet[String]("in") + val out = Outlet[String]("out") + val shape = FlowShape(in, out) + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = + (new GraphStageLogic(shape) { + throw matFail + }, Some("fine")) + } + + val result = Source.single("whatever") + .viaMat(Flow.lazyInit( + _ ⇒ Future.successful(Flow.fromGraph(FailingInnerMat)), + () ⇒ Some("boom")))(Keep.right) + .toMat(Sink.ignore)(Keep.left) + .run() + + result should ===(Some("boom")) + } + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index f3ce424bd252..56f428eff8b6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -129,6 +129,7 @@ import akka.stream._ val actorSubscriberSink = name("actorSubscriberSink") val queueSink = name("queueSink") val lazySink = name("lazySink") + val lazyFlow = name("lazyFlow") val lazySource = name("lazySource") val outputStreamSink = name("outputStreamSink") and IODispatcher val inputStreamSink = name("inputStreamSink") and IODispatcher diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index d11a2c4812c4..122916129b35 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -6,20 +6,21 @@ package akka.stream.impl.fusing import java.util.concurrent.TimeUnit.NANOSECONDS import akka.annotation.{ DoNotInherit, InternalApi } +import akka.dispatch.ExecutionContexts import akka.event.Logging.LogLevel import akka.event.{ LogSource, Logging, LoggingAdapter } import akka.stream.Attributes.{ InputBuffer, LogLevels } import akka.stream.OverflowStrategies._ import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.{ ConstantFun, ReactiveStreamsCompliance, Stages, Buffer ⇒ BufferImpl } -import akka.stream.scaladsl.{ Source, SourceQueue } +import akka.stream.scaladsl.{ Flow, Keep, Source, SourceQueue } import akka.stream.stage._ import akka.stream.{ Supervision, _ } import scala.annotation.tailrec import scala.collection.immutable import scala.collection.immutable.VectorBuilder -import scala.concurrent.Future +import scala.concurrent.{ Future, Promise } import scala.util.control.{ NoStackTrace, NonFatal } import scala.util.{ Failure, Success, Try } import akka.stream.ActorAttributes.SupervisionStrategy @@ -1927,3 +1928,152 @@ private[stream] object Collect { override def toString = "StatefulMapConcat" } + +/** + * INTERNAL API + */ +@InternalApi final private[akka] class LazyFlow[I, O, M](flowFactory: I ⇒ Future[Flow[I, O, M]], zeroMat: () ⇒ M) + extends GraphStageWithMaterializedValue[FlowShape[I, O], M] { + val in = Inlet[I]("lazyFlow.in") + val out = Outlet[O]("lazyFlow.out") + override def initialAttributes = DefaultAttributes.lazyFlow + override val shape: FlowShape[I, O] = FlowShape.of(in, out) + + override def toString: String = "LazyFlow" + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + + var completed = false + var matVal: Option[M] = None + val stageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + val subSink = new SubSinkInlet[O]("LazyFlowSubSink") + + override def onPush(): Unit = { + try { + val element = grab(in) + val cb: AsyncCallback[Try[Flow[I, O, M]]] = + getAsyncCallback { + case Success(flow) ⇒ initInternalSource(flow, element) + case Failure(e) ⇒ failure(e) + } + flowFactory(element).onComplete { cb.invoke }(ExecutionContexts.sameThreadExecutionContext) + setHandler(in, new InHandler { + override def onPush(): Unit = throw new IllegalStateException("LazyFlow received push while waiting for flowFactory to complete.") + override def onUpstreamFinish(): Unit = gotCompletionEvent() + override def onUpstreamFailure(ex: Throwable): Unit = failure(ex) + }) + } catch { + case NonFatal(e) ⇒ decider(e) match { + case Supervision.Stop ⇒ failure(e) + case _ ⇒ pull(in) + } + } + } + + override def onPull(): Unit = { + pull(in) + subSink.pull() + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + subSink.pull() + } + + override def onDownstreamFinish(): Unit = { + subSink.cancel() + completeStage() + } + }) + + subSink.setHandler(new InHandler { + override def onPush(): Unit = { + val elem = subSink.grab() + push(out, elem) + } + + override def onUpstreamFinish(): Unit = { + completeStage() + } + }) + } + + setHandler(out, this) + + private def failure(ex: Throwable): Unit = { + matVal = Some(zeroMat()) + failStage(ex) + } + + override def onUpstreamFinish(): Unit = { + matVal = Some(zeroMat()) + completeStage() + } + override def onUpstreamFailure(ex: Throwable): Unit = failure(ex) + setHandler(in, this) + + private def gotCompletionEvent(): Unit = { + setKeepGoing(true) + completed = true + } + + private def initInternalSource(flow: Flow[I, O, M], firstElement: I): Unit = { + val sourceOut = new SubSourceOutlet[I]("LazyFlowSubSource") + + def switchToFirstElementHandlers(): Unit = { + sourceOut.setHandler(new OutHandler { + override def onPull(): Unit = { + sourceOut.push(firstElement) + if (completed) internalSourceComplete() else switchToFinalHandlers() + } + override def onDownstreamFinish(): Unit = internalSourceComplete() + }) + + setHandler(in, new InHandler { + override def onPush(): Unit = sourceOut.push(grab(in)) + override def onUpstreamFinish(): Unit = gotCompletionEvent() + override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex) + }) + } + + def switchToFinalHandlers(): Unit = { + sourceOut.setHandler(new OutHandler { + override def onPull(): Unit = pull(in) + override def onDownstreamFinish(): Unit = internalSourceComplete() + }) + setHandler(in, new InHandler { + override def onPush(): Unit = { + val elem = grab(in) + sourceOut.push(elem) + } + override def onUpstreamFinish(): Unit = internalSourceComplete() + override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex) + }) + } + + def internalSourceComplete(): Unit = { + sourceOut.complete() + // normal completion, subSink.onUpstreamFinish will complete the stage + } + + def internalSourceFailure(ex: Throwable): Unit = { + sourceOut.fail(ex) + failStage(ex) + } + + switchToFirstElementHandlers() + try { + matVal = Some(Source.fromGraph(sourceOut.source) + .viaMat(flow)(Keep.right).toMat(subSink.sink)(Keep.left).run()(interpreter.subFusingMaterializer)) + } catch { + case NonFatal(ex) ⇒ + subSink.cancel() + matVal = Some(zeroMat()) + failStage(ex) + } + } + + } + (stageLogic, matVal.getOrElse(zeroMat())) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 2e9c17157f83..e8ddbfa4582d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -16,6 +16,10 @@ import akka.japi.Util import java.util.Comparator import java.util.concurrent.CompletionStage +import akka.dispatch.ExecutionContexts +import akka.stream.impl.fusing.LazyFlow +import akka.stream.scaladsl.Flow + import scala.compat.java8.FutureConverters._ object Flow { @@ -199,6 +203,36 @@ object Flow { sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2], combine: function.Function2[M1, M2, M]): Flow[I, O, M] = new Flow(scaladsl.Flow.fromSinkAndSourceCoupledMat(sink, source)(combinerToScala(combine))) + + /** + * Creates a real `Flow` upon receiving the first element. Internal `Flow` will not be created + * if there are no elements, because of completion or error. + * The materialized value of the `Flow` will be the materialized + * value of the created internal flow. + * + * If `flowFactory` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Stop]] the materialized value of the flow will be completed with + * the result of the `fallback`. For all other supervision options it will + * try to create flow with the next element. + * + * `fallback` will be executed when there was no elements and completed is received from upstream + * or when there was an exception either thrown by the `flowFactory` or during the internal flow + * materialization process. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the internal flow is successfully created and it emits + * + * '''Backpressures when''' the internal flow is successfully created and it backpressures + * + * '''Completes when''' upstream completes and all elements have been emitted from the internal flow + * + * '''Cancels when''' downstream cancels + */ + def lazyInit[I, O, M](flowFactory: function.Function[I, CompletionStage[Flow[I, O, M]]], fallback: function.Creator[M]): Flow[I, O, M] = + Flow.fromGraph(new LazyFlow[I, O, M]( + t ⇒ flowFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext), + () ⇒ fallback.create())) } /** Create a `Flow` which can process elements of type `T`. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 32102a952d9e..27f9a9fb58bc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -493,6 +493,34 @@ object Flow { FlowShape(bidi.in1, bidi.out2) }) // format: ON + + /** + * Creates a real `Flow` upon receiving the first element. Internal `Flow` will not be created + * if there are no elements, because of completion or error. + * The materialized value of the `Flow` will be the materialized + * value of the created internal flow. + * + * If `flowFactory` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Stop]] the materialized value of the flow will be completed with + * the result of the `fallback`. For all other supervision options it will + * try to create flow with the next element. + * + * `fallback` will be executed when there was no elements and completed is received from upstream + * or when there was an exception either thrown by the `flowFactory` or during the internal flow + * materialization process. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the internal flow is successfully created and it emits + * + * '''Backpressures when''' the internal flow is successfully created and it backpressures + * + * '''Completes when''' upstream completes and all elements have been emitted from the internal flow + * + * '''Cancels when''' downstream cancels + */ + def lazyInit[I, O, M](flowFactory: I ⇒ Future[Flow[I, O, M]], fallback: () ⇒ M): Flow[I, O, M] = + Flow.fromGraph(new LazyFlow[I, O, M](flowFactory, fallback)) } object RunnableGraph {