Introduce a Stream.parallelForEach for bounded parallelism.
#8201
```dart
Future<void> Stream<T>.parallelForEach(
int maxParallel,
FutureOr<void> Function(T item) each, {
FutureOr<void> Function(Object e, StackTrace? st) onError = Future.error,
})
```
Call `each` for each item in this stream, with at most `maxParallel` concurrent invocations.
This method will invoke `each` for each item in this stream, and wait for
all futures from `each` to be resolved. `parallelForEach` will call `each`
in parallel, but never more than `maxParallel` at a time.
If `each` throws and `onError` rethrows (default behavior), then
`parallelForEach` will wait for ongoing `each` invocations to finish
before throwing the first error.
If `onError` does not throw, then iteration will not be interrupted and
errors from `each` will be ignored.
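The semantics described above can be sketched roughly as follows. This is a minimal illustration written for this discussion, not the implementation from this PR: the extension name `ParallelForEachSketch`, the helper `nextCompletion`, and the nullable `onError` parameter (standing in for the `Future.error` default) are all assumptions.

```dart
import 'dart:async';

/// Hypothetical sketch of the documented behavior; not the code in this PR.
extension ParallelForEachSketch<T> on Stream<T> {
  Future<void> parallelForEach(
    int maxParallel,
    FutureOr<void> Function(T item) each, {
    // Assumption: `null` stands in for the documented default, which rethrows.
    FutureOr<void> Function(Object e, StackTrace? st)? onError,
  }) async {
    if (maxParallel < 1) {
      throw ArgumentError.value(maxParallel, 'maxParallel', 'must be >= 1');
    }
    var running = 0;
    Object? firstError;
    StackTrace? firstStackTrace;
    // Completed each time an `each` invocation finishes, to wake the loop.
    Completer<void>? wakeUp;
    Future<void> nextCompletion() => (wakeUp ??= Completer<void>()).future;

    await for (final item in this) {
      running++;
      // Start `each` without awaiting it, so several can be in flight.
      unawaited(() async {
        try {
          await each(item);
        } catch (e, st) {
          try {
            if (onError != null) {
              await onError(e, st);
            } else {
              await Future<void>.error(e, st);
            }
          } catch (e2, st2) {
            // `onError` rethrew: remember only the first such error.
            if (firstError == null) {
              firstError = e2;
              firstStackTrace = st2;
            }
          }
        } finally {
          running--;
          wakeUp?.complete();
          wakeUp = null;
        }
      }());

      // At the limit: waiting inside `await for` pauses the subscription.
      while (running >= maxParallel && firstError == null) {
        await nextCompletion();
      }
      // Leaving the loop cancels the stream subscription.
      if (firstError != null) break;
    }

    // Wait for ongoing invocations before reporting the first error.
    while (running > 0) {
      await nextCompletion();
    }
    final error = firstError;
    if (error != null) {
      Error.throwWithStackTrace(error, firstStackTrace ?? StackTrace.current);
    }
  }
}

Future<void> main() async {
  var running = 0;
  var peak = 0;
  final ignored = <Object>[];
  await Stream.fromIterable([1, 2, 3, 4, 5, 6]).parallelForEach(2, (i) async {
    running++;
    if (running > peak) peak = running;
    await Future<void>.delayed(const Duration(milliseconds: 10));
    running--;
    if (i == 3) throw StateError('item $i failed');
  }, onError: (e, st) {
    // Not rethrowing means iteration continues and the error is dropped.
    ignored.add(e);
  });
  print('peak concurrency: $peak, ignored errors: ${ignored.length}');
}
```

With `onError` swallowing the failure, all six items are processed and the observed peak should not exceed the limit of 2. Errors emitted by the stream itself are not handled specially in this sketch.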
```dart
// Count size of all files in the current folder
var folderSize = 0;
// Use parallelForEach to read at-most 5 files at the same time.
await Directory.current.list().parallelForEach(5, (item) async {
  if (item is File) {
    final bytes = await item.readAsBytes();
    folderSize += bytes.length;
  }
});
print('Folder size: $folderSize');
```
isoos
left a comment
lgtm, but I'd use the concurrentForEach name instead
| /// }); | ||
| /// print('Folder size: $folderSize'); | ||
| /// ``` | ||
| Future<void> parallelForEach( |
Shouldn't we call this concurrentForEach instead? Parallel would indicate separate isolates for me.
I recall discussing this on the PR a long time ago. I don't really like `concurrentForEach`.
Maybe `asyncForEach` or `boundedAsyncForEach` or something completely different.
I think I'm going to keep the current name, just because it doesn't conflict with other extensions on `Stream`.
| }()); | ||
|
|
||
| if (running >= maxParallel) { | ||
| await itemDone.wait; |
I'm wondering: will this implicitly pause the stream listener until the pending processing is done? Can we leave a note on that, or a TODO to investigate it at some point?
> I'm wondering: will this implicitly pause the stream listener until the pending processing is done?

Yes, that's kind of the point:
`await stream.parallelForEach(N, each);`
will:
- Call `each` for each item in the stream.
- Ensure that there are no more than `N` concurrent invocations of `each`.
  - This means pausing the stream while items are being processed.
- Wait for all `each` invocations to be completed.
  - Unless `each` throws, in which case the stream will be canceled, but pending `each` invocations will still be awaited.
I would think that pausing the stream is desirable. Suppose you have a `Stream<Package>` resulting from a datastore query.
- You don't really want to load all `Package` entities into memory. If you did, you could do `await query.toList();`.
- You also don't really want to process each `Package` entity one at a time, because that's sort of slow. If you did, you could just do `await for (final p in query) {...}`.
- You don't really want to process all `Package` entities concurrently, because it'll do a crazy amount of I/O, won't reuse TCP connections, and will cause pain. If you did, you could do `await Future.wait(await query.map((p) async {...}).toList());`.

You really want to process `N` entities concurrently, and not more.
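To make the trade-offs between those three alternatives concrete, here is a small sketch that runs each of them against a fake query and records the peak number of in-flight `process` calls. The `Package` class, `query()`, `process()`, and `takePeak()` are hypothetical stand-ins invented for this illustration; only `toList`, `await for`, `map`, and `Future.wait` are real Dart APIs.

```dart
// Hypothetical stand-in for a datastore entity.
class Package {
  final String name;
  Package(this.name);
}

// Hypothetical stand-in for a datastore query returning six entities.
Stream<Package> query() =>
    Stream.fromIterable([for (var i = 0; i < 6; i++) Package('pkg$i')]);

var _running = 0;
var _peak = 0;

// Simulated per-entity work that records how many calls overlap.
Future<void> process(Package p) async {
  _running++;
  if (_running > _peak) _peak = _running;
  await Future<void>.delayed(const Duration(milliseconds: 5));
  _running--;
}

// Returns the peak observed so far and resets it for the next experiment.
int takePeak() {
  final peak = _peak;
  _peak = 0;
  return peak;
}

Future<void> main() async {
  // 1. Load everything: simple, but every entity is held in memory at once.
  final all = await query().toList();
  print('toList(): ${all.length} entities held in memory');

  // 2. Sequential: the loop body is awaited before the next item is taken.
  await for (final p in query()) {
    await process(p);
  }
  print('await for: peak concurrency ${takePeak()}');

  // 3. Unbounded: every `process` call starts as soon as its item arrives.
  await Future.wait(await query().map((p) => process(p)).toList());
  print('Future.wait: peak concurrency ${takePeak()}');
}
```

The sequential loop never overlaps two calls, while the `Future.wait` variant lets the number of in-flight calls grow with the size of the query. A bounded `parallelForEach(N, ...)` is meant to sit between the two.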
From dart-archive/async#249, which we never managed to land in `dart:async`.