Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions app/lib/shared/parallel_foreach.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

// TODO: This library is a decent proposal for addition to `dart:async` or
// other similar utility package. It's extremely useful when processing
// a stream of objects, where I/O is required for each object.

import 'dart:async';

/// A [Notifier] allows micro-tasks to [wait] for other micro-tasks to
/// [notify].
///
/// [Notifier] is a concurrency primitive that allows one micro-task to
/// wait for notification from another micro-task. The [Future] return from
/// [wait] will be completed the next time [notify] is called.
///
/// ```dart
/// var weather = 'rain';
/// final notifier = Notifier();
///
/// // Create a micro task to fetch the weather
/// scheduleMicrotask(() async {
/// // Infinitely loop that just keeps the weather up-to-date
/// while (true) {
/// weather = await getWeather();
/// notifier.notify();
///
/// // Sleep 5s before updating the weather again
/// await Future.delayed(Duration(seconds: 5));
/// }
/// });
///
/// // Wait for sunny weather
/// while (weather != 'sunny') {
/// await notifier.wait;
/// }
/// ```
final class Notifier {
var _completer = Completer<void>();

/// Notify everybody waiting for notification.
///
/// This will complete all futures previously returned by [wait].
/// Calls to [wait] after this call, will not be resolved, until the
/// next time [notify] is called.
void notify() {
if (!_completer.isCompleted) {
_completer.complete();
}
}

/// Wait for notification.
///
/// Returns a [Future] that will complete the next time [notify] is called.
///
/// The [Future] returned will always be unresolved, and it will never throw.
/// Once [notify] is called the future will be completed, and any new calls
/// to [wait] will return a new future. This new future will also be
/// unresolved, until [notify] is called.
Future<void> get wait {
if (_completer.isCompleted) {
_completer = Completer();
}
return _completer.future;
}
}

/// Utility extensions on [Stream].
extension StreamExtensions<T> on Stream<T> {
/// Call [each] for each item in this stream with [maxParallel] invocations.
///
/// This method will invoke [each] for each item in this stream, and wait for
/// all futures from [each] to be resolved. [parallelForEach] will call [each]
/// in parallel, but never more then [maxParallel].
///
/// If [each] throws and [onError] rethrows (default behavior), then
/// [parallelForEach] will wait for ongoing [each] invocations to finish,
/// before throw the first error.
///
/// If [onError] does not throw, then iteration will not be interrupted and
/// errors from [each] will be ignored.
///
/// ```dart
/// // Count size of all files in the current folder
/// var folderSize = 0;
/// // Use parallelForEach to read at-most 5 files at the same time.
/// await Directory.current.list().parallelForEach(5, (item) async {
/// if (item is File) {
/// final bytes = await item.readAsBytes();
/// folderSize += bytes.length;
/// }
/// });
/// print('Folder size: $folderSize');
/// ```
Future<void> parallelForEach(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we call this concurrentForEach instead? Parallel would indicate separate isolates for me.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recall discussing this on the PR along ago. I don't really like concurrentForEach.

Maybe asyncForEach or boundedAsyncForEach or something completely different.

I think I'm going to keep the current name, just because it's not conflicting with other extensions on Stream.

int maxParallel,
FutureOr<void> Function(T item) each, {
FutureOr<void> Function(Object e, StackTrace? st) onError = Future.error,
}) async {
// Track the first error, so we rethrow when we're done.
Object? firstError;
StackTrace? firstStackTrace;

// Track number of running items.
var running = 0;
final itemDone = Notifier();

try {
var doBreak = false;
await for (final item in this) {
// For each item we increment [running] and call [each]
running += 1;
unawaited(() async {
try {
await each(item);
} catch (e, st) {
try {
// If [onError] doesn't throw, we'll just continue.
await onError(e, st);
} catch (e, st) {
doBreak = true;
if (firstError == null) {
firstError = e;
firstStackTrace = st;
}
}
} finally {
// When [each] is done, we decrement [running] and notify
running -= 1;
itemDone.notify();
}
}());

if (running >= maxParallel) {
await itemDone.wait;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering: will this implicitly pause the stream listener until the pending processing is done? Can we leave a not on that, or a TODO to investigate it at one point?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering: will this implicitly pause the stream listener until the pending processing is done?

Yes, that's kind of the point:

await stream.parallelForEach(N, each);

will:

  • Call each for each item in the stream.
  • Ensure that there are no more than N concurrent invocations of each.
    • This means pausing the stream, while items are being processed.
  • Wait for all each invocations to be completed.
    • Unless, each throws, in which case the stream will be canceled, and but pending each invocations will be awaited.

I would think that pausing the stream is desirable. Supposed you have a Stream<Package> query resulting from a datastore query.

  • You don't really want to load all Package entities into memory, if you did, you could do: await query.toList();.
  • You also don't really want to process each Package entity one at the time, because that's sort of slow. If you did could just do await for (final p in query) {...}.
  • You don't really want to process all Package entities concurrently because it'll do a crazy amount of I/O, won't reuse TCP connections and cause pain. If you did you could do await Future.wait(await query.map((p) async {...}).toList());.

You really want to process N number of entities concurrently and not more.

}
if (doBreak) {
break;
}
}
} finally {
// Wait for all items to be finished
while (running > 0) {
await itemDone.wait;
}
}

// If an error happened, then we rethrow the first one.
final firstError_ = firstError;
final firstStackTrace_ = firstStackTrace;
if (firstError_ != null && firstStackTrace_ != null) {
Error.throwWithStackTrace(firstError_, firstStackTrace_);
}
}
}
171 changes: 171 additions & 0 deletions app/test/shared/parallel_foreach_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:math';

