Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkgs/async/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 2.13.1-wip

- Fix `StreamGroup.broadcast().close()` to properly complete when all streams in the group close without being explicitly removed.
- Run `dart format` with the new style.

## 2.13.0

Expand Down
13 changes: 9 additions & 4 deletions pkgs/async/lib/src/async_cache.dart
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,15 @@ class AsyncCache<T> {
throw StateError('Previously used to cache via `fetch`');
}
var splitter = _cachedStreamSplitter ??= StreamSplitter(
callback().transform(StreamTransformer.fromHandlers(handleDone: (sink) {
_startStaleTimer();
sink.close();
})));
callback().transform(
StreamTransformer.fromHandlers(
handleDone: (sink) {
_startStaleTimer();
sink.close();
},
),
),
);
return splitter.split();
}

Expand Down
28 changes: 18 additions & 10 deletions pkgs/async/lib/src/byte_collector.dart
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,33 @@ Future<Uint8List> collectBytes(Stream<List<int>> source) {
/// If any of the input data are not valid bytes, they will be truncated to
/// an eight-bit unsigned value in the resulting list.
CancelableOperation<Uint8List> collectBytesCancelable(
Stream<List<int>> source) {
Stream<List<int>> source,
) {
return _collectBytes(
source,
(subscription, result) => CancelableOperation.fromFuture(result,
onCancel: subscription.cancel));
source,
(subscription, result) =>
CancelableOperation.fromFuture(result, onCancel: subscription.cancel),
);
}

