stream: fix flatMap concurrency #52816

Draft: MoLow wants to merge 1 commit into main from fix-flat-map-concurrency

MoLow
Member

@MoLow MoLow commented May 3, 2024

Fixes: #52796
This PR makes flatMap iterate over the stream and the mapped iterators in round-robin. I am not sure this is the best approach, but I am also not sure there is a better one.
I am also not exactly sure how to test this.
I'd love to hear opinions from the @nodejs/streams folks.
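
For readers unfamiliar with the scheduling idea, here is a minimal sketch of round-robin consumption over several async iterators. It is not the code from this PR: `roundRobin` and `letters` are hypothetical names, the set of inner iterables is fixed up front (the PR also interleaves pulls from the source stream), and pulls are made one at a time rather than concurrently.

```javascript
// Round-robin merge: take one item from each live iterator per pass,
// dropping iterators as they finish. Illustrative only.
async function* roundRobin(iterables) {
  const iterators = iterables.map((it) => it[Symbol.asyncIterator]());
  while (iterators.length > 0) {
    for (let i = 0; i < iterators.length; ) {
      const { done, value } = await iterators[i].next();
      if (done) {
        iterators.splice(i, 1); // finished: remove, do not advance i
      } else {
        yield value;
        i++;
      }
    }
  }
}

// Hypothetical inner iterable used for the example.
async function* letters(prefix, count) {
  for (let i = 1; i <= count; i++) yield `${prefix}${i}`;
}

// roundRobin([letters('a', 3), letters('b', 1)]) yields a1, b1, a2, a3
```

Note that the output interleaves items from different inner iterables, which is the ordering concern raised later in the thread.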

@nodejs-github-bot
Collaborator

Review requested:

  • @nodejs/streams

@nodejs-github-bot nodejs-github-bot added the needs-ci PRs that need a full CI run. label May 3, 2024
Co-Authored-By: Benjamin Gruenbaum <benjamingr@gmail.com>
@MoLow MoLow force-pushed the fix-flat-map-concurrency branch from 255e65c to 948b245 Compare May 3, 2024 09:34
@MoLow MoLow added the stream Issues and PRs related to the stream subsystem. label May 3, 2024
@aduh95
Contributor

aduh95 commented May 3, 2024

Did you check if the stage 2 proposal requires one way or another? I think it's quite important that our implementation matches the proposal.

@benjamingr
Member

When prototyping together we also explored two other ways to schedule the async iterators, and this one was the nicest.

My main concern is this complicating the regular flow for a single operator but I think it’s probably worth it as a bug fix and for correctness

Member

@ronag ronag left a comment

There are many problems here. I think we need a different approach.

Member

@ronag ronag left a comment

Maybe start with making some better tests?

@MoLow
Member Author

MoLow commented May 3, 2024

Did you check if the stage 2 proposal requires one way or another? I think it's quite important that our implementation matches the proposal.

the section about concurrency doesn't mention much about implementation

There are many problems here. I think we need a different approach.

can you please elaborate? are your concerns regarding the approach or the implementation?

Maybe start with making some better tests?

I have to think a little about what tests we can add. Do you have any suggestions?

@ronag
Member

ronag commented May 3, 2024

Also round robin is probably wrong since it won't enforce the order. We would need to add some other operator which does not enforce order.

@benjamingr
Member

benjamingr commented May 3, 2024

Also round robin is probably wrong since it won't enforce the order. We would need to add some other operator which does not enforce order.

Order is not enforced anyway with concurrency/flatMap? Round robin had the best fairness out of the attempts we've had (the others being: prioritize the stream and only then read the iterator, or read the whole iterator, which is what we do today).

This isn't the implementation we started with, I think it can be improved significantly but there is no way around polling the inner flatMapped iterables and the input stream interchangeably?

(though it may make sense to enforce the order in flatMap with concurrency=1)

@bakkot

bakkot commented May 4, 2024

Iterators are strictly pull-based, and the default (possibly only) behavior when pulling twice from x.flatMap(mapper) will be to pull from x once, then from the first result of mapper twice (concurrently). It won't pull from the underlying iterator again until the first yielded iterator is exhausted, no matter how many times you pull. (But, in many cases you can exhaust the iterator without actually needing to finish work on it first - consider pulling four times from [0, 1, 2].values().toAsync().map(slowFn). The fourth promise can complete with { done: true } even while slowFn is still running.)
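
The parenthetical about exhausting an iterator before its work finishes can be illustrated with a small sketch. `concurrentMap` below is a hypothetical stand-in for `values.toAsync().map(fn)`, written under the assumption that each `next()` call pulls from the underlying sync iterator immediately; it is not the proposal's implementation, whose semantics are not finalized.

```javascript
// Hypothetical stand-in for `values.toAsync().map(fn)`: every next() call
// pulls from the underlying sync iterator right away, without waiting for
// earlier fn() calls to settle.
function concurrentMap(values, fn) {
  const it = values[Symbol.iterator]();
  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    next() {
      const step = it.next();
      if (step.done) return Promise.resolve({ done: true, value: undefined });
      return Promise.resolve(fn(step.value)).then((value) => ({ done: false, value }));
    },
  };
}

// A deliberately slow mapper (20 ms per item).
const slowFn = (x) => new Promise((resolve) => setTimeout(() => resolve(x * 2), 20));

// Pull four times without awaiting in between.
function pullFourTimes() {
  const mapped = concurrentMap([0, 1, 2], slowFn);
  return [mapped.next(), mapped.next(), mapped.next(), mapped.next()];
}
```

In this sketch the fourth promise settles first, with `{ done: true }`, while the three `slowFn` calls are still pending.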

If I understand correctly, that matches the current behavior of stream's flatMap, modulo differences between an explicit concurrency parameter vs just pulling concurrently from the result. I'm hoping we can avoid having a concurrency parameter to flatMap for async iterators - you can always just do .map(mapper).buffered(N).flatMap(x => x) if you really want concurrency of the mapper function itself (or of the underlying iterator), where buffered is a helper which pulls multiple times. But these semantics aren't entirely worked out yet. (And unfortunately it's less obvious how to avoid such a parameter for consuming operators like forEach.)
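
As a rough illustration of the `.map(mapper).buffered(N).flatMap(x => x)` idea, the sketch below spells it out with plain async generators. `buffered`, `concurrentMap`, and `flatMapBuffered` are all hypothetical helpers written for this example; the real `buffered` semantics are, as noted, not worked out yet. The sketch assumes the mapper resolves to a sync iterable and that the source tolerates overlapping `next()` calls.

```javascript
// Hypothetical `buffered(n)`: keep up to n next() calls in flight on the
// source and yield their results in the original order.
async function* buffered(source, n) {
  const it = source[Symbol.asyncIterator]();
  const inFlight = [];
  while (true) {
    while (inFlight.length < n) inFlight.push(it.next());
    const step = await inFlight.shift();
    if (step.done) return; // remaining in-flight pulls are simply abandoned
    yield step.value;
  }
}

// A mapping source whose next() may be called concurrently. (A plain async
// generator would queue overlapping next() calls instead of running them.)
function concurrentMap(values, mapper) {
  const it = values[Symbol.iterator]();
  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    next() {
      const step = it.next();
      if (step.done) return Promise.resolve({ done: true, value: undefined });
      return Promise.resolve(mapper(step.value)).then((value) => ({ done: false, value }));
    },
  };
}

// .map(mapper).buffered(n).flatMap(x => x), written out by hand.
async function* flatMapBuffered(values, mapper, n) {
  for await (const inner of buffered(concurrentMap(values, mapper), n)) {
    yield* inner;
  }
}
```

With this shape the concurrency lives in `buffered`, so `flatMap` itself needs no concurrency parameter, and output order still follows input order.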

Note that there are a few different things you might mean by "flattening" in the context of flatMap. There's a bunch of discussion of this in various places, but ReactiveX/rxjs#7429 is a place to start with some recent discussion. Not all of it applies to pull-based rather than push-based systems, but much of it does.

(Feel free to ping me for any questions about async iterator helpers. The proposal is decidedly not finished, so what's written in the repo is not going to be very helpful.)

@MoLow MoLow force-pushed the fix-flat-map-concurrency branch 2 times, most recently from 38766cf to 948b245 Compare May 5, 2024 07:35
@ronag
Member

ronag commented May 5, 2024

Order is not enforced anyway with concurrency/flatMap? Round robin had the best fairness out of the attempts we've had (the others being: prioritize the stream and only then read the iterator, or read the whole iterator, which is what we do today).

Order is enforced. Not sure why you think otherwise?

@benjamingr
Member

What is the "logical" order of flatMap with concurrency?

@benjamingr
Member

The only non-streaming alternative with concurrency is we buffer the intermediate iterators?

@ronag
Member

ronag commented May 5, 2024

What is the "logical" order of flatMap with concurrency?

Items exit in the same order they arrive.

@benjamingr
Member

@ronag yes but items themselves are iterables of multiple items. If we want them to come in the same sub order we need to buffer all the sub iterations (without making them available to the consumer) while the user is waiting for the first iterable to complete.

What behavior would you expect (unordered or ordered but buffers a lot)? Do you agree that the current behavior is kind of useless (w.r.t. concurrency) when yielding async iterables? (the example in #52796 )
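
The "ordered but buffers a lot" option can be sketched as follows. This is an illustration of the trade-off only, not a proposed implementation: `collect` and `orderedFlatMap` are hypothetical names, each inner iterable is buffered in full before any of its items are emitted, and error handling and cancellation are omitted.

```javascript
// Drain an inner iterable completely into an array.
async function collect(iterable) {
  const items = [];
  for await (const item of iterable) items.push(item);
  return items;
}

// Ordered flatMap: up to `concurrency` inner iterables are consumed at once,
// but their items are held back until every earlier inner iterable has been
// emitted in full.
async function* orderedFlatMap(source, mapper, concurrency) {
  const pending = []; // promises of fully buffered inner results, in arrival order
  for await (const item of source) {
    pending.push(collect(mapper(item)));
    if (pending.length >= concurrency) yield* await pending.shift();
  }
  while (pending.length > 0) yield* await pending.shift();
}
```

A fast inner iterable that arrives second finishes early but sits in memory until the slow first one completes, which is the buffering cost described above.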

@ronag
Member

ronag commented May 5, 2024

It is useless but correct IMHO

@ronag
Member

ronag commented May 5, 2024

It flattens everything and applies concurrency on the flat stream of entries.

@ronag
Member

ronag commented May 5, 2024

You can do an inner flat map on each stream or buffering to achieve concurrency.

@MoLow
Member Author

MoLow commented May 5, 2024

It flattens everything and applies concurrency on the flat stream of entries.

That is not the current behavior. It currently applies concurrency to getting/creating an iterator from the mapping function, which is synchronous in the case of async iterators.
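
A small script along these lines can be used to observe the point being made. It relies on the experimental `Readable.prototype.flatMap` and `toArray` operators; `makeMapper` and `runFlatMap` are hypothetical names for this example. Calling an async generator function only creates the generator object, so the body does not start until `flatMap` iterates it.

```javascript
const { Readable } = require('node:stream');

// Hypothetical mapper factory: the returned async generator records when its
// body actually starts and ends.
function makeMapper(log) {
  return async function* mapper(x) {
    log.push(`start ${x}`);
    await new Promise((resolve) => setTimeout(resolve, 10));
    yield `${x}1`;
    yield `${x}2`;
    log.push(`end ${x}`);
  };
}

async function runFlatMap() {
  const log = [];
  const items = await Readable.from(['a', 'b'])
    .flatMap(makeMapper(log), { concurrency: 2 })
    .toArray();
  return { items, log };
}

runFlatMap().then(({ items, log }) => console.log(items, log));
```

If the inner generator bodies overlapped, `start b` would appear in the log before `end a`. The behavior reported in #52796 is that they do not overlap, even with `concurrency: 2`.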

@benjamingr
Member

@ronag so you would prefer we buffer the inner async iterables and yield them "in order" from flatMap? In terms of scheduling the sub-tasks do you have a better idea than round robin?

@ronag
Member

ronag commented May 5, 2024

Not round robin. Depth first.

@benjamingr
Member

@ronag how would that work with concurrency and @MoLow 's async generator case?

@ronag
Member

ronag commented May 5, 2024

@ronag how would that work with concurrency and @MoLow 's async generator case?

Flatten and apply concurrency on the flat stream of events? Basically concatMap followed by mergeMap
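
One possible reading of this suggestion, expressed with the existing experimental stream operators, is to flatten sequentially first and then apply concurrency to the per-entry work with `map`. This is a sketch of that interpretation, not something confirmed in the thread; `expand`, `work`, and `flattenThenProcess` are hypothetical names.

```javascript
const { Readable } = require('node:stream');

// Hypothetical inner iterable: each input expands into two entries.
async function* expand(x) {
  yield `${x}1`;
  yield `${x}2`;
}

// Hypothetical per-entry work that benefits from concurrency.
async function work(entry) {
  await new Promise((resolve) => setTimeout(resolve, 10));
  return entry.toUpperCase();
}

// Flatten depth-first, then run `work` concurrently over the flat entries.
function flattenThenProcess(items, concurrency) {
  return Readable.from(items)
    .flatMap(expand)
    .map(work, { concurrency })
    .toArray();
}

flattenThenProcess(['a', 'b'], 2).then((result) => console.log(result));
```

Here the inner iterables are still consumed one after another, so concurrency only helps when the expensive part is the per-entry `work` rather than producing the entries.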

Successfully merging this pull request may close these issues.

Readable.flatMap concurrency isn't working as expected
6 participants