From ca4a14bb1226d82d347731c0aff4c4ff152c04de Mon Sep 17 00:00:00 2001
From: kerr
Date: Sun, 30 Jul 2023 19:51:33 +0800
Subject: [PATCH] +str Add flatMapConcat with parallelism.

---
 .../akka/stream/FlatMapConcatBenchmark.scala  |  13 +
 .../scaladsl/FlowFlatMapConcatSpec.scala      | 124 ++++++++++
 .../main/scala/akka/stream/impl/Stages.scala  |   1 +
 .../akka/stream/impl/TraversalBuilder.scala   |  33 ++-
 .../scala/akka/stream/impl/fusing/Ops.scala   |   1 +
 .../stream/impl/fusing/StreamOfStreams.scala  | 226 +++++++++++++++++-
 .../main/scala/akka/stream/javadsl/Flow.scala |  19 ++
 .../scala/akka/stream/javadsl/Source.scala    |  17 ++
 .../scala/akka/stream/javadsl/SubFlow.scala   |  20 ++
 .../scala/akka/stream/javadsl/SubSource.scala |  20 ++
 .../scala/akka/stream/scaladsl/Flow.scala     |  17 ++
 11 files changed, 480 insertions(+), 11 deletions(-)
 create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapConcatSpec.scala
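Usage note (illustrative only, not part of the applied diff): the new overload adds a `parallelism`
argument to `flatMapConcat`. It keeps the concatenation semantics but allows up to `parallelism`
substreams to be materialized ahead of the one currently being consumed. A minimal sketch of the
intended Scala DSL usage, assuming a build that contains this patch; the object and system names
below are made up for the example:

// Sketch only: demonstrates the new flatMapConcat(parallelism, f) overload.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

object FlatMapConcatParallelismSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  import system.dispatcher

  Source(1 to 100)
    // Up to 4 substreams may be pre-materialized, but output order is preserved.
    .flatMapConcat(parallelism = 4, i => Source(List(i, i * 10)))
    .runWith(Sink.seq)
    .foreach { elements =>
      println(elements.take(6)) // Vector(1, 10, 2, 20, 3, 30)
      system.terminate()
    }
}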
+ */ + +package akka.stream.scaladsl + +import akka.stream.OverflowStrategy +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSink + +import java.util.concurrent.ThreadLocalRandom +import java.util.concurrent.atomic.AtomicInteger +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt +import scala.util.control.NoStackTrace + +class FlowFlatMapConcatSpec extends StreamSpec(""" + akka.stream.materializer.initial-input-buffer-size = 2 + """) with ScriptedTest { + val toSeq = Flow[Int].grouped(1000).toMat(Sink.head)(Keep.right) + + class BoomException extends RuntimeException("BOOM~~") with NoStackTrace + "A flatMapConcat" must { + + "work with value presented sources" in { + Source( + List( + Source.empty[Int], + Source.single(1), + Source.empty[Int], + Source(List(2, 3, 4)), + Source.future(Future.successful(5)), + Source.lazyFuture(() => Future.successful(6)))) + .flatMapConcat(ThreadLocalRandom.current().nextInt(1, 129), identity) + .runWith(toSeq) + .futureValue should ===(1 to 6) + } + + "work with value presented failed sources" in { + val ex = new BoomException + Source( + List( + Source.empty[Int], + Source.single(1), + Source.empty[Int], + Source(List(2, 3, 4)), + Source.future(Future.failed(ex)), + Source.lazyFuture(() => Future.successful(5)))) + .flatMapConcat(ThreadLocalRandom.current().nextInt(1, 129), identity) + .onErrorComplete[BoomException]() + .runWith(toSeq) + .futureValue should ===(1 to 4) + } + + "work with value presented sources when demands slow" in { + val prob = Source( + List(Source.empty[Int], Source.single(1), Source(List(2, 3, 4)), Source.lazyFuture(() => Future.successful(5)))) + .flatMapConcat(ThreadLocalRandom.current().nextInt(1, 129), identity) + .runWith(TestSink()) + + prob.request(1) + prob.expectNext(1) + prob.expectNoMessage(1.seconds) + prob.request(2) + prob.expectNext(2, 3) + prob.expectNoMessage(1.seconds) + prob.request(2) + prob.expectNext(4, 5) + prob.expectComplete() + } + + "can do pre materialization when parallelism > 1" in { + val materializationCounter = new AtomicInteger(0) + val randomParallelism = ThreadLocalRandom.current().nextInt(4, 65) + val prob = Source(1 to (randomParallelism * 3)) + .flatMapConcat( + randomParallelism, + value => { + Source + .lazySingle(() => { + materializationCounter.incrementAndGet() + value + }) + .buffer(1, overflowStrategy = OverflowStrategy.backpressure) + }) + .runWith(TestSink()) + + expectNoMessage(1.seconds) + materializationCounter.get() shouldBe 0 + + prob.request(1) + prob.expectNext(1.seconds, 1) + expectNoMessage(1.seconds) + materializationCounter.get() shouldBe (randomParallelism + 1) + materializationCounter.set(0) + + prob.request(2) + prob.expectNextN(List(2, 3)) + expectNoMessage(1.seconds) + materializationCounter.get() shouldBe 2 + materializationCounter.set(0) + + prob.request(randomParallelism - 3) + prob.expectNextN(4 to randomParallelism) + expectNoMessage(1.seconds) + materializationCounter.get() shouldBe (randomParallelism - 3) + materializationCounter.set(0) + + prob.request(randomParallelism) + prob.expectNextN(randomParallelism + 1 to randomParallelism * 2) + expectNoMessage(1.seconds) + materializationCounter.get() shouldBe randomParallelism + materializationCounter.set(0) + + prob.request(randomParallelism) + prob.expectNextN(randomParallelism * 2 + 1 to randomParallelism * 3) + expectNoMessage(1.seconds) + materializationCounter.get() shouldBe 0 + prob.expectComplete() + } + + } + +} diff --git 
diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
index 13c9a3e0bca7..7a4c51900319 100755
--- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
@@ -79,6 +79,7 @@ import akka.stream.Attributes._
     val mergePreferred = name("mergePreferred")
     val mergePrioritized = name("mergePrioritized")
     val flattenMerge = name("flattenMerge")
+    val flattenConcat = name("flattenConcat")
     val recoverWith = name("recoverWith")
     val onErrorComplete = name("onErrorComplete")
     val broadcast = name("broadcast")
diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala
index a2b707630e0e..f052d9a3cd03 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala
@@ -6,12 +6,12 @@ package akka.stream.impl
 
 import scala.collection.immutable.Map.Map1
 import scala.language.existentials
-
 import akka.annotation.{ DoNotInherit, InternalApi }
 import akka.stream._
 import akka.stream.impl.StreamLayout.AtomicModule
 import akka.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 }
 import akka.stream.impl.fusing.GraphStageModule
+import akka.stream.impl.fusing.GraphStages.IterableSource
 import akka.stream.impl.fusing.GraphStages.SingleSource
 import akka.stream.scaladsl.Keep
 import akka.util.OptionVal
@@ -370,6 +370,37 @@ import akka.util.unused
     }
   }
 
+  def getValuePresentedSource[A >: Null](graph: Graph[SourceShape[A], _]): OptionVal[Graph[SourceShape[A], _]] = {
+    def isValuePresentedSource(graph: Graph[SourceShape[_ <: A], _]): Boolean = graph match {
+      case _: SingleSource[_] | _: IterableSource[_] | EmptySource => true
+      case _                                                       => false
+    }
+    graph match {
+      case _ if isValuePresentedSource(graph) => OptionVal.Some(graph)
+      case _ =>
+        graph.traversalBuilder match {
+          case l: LinearTraversalBuilder =>
+            l.pendingBuilder match {
+              case OptionVal.Some(a: AtomicTraversalBuilder) =>
+                a.module match {
+                  case m: GraphStageModule[_, _] =>
+                    m.stage match {
+                      case _ if isValuePresentedSource(m.stage.asInstanceOf[Graph[SourceShape[A], _]]) =>
+                        // It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize.
+                        if ((l.traversalSoFar eq EmptyTraversal) && !l.attributes.isAsync)
+                          OptionVal.Some(m.stage.asInstanceOf[Graph[SourceShape[A], _]])
+                        else OptionVal.None
+                      case _ => OptionVal.None
+                    }
+                  case _ => OptionVal.None
+                }
+              case _ => OptionVal.None
+            }
+          case _ => OptionVal.None
+        }
+    }
+  }
+
   /**
    * Test if a Graph is an empty Source.
   * */
  }
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
index 493d02db6b96..c73e083c6d67 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
@@ -1272,6 +1272,7 @@ private[stream] object Collect {
  */
 @InternalApi private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In => Future[Out])
     extends GraphStage[FlowShape[In, Out]] {
+  require(parallelism >= 1, "parallelism should be >= 1")
 
   import MapAsync._
 
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
index 61da21a300ee..9a34a9859a56 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
@@ -4,37 +4,39 @@
 
 package akka.stream.impl.fusing
 
-import java.util.Collections
-import java.util.concurrent.atomic.AtomicReference
-
-import scala.annotation.tailrec
-import scala.collection.immutable
-import scala.concurrent.duration.FiniteDuration
-import scala.util.control.NonFatal
-
 import akka.NotUsed
 import akka.annotation.InternalApi
-import akka.stream._
 import akka.stream.ActorAttributes.StreamSubscriptionTimeout
 import akka.stream.ActorAttributes.SupervisionStrategy
 import akka.stream.Attributes.SourceLocation
-import akka.stream.impl.{ Buffer => BufferImpl }
+import akka.stream._
 import akka.stream.impl.ActorSubscriberMessage
 import akka.stream.impl.ActorSubscriberMessage.OnError
+import akka.stream.impl.EmptySource
 import akka.stream.impl.Stages.DefaultAttributes
 import akka.stream.impl.SubscriptionTimeoutException
 import akka.stream.impl.TraversalBuilder
+import akka.stream.impl.fusing.GraphStages.IterableSource
 import akka.stream.impl.fusing.GraphStages.SingleSource
+import akka.stream.impl.{ Buffer => BufferImpl }
 import akka.stream.scaladsl._
 import akka.stream.stage._
 import akka.util.OptionVal
 import akka.util.ccompat.JavaConverters._
 
+import java.util.Collections
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.immutable
+import scala.concurrent.duration.FiniteDuration
+import scala.util.control.NonFatal
+
 /**
  * INTERNAL API
  */
 @InternalApi private[akka] final class FlattenMerge[T, M](val breadth: Int)
     extends GraphStage[FlowShape[Graph[SourceShape[T], M], T]] {
+  require(breadth >= 1, "breadth should be >= 1")
 
   private val in = Inlet[Graph[SourceShape[T], M]]("flatten.in")
   private val out = Outlet[T]("flatten.out")
 
@@ -137,6 +139,210 @@ import akka.util.ccompat.JavaConverters._
   override def toString: String = s"FlattenMerge($breadth)"
 }
 
+@InternalApi
+private[akka] object FlattenConcat {
+
+  sealed trait Materializable {
+    def materialize(): Unit
+  }
+
+  sealed abstract class InflightSource[T] {
+    def hasNext: Boolean
+    def next(): T
+    def tryPull(): Unit
+    def cancel(cause: Throwable): Unit
+    def isClosed: Boolean
+  }
+
+  final class InflightSingleSource[T](elem: T) extends InflightSource[T] {
+    private var _hasNext = true
+    override def hasNext: Boolean = _hasNext
+    override def next(): T =
+      if (_hasNext) {
+        _hasNext = false
+        elem
+      } else throw new NoSuchElementException("next called after completion")
+    override def tryPull(): Unit = ()
+    override def cancel(cause: Throwable): Unit = ()
+    override def isClosed: Boolean = !hasNext
+  }
+
+  final class InflightIterableSource[T](elements: Iterable[T]) extends InflightSource[T] {
+    private val iterator: Iterator[T] = elements.iterator
+    override def hasNext: Boolean = iterator.hasNext
+    override def next(): T = iterator.next()
+    override def tryPull(): Unit = ()
+    override def cancel(cause: Throwable): Unit = ()
+    override def isClosed: Boolean = !hasNext
+  }
+
+  object InflightEmptySource extends InflightSource[Any] {
+    override def hasNext: Boolean = false
+    override def next(): Any = throw new NoSuchElementException("next called after completion")
+    override def tryPull(): Unit = ()
+    override def cancel(cause: Throwable): Unit = ()
+    override def isClosed: Boolean = true
+  }
+}
+
+@InternalApi
+private[akka] final class FlattenConcat[T, M](parallelism: Int)
+    extends GraphStage[FlowShape[Graph[SourceShape[T], M], T]] {
+  require(parallelism >= 1, "parallelism should be >= 1")
+  private val in = Inlet[Graph[SourceShape[T], M]]("flattenConcat.in")
+  private val out = Outlet[T]("flattenConcat.out")
+
+  override def initialAttributes: Attributes = DefaultAttributes.flattenConcat
+  override val shape: FlowShape[Graph[SourceShape[T], M], T] = FlowShape(in, out)
+  override def createLogic(inheritedAttributes: Attributes) =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      import FlattenConcat._
+      private var inflightSources: BufferImpl[InflightSource[T]] = _
+
+      override def preStart(): Unit = inflightSources = BufferImpl(parallelism, inheritedAttributes)
+
+      override def onPush(): Unit = {
+        val source = grab(in)
+        addSource(source)
+        if (!inflightSources.isFull) {
+          tryPull(in)
+        }
+      }
+
+      override def onUpstreamFinish(): Unit = if (inflightSources.isEmpty) completeStage()
+
+      override def onUpstreamFailure(ex: Throwable): Unit = {
+        super.onUpstreamFailure(ex)
+        cancelInflightSources(SubscriptionWithCancelException.NoMoreElementsNeeded)
+      }
+
+      override def onPull(): Unit = {
+        if (inflightSources.nonEmpty) {
+          val currentSource = inflightSources.peek()
+          // emit from the current head source if possible
+          if (currentSource.hasNext) {
+            push(out, currentSource.next())
+            if (currentSource.isClosed) {
+              removeSource(currentSource)
+            }
+          } else if (currentSource.isClosed) {
+            removeSource(currentSource)
+          } else {
+            currentSource.tryPull()
+          }
+        } else if (!hasBeenPulled(in)) {
+          tryPull(in)
+        } else if (isClosed(in)) {
+          completeStage()
+        }
+      }
+
+      override def onDownstreamFinish(cause: Throwable): Unit = {
+        super.onDownstreamFinish(cause)
+        cancelInflightSources(cause)
+      }
+
+      private def cancelInflightSources(cause: Throwable): Unit = {
+        if (inflightSources.nonEmpty) {
+          var source = inflightSources.dequeue()
+          while (source ne null) {
+            source.cancel(cause)
+            source = inflightSources.dequeue()
+          }
+        }
+      }
+
+      private def addSource(source: Graph[SourceShape[T], M]): Unit = {
+        TraversalBuilder.getValuePresentedSource(source) match {
+          case OptionVal.Some(graph) =>
+            graph match {
+              case single: SingleSource[T] @unchecked =>
+                if (isAvailable(out) && inflightSources.isEmpty) {
+                  push(out, single.elem)
+                } else {
+                  inflightSources.enqueue(new InflightSingleSource(single.elem))
+                }
+              case iterable: IterableSource[T] @unchecked =>
+                val inflightSource = new InflightIterableSource[T](iterable.elements)
+                if (isAvailable(out) && inflightSources.isEmpty) {
+                  if (inflightSource.hasNext) {
+                    push(out, inflightSource.next())
+                    if (inflightSource.hasNext) {
+                      inflightSources.enqueue(inflightSource)
+                    }
+                  }
+                } else {
+                  inflightSources.enqueue(inflightSource)
+                }
+              case EmptySource =>
+                if (!inflightSources.isEmpty) {
+                  inflightSources.enqueue(InflightEmptySource.asInstanceOf[InflightSource[T]])
+                }
+              case _ => throw new IllegalArgumentException("Unsupported source type.")
+            }
+          case _ => attachAndMaterializeSource(source)
+        }
+
+        def attachAndMaterializeSource(source: Graph[SourceShape[T], M]): Unit = {
+          val inflightSource: InflightSource[T] with Materializable = new InflightSource[T] with Materializable {
+            self =>
+            val sinkIn = new SubSinkInlet[T]("FlattenConcatSink")
+            sinkIn.setHandler(new InHandler {
+              override def onPush(): Unit = {
+                if (isAvailable(out) && (inflightSources.peek() eq self)) {
+                  push(out, sinkIn.grab())
+                }
+              }
+              override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(self)
+            })
+
+            final override def materialize(): Unit = {
+              val graph = Source.fromGraph(source).to(sinkIn.sink)
+              interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = inheritedAttributes)
+            }
+
+            final override def cancel(cause: Throwable): Unit = sinkIn.cancel(cause)
+            final override def hasNext: Boolean = sinkIn.isAvailable
+            final override def isClosed: Boolean = sinkIn.isClosed
+            final override def next(): T = sinkIn.grab()
+            final override def tryPull(): Unit = if (!sinkIn.isClosed && !sinkIn.hasBeenPulled) sinkIn.pull()
+          }
+          if (inflightSources.isEmpty && isAvailable(out)) {
+            // this is the first one, pull
+            inflightSource.tryPull()
+          }
+          inflightSources.enqueue(inflightSource)
+          inflightSource.materialize()
+        }
+      }
+
+      private def removeSource(source: InflightSource[T]): Unit = {
+        if (inflightSources.nonEmpty && (source eq inflightSources.peek())) {
+          // only dequeue if it is the first one
+          inflightSources.dequeue()
+          if (isClosed(in) && inflightSources.isEmpty) {
+            completeStage()
+          } else {
+            // pull the next source
+            if (inflightSources.nonEmpty) {
+              val nextSource = inflightSources.peek()
+              nextSource.tryPull()
+            }
+            if (!hasBeenPulled(in)) {
+              tryPull(in)
+            }
+          }
+        } else {
+          throw new IllegalStateException("Should not reach here.")
+        }
+      }
+
+      setHandlers(in, out, this)
+    }
+
+  override def toString: String = s"FlattenConcat($parallelism)"
+}
+
 /**
  * INTERNAL API
  */
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 641e7ff17d76..f12808675807 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -2436,6 +2436,25 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
   def flatMapConcat[T, M](f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): Flow[In, T, Mat] =
     new Flow(delegate.flatMapConcat[T, M](x => f(x)))
 
+  /**
+   * Transform each input element into a `Source` of output elements that is
+   * then flattened into the output stream by concatenation,
+   * fully consuming one Source after the other.
+   * `parallelism` can be used to configure the maximum number of in-flight sources that may be materialized ahead of time.
+   *
+   * '''Emits when''' a currently consumed substream has an element available
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes and all consumed substreams complete
+   *
+   * '''Cancels when''' downstream cancels
+   */
+  def flatMapConcat[T, M](
+      parallelism: Int,
+      f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): Flow[In, T, Mat] =
+    new Flow(delegate.flatMapConcat[T, M](parallelism, x => f(x)))
+
   /**
    * Transform each input element into a `Source` of output elements that is
    * then flattened into the output stream by merging, where at most `breadth`
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index 2159617eec71..ac58ccbcfe58 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -3875,6 +3875,23 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
   def flatMapConcat[T, M](f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): Source[T, Mat] =
     new Source(delegate.flatMapConcat[T, M](x => f(x)))
 
+  /**
+   * Transform each input element into a `Source` of output elements that is
+   * then flattened into the output stream by concatenation,
+   * fully consuming one Source after the other.
+   * `parallelism` can be used to configure the maximum number of in-flight sources that may be materialized ahead of time.
+   *
+   * '''Emits when''' a currently consumed substream has an element available
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes and all consumed substreams complete
+   *
+   * '''Cancels when''' downstream cancels
+   */
+  def flatMapConcat[T, M](parallelism: Int, f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): Source[T, Mat] =
+    new Source(delegate.flatMapConcat[T, M](parallelism, x => f(x)))
+
   /**
    * Transform each input element into a `Source` of output elements that is
    * then flattened into the output stream by merging, where at most `breadth`
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
index 2362dd311d52..987c9684487e 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
@@ -1495,6 +1495,26 @@ final class SubFlow[In, Out, Mat](
   def flatMapConcat[T, M](f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): SubFlow[In, T, Mat] =
     new SubFlow(delegate.flatMapConcat(x => f(x)))
 
+  /**
+   * Transform each input element into a `Source` of output elements that is
+   * then flattened into the output stream by concatenation,
+   * fully consuming one Source after the other.
+   * `parallelism` can be used to configure the maximum number of in-flight sources that may be materialized ahead of time.
+   *
+   * '''Emits when''' a currently consumed substream has an element available
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes and all consumed substreams complete
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   */
+  def flatMapConcat[T, M](
+      parallelism: Int,
+      f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): SubFlow[In, T, Mat] =
+    new SubFlow(delegate.flatMapConcat(parallelism, x => f(x)))
+
   /**
    * Transform each input element into a `Source` of output elements that is
    * then flattened into the output stream by merging, where at most `breadth`
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
index ded06e246aaf..296a2162c358 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
@@ -1473,6 +1473,26 @@ final class SubSource[Out, Mat](
   def flatMapConcat[T, M](f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): SubSource[T, Mat] =
     new SubSource(delegate.flatMapConcat(x => f(x)))
 
+  /**
+   * Transform each input element into a `Source` of output elements that is
+   * then flattened into the output stream by concatenation,
+   * fully consuming one Source after the other.
+   * `parallelism` can be used to configure the maximum number of in-flight sources that may be materialized ahead of time.
+   *
+   * '''Emits when''' a currently consumed substream has an element available
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes and all consumed substreams complete
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   */
+  def flatMapConcat[T, M](
+      parallelism: Int,
+      f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): SubSource[T, Mat] =
+    new SubSource(delegate.flatMapConcat(parallelism, x => f(x)))
+
   /**
    * Transform each input element into a `Source` of output elements that is
    * then flattened into the output stream by merging, where at most `breadth`
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index 3d7306e4177f..742c36594414 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -2489,6 +2489,23 @@ trait FlowOps[+Out, +Mat] {
   */
  def flatMapConcat[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T] = map(f).via(new FlattenMerge[T, M](1))
 
+  /**
+   * Transform each input element into a `Source` of output elements that is
+   * then flattened into the output stream by concatenation,
+   * fully consuming one Source after the other.
+   * `parallelism` can be used to configure the maximum number of in-flight sources that may be materialized ahead of time.
+   *
+   * '''Emits when''' a currently consumed substream has an element available
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes and all consumed substreams complete
+   *
+   * '''Cancels when''' downstream cancels
+   */
+  def flatMapConcat[T, M](parallelism: Int, f: Out => Graph[SourceShape[T], M]): Repr[T] =
+    map(f).via(new FlattenConcat[T, M](parallelism))
+
   /**
    * Transform each input element into a `Source` of output elements that is
    * then flattened into the output stream by merging, where at most `breadth`
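
Closing note (illustrative only, not part of the diff): unlike `flatMapMerge`, which interleaves
substreams, the new `flatMapConcat(parallelism, f)` still emits elements strictly in upstream order;
`parallelism` only controls how many substreams may be materialized in advance. A small sketch under
that assumption (the helper and object names are made up for the example):

// Sketch only: compares ordering of the new overload with flatMapMerge.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

object OrderedFlattenSketch extends App {
  implicit val system: ActorSystem = ActorSystem("ordered-flatten")
  import system.dispatcher

  val words = Source(List("a", "b", "c"))
  def sub(letter: String) = Source(1 to 2).map(n => s"$letter-$n")

  val ordered = words.flatMapConcat(2, sub).runWith(Sink.seq)
  val merged = words.flatMapMerge(2, sub).runWith(Sink.seq)

  for (o <- ordered; m <- merged) {
    println(o) // always Vector(a-1, a-2, b-1, b-2, c-1, c-2)
    println(m) // may interleave, e.g. Vector(a-1, b-1, a-2, ...)
    system.terminate()
  }
}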