import 'package:pub_dev/shared/parallel_foreach.dart';
import 'package:test/test.dart';

void main() {
group('Notifier', () {
test('Notifier.wait/notify', () async {
final notified = Completer<void>();

final notifier = Notifier();
unawaited(notifier.wait.then((value) => notified.complete()));
expect(notified.isCompleted, isFalse);

notifier.notify();
expect(notified.isCompleted, isFalse);

await notified.future;
expect(notified.isCompleted, isTrue);
});

test('Notifier.wait is never resolved', () async {
var count = 0;

final notifier = Notifier();
unawaited(notifier.wait.then((value) => count++));
expect(count, 0);

await Future.delayed(Duration.zero);
expect(count, 0);

notifier.notify();
expect(count, 0);

await Future.delayed(Duration.zero);
expect(count, 1);

unawaited(notifier.wait.then((value) => count++));
unawaited(notifier.wait.then((value) => count++));

await Future.delayed(Duration.zero);
expect(count, 1);

notifier.notify();
expect(count, 1);

await Future.delayed(Duration.zero);
expect(count, 3);
});
});

group('parallelForEach', () {
test('sum (maxParallel: 1)', () async {
var sum = 0;
await Stream.fromIterable([1, 2, 3]).parallelForEach(1, (item) {
sum += item;
});
expect(sum, 6);
});

test('sum (maxParallel: 2)', () async {
var sum = 0;
var active = 0;
var maxActive = 0;
await Stream.fromIterable([1, 2, 3]).parallelForEach(2, (item) async {
active++;
expect(active, lessThanOrEqualTo(2));
maxActive = max(active, maxActive);
await Future.delayed(Duration(milliseconds: 50));
expect(active, lessThanOrEqualTo(2));
maxActive = max(active, maxActive);
sum += item;
active--;
});
expect(sum, 6);
expect(maxActive, 2);
});

test('abort when error is thrown (maxParallel: 1)', () async {
var sum = 0;
await expectLater(
Stream.fromIterable([1, 2, 3]).parallelForEach(1, (item) async {
sum += item;
if (sum > 2) {
throw Exception('abort');
}
}),
throwsException,
);
expect(sum, 3);
});

test('abort will not comsume the entire stream', () async {
var countedTo = 0;
Stream<int> countToN(int N) async* {
for (var i = 1; i <= N; i++) {
await Future.delayed(Duration.zero);
yield i;
countedTo = i;
}
}

var sum = 0;
await countToN(20).parallelForEach(2, (item) async {
sum += item;
});
expect(sum, greaterThan(20));
expect(countedTo, 20);

countedTo = 0;
await expectLater(
countToN(20).parallelForEach(2, (item) async {
if (item > 10) throw Exception('abort');
}),
throwsException,
);
expect(countedTo, greaterThanOrEqualTo(10));
expect(countedTo, lessThan(20));
});

test('onError can ignore errors', () async {
var countedTo = 0;
Stream<int> countToN(int N) async* {
for (var i = 1; i <= N; i++) {
await Future.delayed(Duration.zero);
yield i;
countedTo = i;
}
}

var sum = 0;
await countToN(20).parallelForEach(2, (item) async {
sum += item;
if (sum > 10) {
throw Exception('ignore this');
}
}, onError: (_, __) => null);
expect(sum, greaterThan(20));
expect(countedTo, 20);

countedTo = 0;
await expectLater(
countToN(20).parallelForEach(
2,
(item) async {
sum += item;
if (countedTo > 15) {
throw Exception('break');
}
if (countedTo > 10) {
throw Exception('ignore this');
}
},
onError: (e, st) {
if (e.toString().contains('break')) {
throw e as Exception;
}
},
),
throwsException,
);
expect(countedTo, greaterThanOrEqualTo(10));
expect(countedTo, lessThan(20));
});
});
}
Loading