-
Notifications
You must be signed in to change notification settings - Fork 51
Added Condition and boundedForEach #249
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| // Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file | ||
| // for details. All rights reserved. Use of this source code is governed by a | ||
| // BSD-style license that can be found in the LICENSE file. | ||
|
|
||
| import 'dart:async'; | ||
|
|
||
| import 'package:meta/meta.dart'; | ||
|
|
||
| /// A [Notifier] allows micro-tasks to [wait] for other micro-tasks to | ||
| /// [notify]. | ||
| /// | ||
| /// [Notifier] is a concurrency primitive that allows one micro-task to | ||
| /// wait for notification from another micro-task. The [Future] return from | ||
| /// [wait] will be completed the next time [notify] is called. | ||
| /// | ||
| /// ```dart | ||
| /// var weather = 'rain'; | ||
| /// final notifier = Notifier(); | ||
| /// | ||
| /// // Create a micro task to fetch the weather | ||
| /// scheduleMicrotask(() async { | ||
| /// // Infinitely loop that just keeps the weather up-to-date | ||
| /// while (true) { | ||
| /// weather = await getWeather(); | ||
| /// notifier.notify(); | ||
| /// | ||
| /// // Sleep 5s before updating the weather again | ||
| /// await Future.delayed(Duration(seconds: 5)); | ||
| /// } | ||
| /// }); | ||
| /// | ||
| /// // Wait for sunny weather | ||
| /// while (weather != 'sunny') { | ||
| /// await notifier.wait; | ||
| /// } | ||
| /// ``` | ||
| // TODO: Apply `final` when language version for this library is bumped to 3.0 | ||
| @sealed | ||
| class Notifier { | ||
| var _completer = Completer<void>(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd make this That way, if the first call is |
||
|
|
||
| /// Notify everybody waiting for notification. | ||
| /// | ||
| /// This will complete all futures previously returned by [wait]. | ||
| /// Calls to [wait] after this call, will not be resolved, until the | ||
| /// next time [notify] is called. | ||
| void notify() { | ||
| if (!_completer.isCompleted) { | ||
| _completer.complete(); | ||
| } | ||
| } | ||
|
|
||
| /// Wait for notification. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noun phrase for getters. Maybe call it var tick = Notifier();
....
tick.notify();
...
do {
await tick.next;
} while (result != "success");
`` |
||
| /// | ||
| /// Returns a [Future] that will complete the next time [notify] is called. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't use "Returns" about properties. /// The `wait` [Future] is completed the next time [notify] is called. |
||
| /// | ||
| /// The [Future] returned will always be unresolved, and it will never throw. | ||
| /// Once [notify] is called the future will be completed, and any new calls | ||
| /// to [wait] will return a new future. This new future will also be | ||
| /// unresolved, until [notify] is called. | ||
| Future<void> get wait { | ||
| if (_completer.isCompleted) { | ||
| _completer = Completer(); | ||
| } | ||
| return _completer.future; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,8 @@ | |
|
|
||
| import 'dart:async'; | ||
|
|
||
| import 'notifier.dart'; | ||
|
|
||
| /// Utility extensions on [Stream]. | ||
| extension StreamExtensions<T> on Stream<T> { | ||
| /// Creates a stream whose elements are contiguous slices of `this`. | ||
|
|
@@ -78,4 +80,90 @@ extension StreamExtensions<T> on Stream<T> { | |
| ..onCancel = subscription.cancel; | ||
| return controller.stream; | ||
| } | ||
|
|
||
| /// Call [each] for each item in this stream with [maxParallel] invocations. | ||
| /// | ||
| /// This method will invoke [each] for each item in this stream, and wait for | ||
| /// all futures from [each] to be resolved. [parallelForEach] will call [each] | ||
| /// in parallel, but never more then [maxParallel]. | ||
| /// | ||
| /// If [each] throws and [onError] rethrows (default behavior), then | ||
| /// [parallelForEach] will wait for ongoing [each] invocations to finish, | ||
| /// before throw the first error. | ||
| /// | ||
| /// If [onError] does not throw, then iteration will not be interrupted and | ||
| /// errors from [each] will be ignored. | ||
| /// | ||
| /// ```dart | ||
| /// // Count size of all files in the current folder | ||
| /// var folderSize = 0; | ||
| /// // Use parallelForEach to read at-most 5 files at the same time. | ||
| /// await Directory.current.list().parallelForEach(5, (item) async { | ||
| /// if (item is File) { | ||
| /// final bytes = await item.readAsBytes(); | ||
| /// folderSize += bytes.length; | ||
| /// } | ||
| /// }); | ||
| /// print('Folder size: $folderSize'); | ||
| /// ``` | ||
| Future<void> parallelForEach( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could return a Or make it: Stream<R> parallelMap<R>(int maxParallel, FutureOr<R> mapData(T), {FutureOr<R> Function(Object error, StackTrace stack)? mapError});which just calls |
||
| int maxParallel, | ||
| FutureOr<void> Function(T item) each, { | ||
| FutureOr<void> Function(Object e, StackTrace? st) onError = Future.error, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm open to: FutureOr<bool> Function(Exception e) continueIfOn the assumption that:
But I could use help choosing a better name?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not open to special-casing If the user knows that I'd just not have this parameter at all. |
||
| }) async { | ||
| // Track the first error, so we rethrow when we're done. | ||
| Object? firstError; | ||
| StackTrace? firstStackTrace; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd store this as |
||
|
|
||
| // Track number of running items. | ||
| var running = 0; | ||
| final itemDone = Notifier(); | ||
|
|
||
| try { | ||
| var doBreak = false; | ||
| await for (final item in this) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This throws if the stream contains an error. User might expect that their |
||
| // For each item we increment [running] and call [each] | ||
| running += 1; | ||
| unawaited(() async { | ||
| try { | ||
| await each(item); | ||
| } catch (e, st) { | ||
| try { | ||
| // If [onError] doesn't throw, we'll just continue. | ||
| await onError(e, st); | ||
| } catch (e, st) { | ||
| doBreak = true; | ||
| if (firstError == null) { | ||
| firstError = e; | ||
| firstStackTrace = st; | ||
| } | ||
| } | ||
| } finally { | ||
| // When [each] is done, we decrement [running] and notify | ||
| running -= 1; | ||
| itemDone.notify(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't really need Future<void> parallelForEach(
int maxParallel,
FutureOr<void> Function(T item) each) {
var pending = 0;
var done = false;
var result = Completer<void>.sync();
(Object, StackTrace)? firstError;
void complete() {
assert(pending == 0);
assert(done);
if (firstError case (var error, var stack)) {
result.completeError(error, stack);
} else {
result.complete(null);
}
}
var subscription = stream.listen(null, onDone: () {
done = true;
if (pending == 0) {
complete();
}
});
subscription
..onError((Object error, StackTrace stack) {
subscription.cancel().ignore();
done = true;
firstError = (error, stack); // Takes precedence over user errors.
if (pending == 0) complete();
})
..onData((T value) {
try {
var computation = each(value);
if (computation is Future) {
if (++pending >= maxParallel) subscription.pause();
computation.then((_) {
if (--pending == 0 && done) complete();
subscription.resume();
}, onError: (Object error, StackTrace stack) {
subscription.cancel().ignore();
done = true;
firstError ??= (error, stack);
if (--pending == 0) complete();
});
} catch (error, stack) {
subscription.cancel().ignore();
done = true;
firstError ??= (error, stack);
if (pending == 0) complete();
}
});
return result.future;
} |
||
| } | ||
| }()); | ||
|
|
||
| if (running >= maxParallel) { | ||
| await itemDone.wait; | ||
| } | ||
| if (doBreak) { | ||
| break; | ||
| } | ||
| } | ||
| } finally { | ||
| // Wait for all items to be finished | ||
| while (running > 0) { | ||
| await itemDone.wait; | ||
| } | ||
| } | ||
|
|
||
| // If an error happened, then we rethrow the first one. | ||
| final firstError_ = firstError; | ||
| final firstStackTrace_ = firstStackTrace; | ||
| if (firstError_ != null && firstStackTrace_ != null) { | ||
| Error.throwWithStackTrace(firstError_, firstStackTrace_); | ||
| } | ||
| } | ||
jonasfj marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| // Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file | ||
| // for details. All rights reserved. Use of this source code is governed by a | ||
| // BSD-style license that can be found in the LICENSE file. | ||
|
|
||
| import 'dart:async'; | ||
|
|
||
| import 'package:async/async.dart'; | ||
| import 'package:test/test.dart'; | ||
|
|
||
| void main() { | ||
| test('Notifier.wait/notify', () async { | ||
| final notified = Completer<void>(); | ||
|
|
||
| final notifier = Notifier(); | ||
| notifier.wait.then((value) => notified.complete()); | ||
| expect(notified.isCompleted, isFalse); | ||
|
|
||
| notifier.notify(); | ||
| expect(notified.isCompleted, isFalse); | ||
|
|
||
| await notified.future; | ||
| expect(notified.isCompleted, isTrue); | ||
| }); | ||
|
|
||
| test('Notifier.wait is never resolved', () async { | ||
| var count = 0; | ||
|
|
||
| final notifier = Notifier(); | ||
| notifier.wait.then((value) => count++); | ||
| expect(count, 0); | ||
|
|
||
| await Future.delayed(Duration.zero); | ||
| expect(count, 0); | ||
|
|
||
| notifier.notify(); | ||
| expect(count, 0); | ||
|
|
||
| await Future.delayed(Duration.zero); | ||
| expect(count, 1); | ||
|
|
||
| notifier.wait.then((value) => count++); | ||
| notifier.wait.then((value) => count++); | ||
|
|
||
| await Future.delayed(Duration.zero); | ||
| expect(count, 1); | ||
|
|
||
| notifier.notify(); | ||
| expect(count, 1); | ||
|
|
||
| await Future.delayed(Duration.zero); | ||
| expect(count, 3); | ||
| }); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't mention the microtask!
(Also, makes no sense. If a microtask is waiting for something, it's no longer the same microtask.)
So: