
flatMap iterates sequentially instead of in parallel #244

Closed

felixfbecker opened this issue Dec 5, 2018 · 7 comments


@felixfbecker

I expected flatMap to work like RxJS flatMap/mergeMap. In RxJS, when the source emits items, the map function is invoked immediately and the inner emissions are flattened out. Inner emissions of different outer emissions can arrive in any order, as the map functions are potentially executed concurrently.

flatMap in Ix, however, seems to work more like concatMap: it completely drains the iterable returned by the map function for the first outer value before moving on to the next one:

  async *[Symbol.asyncIterator]() {
    for await (let outer of this._source) {
      const inners = await this._selector(outer);
      for await (let inner of inners) {
        yield inner;
      }
    }
  }

This makes the operator less useful because, as you can see, it would be trivial to write that out yourself as nested for await loops; the benefit of using an operator would be concurrency (see the sketch below).
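
To make the difference concrete, here is a hypothetical illustration in plain async generators (not code from the issue): a sequential, concatMap-style flatten drains each inner iterable completely before pulling the next outer value, so a fast inner source cannot emit until a slow one ahead of it is exhausted.

async function* inner(tag: string, delayMs: number): AsyncIterable<string> {
  for (const n of [1, 2]) {
    await new Promise((resolve) => setTimeout(resolve, delayMs));
    yield `${tag}${n}`;
  }
}

async function* outer(): AsyncIterable<AsyncIterable<string>> {
  yield inner('a', 100); // slow inner source
  yield inner('b', 1);   // fast inner source
}

// Equivalent to the Ix implementation above: drain each inner fully in turn.
async function* flattenSequential<T>(source: AsyncIterable<AsyncIterable<T>>) {
  for await (const xs of source) {
    yield* xs;
  }
}

// Always logs a1, a2, b1, b2, even though b's values could be produced almost immediately:
// for await (const v of flattenSequential(outer())) console.log(v);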

Also, the behaviour I expected is how merge works, but not how mergeAll works, since mergeAll just delegates to flatMap. This is very confusing.

IxJS version: 2.3.5

@trxcllnt (Member) commented Dec 5, 2018

merge and mergeAll diverge in behavior like this because the collections they're merging are different. merge is flattening an Array<AsyncIterable<T>>, while mergeAll is flattening an AsyncIterable<AsyncIterable<T>>.

Because we have to use Promise.race to subscribe to promises concurrently, merge over an Array<AsyncIterable<T>> can be concurrent, since we know how many iterators we have to merge up-front.
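
To make that concrete, here is a minimal sketch of a Promise.race-based merge over a fixed set of iterators (a simplified illustration, not Ix's actual implementation): because the number of sources is known up front, we can keep one pending next() per iterator and race them.

async function* mergeSketch<T>(...sources: AsyncIterable<T>[]) {
  const iterators = sources.map((s) => s[Symbol.asyncIterator]());
  // One in-flight next() per iterator, keyed by index so we know which one settled.
  const pending = new Map<number, Promise<{ i: number; r: IteratorResult<T> }>>();
  iterators.forEach((it, i) => pending.set(i, it.next().then((r) => ({ i, r }))));
  while (pending.size > 0) {
    const { i, r } = await Promise.race(pending.values());
    if (r.done) {
      pending.delete(i); // this source is exhausted; keep racing the rest
    } else {
      // request the next value from the same source, then emit the current one
      pending.set(i, iterators[i].next().then((res) => ({ i, r: res })));
      yield r.value;
    }
  }
}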

But in the AsyncIterable<AsyncIterable<T>> case, we don't know the number of iterators to create up front, because they're yielded one at a time. We can't exhaust the outer source until it signals completion, because the way AsyncIterable signals completion is with a Promise<IteratorResult>. As soon as we await any of the outer source's Promise<IteratorResult<AsyncIterable<T>>> values, we've blocked ourselves from pulling the next inner source to run concurrently.

I was initially confused as well, but after talking with @mattpodwysocki a while back, I came to see why it's difficult. We decided the easiest solution was to toObservable() the source AsyncIterable, perform a concurrent Observable merge with Rx, then convert that back into an AsyncIterable. This has the unfortunate side-effect that back-pressure isn't composed through to the inner sources, but I'm not sure that can be helped.

The only workaround I've come up with would involve manually invoking next() on the outer source N times up front to create an Array<Promise<IteratorResult<AsyncIterable<T>>>>, then doing a weird double merge that unwraps and yield-stars each inner source. I'm not totally sure how it'd look, but I'm open to any new ideas here.

@felixfbecker (Author) commented Dec 21, 2018

I think I found a nice solution for my use case. What I actually wanted was to introduce bounded concurrency, e.g. flat-map the source stream with at most 5 items in flight. This can be done by using merge() and simply passing it the same AsyncIterable multiple times: any call to next() on any of the passed references advances the same underlying iterator, and merge() races the n references in parallel. To run a flatMap in parallel, all that is needed is to apply the same flatMap to each of the references.

Example:

const flatMapConcurrent = <T, R>(
  source: AsyncIterable<T>,
  mapFn: (x: T) => AsyncIterable<R>,
  concurrency: number
) =>
  merge(...new Array<AsyncIterable<R>>(concurrency).fill(flatMap(source, mapFn)))

I think we could use this implementation to introduce a concurrency parameter to flatMap (just like Rx's mergeMap has), which would default to 1 (meaning only one reference is passed to merge, preserving the current behaviour).
The only thing that would not be possible is setting concurrency: Infinity, since you cannot allocate an infinitely large array. From my point of view that's simply not possible without losing back-pressure anyway.

wdyt?

trxcllnt added a commit that referenced this issue Feb 11, 2019
…ation

BREAKING CHANGE: flatMap now supports concurrency

fix #244
@trxcllnt (Member) commented Feb 13, 2019

@felixfbecker I started work on flatmap-concurrent in this branch (the linked async *[Symbol.asyncIterator]() implementation). The concurrent part is currently untested, blocked until the rewrite branch is ready @mattpodwysocki

@jayphelps (Member)

FWIW I also found this behavior unexpected, but then tried to write my own and realized why it currently works this way and the complexities involved in making it concurrent.

@richardscarrott commented Jul 16, 2021

@felixfbecker is that flatMapConcurrent theoretical, or something that could actually be used? I wonder if you could share an example?

The best I've got so far is to just buffer the chunks to gain some concurrency, e.g.:

import * as fs from 'fs';
// Assuming IxJS 4.x-style imports; `process` is the commenter's placeholder for per-chunk work.
import { from } from 'ix/asynciterable';
import { buffer, flatMap } from 'ix/asynciterable/operators';

from(fs.createReadStream('test.log'))
  .pipe(
    buffer(10),
    flatMap((chunks) => Promise.all(chunks.map((chunk) => process(chunk))))
  )
  .pipe(fs.createWriteStream('newLog.log'))

But of course it's not particularly efficient because the 11th chunk won't get processed until all of the first 10 have finished.
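
For comparison, here is a rough sketch in plain async iterables (not an Ix API) of the sliding-window behaviour that fixed-size batching lacks: keep at most limit calls in flight and start the next chunk as soon as any one of them settles, yielding results in completion order.

async function* mapPooled<T, R>(
  source: AsyncIterable<T>,
  fn: (x: T) => Promise<R>,
  limit: number
) {
  type Settled = { p: Promise<Settled>; value: R };
  const inFlight = new Set<Promise<Settled>>();

  // Wait for whichever in-flight call settles first and yield its result.
  const drainOne = async () => {
    const { p, value } = await Promise.race(inFlight);
    inFlight.delete(p);
    return value;
  };

  for await (const item of source) {
    // Tag each result with its own promise so it can be removed from the pool.
    const p: Promise<Settled> = fn(item).then((value) => ({ p, value }));
    inFlight.add(p);
    if (inFlight.size >= limit) yield await drainOne();
  }
  while (inFlight.size > 0) yield await drainOne();
}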

@tim-smart

(quoting @felixfbecker's flatMapConcurrent suggestion above in full)

This is great! Thanks.

Had to make some changes to get it working. Here is my working code:

import * as Ix from "ix/asynciterable";
import * as IxO from "ix/asynciterable/operators";

export const flatMapConcurrent =
  <T, R>(transform: (item: T) => AsyncIterable<R>, concurrency: number) =>
  (source: AsyncIterable<T>) =>
    Ix.merge(
      // Spread `concurrency` references to the same flatMap-wrapped source into merge;
      // the tuple cast satisfies merge's typings, which expect at least one source.
      ...(new Array<AsyncIterable<R>>(concurrency).fill(
        IxO.flatMap(transform)(source),
      ) as [AsyncIterable<R>]),
    );

@tim-smart

Actually it doesn't work unless you publish the stream first. Updated code:

import { pipe } from "fp-ts/lib/function";
import * as Ix from "ix/asynciterable";
import * as IxO from "ix/asynciterable/operators";

export const flatMapConcurrent =
  <T, R>(transform: (item: T) => AsyncIterable<R>, concurrency: number) =>
  (source: AsyncIterable<T>) => {
    // publish() multicasts the source so that all the merged copies below pull
    // from a single shared iteration instead of each re-iterating the source.
    source = pipe(source, IxO.publish());
    return Ix.merge(
      ...(new Array(concurrency).fill(pipe(source, IxO.flatMap(transform))) as [
        AsyncIterable<R>,
      ]),
    );
  };
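
For reference, a hypothetical usage of the helper above (urls and crawl are placeholder names, not from this thread): at most four inner iterations run at any one time, and their results interleave as they become available.

// Placeholder source and selector, purely for illustration.
declare const urls: AsyncIterable<string>;
declare function crawl(url: string): AsyncIterable<string>;

async function main() {
  for await (const page of flatMapConcurrent(crawl, 4)(urls)) {
    console.log(page);
  }
}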

trxcllnt added a commit that referenced this issue Mar 14, 2022
BREAKING CHANGE: flatMap enumerates inner sequences in parallel

fix #244