/// Generalization over [collectBytes] and [collectBytesCancelable].
///
/// Performs all the same operations, but the final result is created
/// by the [result] function, which has access to the stream subscription
/// so it can cancel the operation.
T _collectBytes<T>(Stream<List<int>> source,
T Function(StreamSubscription<List<int>>, Future<Uint8List>) result) {
T _collectBytes<T>(
Stream<List<int>> source,
T Function(StreamSubscription<List<int>>, Future<Uint8List>) result,
) {
var bytes = BytesBuilder(copy: false);
var completer = Completer<Uint8List>.sync();
var subscription =
source.listen(bytes.add, onError: completer.completeError, onDone: () {
completer.complete(bytes.takeBytes());
}, cancelOnError: true);
var subscription = source.listen(
bytes.add,
onError: completer.completeError,
onDone: () {
completer.complete(bytes.takeBytes());
},
cancelOnError: true,
);
return result(subscription, completer.future);
}
198 changes: 116 additions & 82 deletions pkgs/async/lib/src/cancelable_operation.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ class CancelableOperation<T> {
///
/// Calling this constructor is equivalent to creating a
/// [CancelableCompleter] and completing it with [result].
factory CancelableOperation.fromFuture(Future<T> result,
{FutureOr Function()? onCancel}) =>
factory CancelableOperation.fromFuture(
Future<T> result, {
FutureOr Function()? onCancel,
}) =>
(CancelableCompleter<T>(onCancel: onCancel)..complete(result)).operation;

/// Creates a [CancelableOperation] which completes to [value].
Expand All @@ -51,7 +53,8 @@ class CancelableOperation<T> {
/// subscription will be canceled (unlike
/// `CancelableOperation.fromFuture(subscription.asFuture())`).
static CancelableOperation<void> fromSubscription(
StreamSubscription<void> subscription) {
StreamSubscription<void> subscription,
) {
var completer = CancelableCompleter<void>(onCancel: subscription.cancel);
subscription.onDone(completer.complete);
subscription.onError((Object error, StackTrace stackTrace) {
Expand All @@ -70,7 +73,8 @@ class CancelableOperation<T> {
/// new operation is cancelled, all the [operations] are cancelled as
/// well.
static CancelableOperation<T> race<T>(
Iterable<CancelableOperation<T>> operations) {
Iterable<CancelableOperation<T>> operations,
) {
operations = operations.toList();
if (operations.isEmpty) {
throw ArgumentError('May not be empty', 'operations');
Expand All @@ -83,20 +87,25 @@ class CancelableOperation<T> {
done = true;
return Future.wait([
for (var operation in operations)
if (!operation.isCanceled) operation.cancel()
if (!operation.isCanceled) operation.cancel(),
]);
}

var completer = CancelableCompleter<T>(onCancel: cancelAll);
for (var operation in operations) {
operation.then((value) {
if (!done) cancelAll().whenComplete(() => completer.complete(value));
}, onError: (error, stackTrace) {
if (!done) {
cancelAll()
.whenComplete(() => completer.completeError(error, stackTrace));
}
}, propagateCancel: false);
operation.then(
(value) {
if (!done) cancelAll().whenComplete(() => completer.complete(value));
},
onError: (error, stackTrace) {
if (!done) {
cancelAll().whenComplete(
() => completer.completeError(error, stackTrace),
);
}
},
propagateCancel: false,
);
}

return completer.operation;
Expand All @@ -114,16 +123,21 @@ class CancelableOperation<T> {
/// This is like `value.asStream()`, but if a subscription to the stream is
/// canceled, this operation is as well.
Stream<T> asStream() {
var controller =
StreamController<T>(sync: true, onCancel: _completer._cancel);

_completer._inner?.future.then((value) {
controller.add(value);
controller.close();
}, onError: (Object error, StackTrace stackTrace) {
controller.addError(error, stackTrace);
controller.close();
});
var controller = StreamController<T>(
sync: true,
onCancel: _completer._cancel,
);

_completer._inner?.future.then(
(value) {
controller.add(value);
controller.close();
},
onError: (Object error, StackTrace stackTrace) {
controller.addError(error, stackTrace);
controller.close();
},
);
return controller.stream;
}

Expand Down Expand Up @@ -172,24 +186,28 @@ class CancelableOperation<T> {
/// operation is canceled as well. Pass `false` if there are multiple
/// listeners on this operation and canceling the [onValue], [onError], and
/// [onCancel] callbacks should not cancel the other listeners.
CancelableOperation<R> then<R>(FutureOr<R> Function(T) onValue,
{FutureOr<R> Function(Object, StackTrace)? onError,
FutureOr<R> Function()? onCancel,
bool propagateCancel = true}) =>
thenOperation<R>((value, completer) {
completer.complete(onValue(value));
},
onError: onError == null
? null
: (error, stackTrace, completer) {
completer.complete(onError(error, stackTrace));
},
onCancel: onCancel == null
? null
: (completer) {
completer.complete(onCancel());
},
propagateCancel: propagateCancel);
CancelableOperation<R> then<R>(
FutureOr<R> Function(T) onValue, {
FutureOr<R> Function(Object, StackTrace)? onError,
FutureOr<R> Function()? onCancel,
bool propagateCancel = true,
}) =>
thenOperation<R>(
(value, completer) {
completer.complete(onValue(value));
},
onError: onError == null
? null
: (error, stackTrace, completer) {
completer.complete(onError(error, stackTrace));
},
onCancel: onCancel == null
? null
: (completer) {
completer.complete(onCancel());
},
propagateCancel: propagateCancel,
);

/// Creates a new cancelable operation to be completed when this operation
/// completes normally or as an error, or is cancelled.
Expand Down Expand Up @@ -222,13 +240,15 @@ class CancelableOperation<T> {
/// listeners on this operation and canceling the [onValue], [onError], and
/// [onCancel] callbacks should not cancel the other listeners.
CancelableOperation<R> thenOperation<R>(
FutureOr<void> Function(T, CancelableCompleter<R>) onValue,
{FutureOr<void> Function(Object, StackTrace, CancelableCompleter<R>)?
onError,
FutureOr<void> Function(CancelableCompleter<R>)? onCancel,
bool propagateCancel = true}) {
FutureOr<void> Function(T, CancelableCompleter<R>) onValue, {
FutureOr<void> Function(Object, StackTrace, CancelableCompleter<R>)?
onError,
FutureOr<void> Function(CancelableCompleter<R>)? onCancel,
bool propagateCancel = true,
}) {
final completer = CancelableCompleter<R>(
onCancel: propagateCancel ? _cancelIfNotCanceled : null);
onCancel: propagateCancel ? _cancelIfNotCanceled : null,
);

// if `_completer._inner` completes before `completer` is cancelled
// call `onValue` or `onError` with the result, and complete `completer`
Expand All @@ -246,25 +266,29 @@ class CancelableOperation<T> {
// completes before `completer` is cancelled,
// then cancel `cancelCompleter`. (Cancelling twice is safe.)

_completer._inner?.future.then<void>((value) async {
if (completer.isCanceled) return;
try {
await onValue(value, completer);
} catch (error, stack) {
completer.completeError(error, stack);
}
},
onError: onError == null
? completer.completeError // Is ignored if already cancelled.
: (Object error, StackTrace stack) async {
if (completer.isCanceled) return;
try {
await onError(error, stack, completer);
} catch (error2, stack2) {
completer.completeErrorIfPending(
error2, identical(error, error2) ? stack : stack2);
}
});
_completer._inner?.future.then<void>(
(value) async {
if (completer.isCanceled) return;
try {
await onValue(value, completer);
} catch (error, stack) {
completer.completeError(error, stack);
}
},
onError: onError == null
? completer.completeError // Is ignored if already cancelled.
: (Object error, StackTrace stack) async {
if (completer.isCanceled) return;
try {
await onError(error, stack, completer);
} catch (error2, stack2) {
completer.completeErrorIfPending(
error2,
identical(error, error2) ? stack : stack2,
);
}
},
);
final cancelForwarder = _CancelForwarder<R>(completer, onCancel);
if (_completer.isCanceled) {
cancelForwarder._forward();
Expand Down Expand Up @@ -430,11 +454,14 @@ class CancelableCompleter<T> {
return;
}

value.then((result) {
_completeNow()?.complete(result);
}, onError: (Object error, StackTrace stackTrace) {
_completeNow()?.completeError(error, stackTrace);
});
value.then(
(result) {
_completeNow()?.complete(result);
},
onError: (Object error, StackTrace stackTrace) {
_completeNow()?.completeError(error, stackTrace);
},
);
}

/// Makes this [CancelableCompleter.operation] complete with the same result
Expand All @@ -443,23 +470,30 @@ class CancelableCompleter<T> {
/// If [propagateCancel] is `true` (the default), and the [operation] of this
/// completer is canceled before [result] completes, then [result] is also
/// canceled.
void completeOperation(CancelableOperation<T> result,
{bool propagateCancel = true}) {
void completeOperation(
CancelableOperation<T> result, {
bool propagateCancel = true,
}) {
if (!_mayComplete) throw StateError('Already completed');
_mayComplete = false;
if (isCanceled) {
if (propagateCancel) result.cancel();
result.value.ignore();
return;
}
result.then<void>((value) {
_inner?.complete(
value); // _inner is set to null if this.operation is cancelled.
}, onError: (error, stack) {
_inner?.completeError(error, stack);
}, onCancel: () {
operation.cancel();
});
result.then<void>(
(value) {
_inner?.complete(
value,
); // _inner is set to null if this.operation is cancelled.
},
onError: (error, stack) {
_inner?.completeError(error, stack);
},
onCancel: () {
operation.cancel();
},
);
if (propagateCancel) {
_cancelCompleter?.future.whenComplete(result.cancel);
}
Expand Down Expand Up @@ -519,7 +553,7 @@ class CancelableCompleter<T> {
final isFuture = toReturn is Future;
final cancelFutures = <Future<Object?>>[
if (isFuture) toReturn,
...?_cancelForwarders?.map(_forward).nonNulls
...?_cancelForwarders?.map(_forward).nonNulls,
];
final results = (isFuture && cancelFutures.length == 1)
? [await toReturn]
Expand Down
5 changes: 4 additions & 1 deletion pkgs/async/lib/src/chunked_stream_reader.dart
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ class ChunkedStreamReader<T> {
List<T> output;
if (_buffer is Uint8List) {
output = Uint8List.sublistView(
_buffer as Uint8List, _offset, _offset + size) as List<T>;
_buffer as Uint8List,
_offset,
_offset + size,
) as List<T>;
} else {
output = _buffer.sublist(_offset, _offset + size);
}
Expand Down
3 changes: 2 additions & 1 deletion pkgs/async/lib/src/delegate/event_sink.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class DelegatingEventSink<T> implements EventSink<T> {
/// [add] may throw a [TypeError] if the argument type doesn't match the
/// reified type of [sink].
@Deprecated(
'Use StreamController<T>(sync: true)..stream.cast<S>().pipe(sink)')
'Use StreamController<T>(sync: true)..stream.cast<S>().pipe(sink)',
)
static EventSink<T> typed<T>(EventSink sink) =>
sink is EventSink<T> ? sink : DelegatingEventSink._(sink);

Expand Down
3 changes: 2 additions & 1 deletion pkgs/async/lib/src/delegate/sink.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class DelegatingSink<T> implements Sink<T> {
/// throw a [TypeError] if the argument type doesn't match the reified type of
/// [sink].
@Deprecated(
'Use StreamController<T>(sync: true)..stream.cast<S>().pipe(sink)')
'Use StreamController<T>(sync: true)..stream.cast<S>().pipe(sink)',
)
static Sink<T> typed<T>(Sink sink) =>
sink is Sink<T> ? sink : DelegatingSink._(sink);

Expand Down
Loading
Loading