Skip to content

Commit

Permalink
Remove unnecessary completers from async_patch code.
Browse files Browse the repository at this point in the history
Use _Future directly.

Add ability to get trace of awaited continuations.

Change-Id: I6c3aba0bdc2e54afe1d84fdd802fb5210d7598ac
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/112721
Reviewed-by: Martin Kustermann <kustermann@google.com>
Commit-Queue: Lasse R.H. Nielsen <lrn@google.com>
  • Loading branch information
lrhn authored and commit-bot@chromium.org committed Aug 13, 2019
1 parent b9217ef commit 9dcd726
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 57 deletions.
40 changes: 17 additions & 23 deletions runtime/lib/async_patch.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,24 @@ import "dart:_internal" show VMLibraryHooks, patch;
_fatal(msg) native "DartAsync_fatal";

class _AsyncAwaitCompleter<T> implements Completer<T> {
final _completer = new Completer<T>.sync();
final _future = new _Future<T>();
bool isSync;

_AsyncAwaitCompleter() : isSync = false;

void complete([FutureOr<T> value]) {
if (isSync) {
_completer.complete(value);
} else if (value is Future<T>) {
value.then(_completer.complete, onError: _completer.completeError);
if (!isSync || value is Future<T>) {
_future._asyncComplete(value);
} else {
scheduleMicrotask(() {
_completer.complete(value);
});
_future._completeWithValue(value);
}
}

void completeError(e, [st]) {
if (isSync) {
_completer.completeError(e, st);
_future._completeError(e, st);
} else {
scheduleMicrotask(() {
_completer.completeError(e, st);
});
_future._asyncCompleteError(e, st);
}
}

Expand All @@ -50,8 +44,8 @@ class _AsyncAwaitCompleter<T> implements Completer<T> {
isSync = true;
}

Future<T> get future => _completer.future;
bool get isCompleted => _completer.isCompleted;
Future<T> get future => _future;
bool get isCompleted => !_future._mayComplete;
}

// We need to pass the value as first argument and leave the second and third
Expand Down Expand Up @@ -107,7 +101,7 @@ Future _awaitHelper(
// We can only do this for our internal futures (the default implementation of
// all futures that are constructed by the `dart:async` library).
object._awaiter = awaiter;
return object._thenNoZoneRegistration(thenCallback, errorCallback);
return object._thenAwait(thenCallback, errorCallback);
}

// Called as part of the 'await for (...)' construct. Registers the
Expand Down Expand Up @@ -143,7 +137,7 @@ class _AsyncStarStreamController<T> {
bool onListenReceived = false;
bool isScheduled = false;
bool isSuspendedAtYield = false;
Completer cancellationCompleter = null;
_Future cancellationFuture = null;

Stream<T> get stream {
final Stream<T> local = controller.stream;
Expand Down Expand Up @@ -210,10 +204,10 @@ class _AsyncStarStreamController<T> {
}

void addError(Object error, StackTrace stackTrace) {
if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) {
if ((cancellationFuture != null) && cancellationFuture._mayComplete) {
// If the stream has been cancelled, complete the cancellation future
// with the error.
cancellationCompleter.completeError(error, stackTrace);
cancellationFuture._completeError(error, stackTrace);
return;
}
// If stream is cancelled, tell caller to exit the async generator.
Expand All @@ -226,10 +220,10 @@ class _AsyncStarStreamController<T> {
}

close() {
if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) {
if ((cancellationFuture != null) && cancellationFuture._mayComplete) {
// If the stream has been cancelled, complete the cancellation future
// with the error.
cancellationCompleter.complete();
cancellationFuture._completeWithValue(null);
}
controller.close();
}
Expand Down Expand Up @@ -257,16 +251,16 @@ class _AsyncStarStreamController<T> {
if (controller.isClosed) {
return null;
}
if (cancellationCompleter == null) {
cancellationCompleter = new Completer();
if (cancellationFuture == null) {
cancellationFuture = new _Future();
// Only resume the generator if it is suspended at a yield.
// Cancellation does not affect an async generator that is
// suspended at an await.
if (isSuspendedAtYield) {
scheduleGenerator();
}
}
return cancellationCompleter.future;
return cancellationFuture;
}
}

Expand Down
4 changes: 2 additions & 2 deletions sdk/lib/_internal/js_dev_runtime/patch/async_patch.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ _async<T>(Function() initGenerator) {
} else {
f = _Future.value(value);
}
f = JS('', '#', f._thenNoZoneRegistration(onValue, onError));
f = JS('', '#', f._thenAwait(onValue, onError));
return f;
}

Expand Down Expand Up @@ -363,7 +363,7 @@ class _AsyncStarImpl<T> {
} else {
f = _Future.value(value);
}
f._thenNoZoneRegistration(_runBodyCallback, handleError);
f._thenAwait(_runBodyCallback, handleError);
}

/// Adds element to [stream] and returns true if the caller should terminate
Expand Down
45 changes: 19 additions & 26 deletions sdk/lib/_internal/js_runtime/lib/async_patch.dart
Original file line number Diff line number Diff line change
Expand Up @@ -190,35 +190,29 @@ bool _hasTimer() {
}

class _AsyncAwaitCompleter<T> implements Completer<T> {
final _completer = new Completer<T>.sync();
final _future = new _Future<T>();
bool isSync;

_AsyncAwaitCompleter() : isSync = false;

void complete([FutureOr<T> value]) {
if (isSync) {
_completer.complete(value);
} else if (value is Future<T>) {
value.then(_completer.complete, onError: _completer.completeError);
if (!isSync || value is Future<T>) {
_future._asyncComplete(value);
} else {
scheduleMicrotask(() {
_completer.complete(value);
});
_future._completeWithValue(value);
}
}

void completeError(e, [st]) {
if (isSync) {
_completer.completeError(e, st);
_future._completeError(e, st);
} else {
scheduleMicrotask(() {
_completer.completeError(e, st);
});
_future._asyncCompleteError(e, st);
}
}

Future<T> get future => _completer.future;
bool get isCompleted => _completer.isCompleted;
Future<T> get future => _future;
bool get isCompleted => !_future._mayComplete;
}

/// Creates a Completer for an `async` function.
Expand Down Expand Up @@ -295,15 +289,14 @@ void _awaitOnObject(object, _WrappedAsyncBody bodyFunction) {
if (object is _Future) {
// We can skip the zone registration, since the bodyFunction is already
// registered (see [_wrapJsFunctionForAsync]).
object._thenNoZoneRegistration(thenCallback, errorCallback);
object._thenAwait(thenCallback, errorCallback);
} else if (object is Future) {
object.then(thenCallback, onError: errorCallback);
} else {
_Future future = new _Future();
future._setValue(object);
_Future future = new _Future().._setValue(object);
// We can skip the zone registration, since the bodyFunction is already
// registered (see [_wrapJsFunctionForAsync]).
future._thenNoZoneRegistration(thenCallback, null);
future._thenAwait(thenCallback, null);
}
}

Expand Down Expand Up @@ -381,15 +374,15 @@ void _asyncStarHelper(
if (identical(bodyFunctionOrErrorCode, async_error_codes.SUCCESS)) {
// This happens on return from the async* function.
if (controller.isCanceled) {
controller.cancelationCompleter.complete();
controller.cancelationFuture._completeWithValue(null);
} else {
controller.close();
}
return;
} else if (identical(bodyFunctionOrErrorCode, async_error_codes.ERROR)) {
// The error is a js-error.
if (controller.isCanceled) {
controller.cancelationCompleter.completeError(
controller.cancelationFuture._completeError(
unwrapException(object), getTraceFromException(object));
} else {
controller.addError(
Expand Down Expand Up @@ -465,13 +458,13 @@ class _AsyncStarStreamController<T> {

bool get isPaused => controller.isPaused;

Completer cancelationCompleter = null;
_Future cancelationFuture = null;

/// True after the StreamSubscription has been cancelled.
/// When this is true, errors thrown from the async* body should go to the
/// [cancelationCompleter] instead of adding them to [controller], and
/// returning from the async function should complete [cancelationCompleter].
bool get isCanceled => cancelationCompleter != null;
/// [cancelationFuture] instead of adding them to [controller], and
/// returning from the async function should complete [cancelationFuture].
bool get isCanceled => cancelationFuture != null;

add(event) => controller.add(event);

Expand Down Expand Up @@ -503,15 +496,15 @@ class _AsyncStarStreamController<T> {
}, onCancel: () {
// If the async* is finished we ignore cancel events.
if (!controller.isClosed) {
cancelationCompleter = new Completer();
cancelationFuture = new _Future();
if (isSuspended) {
// Resume the suspended async* function to run finalizers.
isSuspended = false;
scheduleMicrotask(() {
body(async_error_codes.STREAM_WAS_CANCELED, null);
});
}
return cancelationCompleter.future;
return cancelationFuture;
}
});
}
Expand Down
51 changes: 45 additions & 6 deletions sdk/lib/async/future_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class _FutureListener<S, T> {
static const int stateCatcherror = maskError;
static const int stateCatcherrorTest = maskError | maskTestError;
static const int stateWhencomplete = maskWhencomplete;
static const int maskType =
maskValue | maskError | maskTestError | maskWhencomplete;
static const int stateIsAwait = 16;
// Listeners on the same future are linked through this link.
_FutureListener _nextListener;
// The future to complete when this listener is activated.
Expand All @@ -84,6 +87,13 @@ class _FutureListener<S, T> {
errorCallback = errorCallback,
state = (errorCallback == null) ? stateThen : stateThenOnerror;

_FutureListener.thenAwait(
this.result, _FutureOnValue<S, T> onValue, Function errorCallback)
: callback = onValue,
errorCallback = errorCallback,
state = ((errorCallback == null) ? stateThen : stateThenOnerror)
| stateIsAwait ;

_FutureListener.catchError(this.result, this.errorCallback, this.callback)
: state = (callback == null) ? stateCatcherror : stateCatcherrorTest;

Expand All @@ -95,8 +105,9 @@ class _FutureListener<S, T> {

bool get handlesValue => (state & maskValue != 0);
bool get handlesError => (state & maskError != 0);
bool get hasErrorTest => (state == stateCatcherrorTest);
bool get handlesComplete => (state == stateWhencomplete);
bool get hasErrorTest => (state & maskType == stateCatcherrorTest);
bool get handlesComplete => (state & maskType == stateWhencomplete);
bool get isAwait => (state & stateIsAwait != 0);

_FutureOnValue<S, T> get _onValue {
assert(handlesValue);
Expand Down Expand Up @@ -229,6 +240,27 @@ class _Future<T> implements Future<T> {
bool get _isComplete => _state >= _stateValue;
bool get _hasError => _state == _stateError;

static List<Function> _continuationFunctions(_Future<Object> future) {
List<Function> result = null;
while (true) {
if (future._mayAddListener) return result;
assert(!future._isComplete);
assert(!future._isChained);
// So _resultOrListeners contains listeners.
_FutureListener<Object, Object> listener = future._resultOrListeners;
if (listener != null &&
listener._nextListener == null &&
listener.isAwait) {
(result ??= <Function>[]).add(listener.handleValue);
future = listener.result;
assert(!future._isComplete);
} else {
break;
}
}
return result;
}

void _setChained(_Future source) {
assert(_mayAddListener);
_state = _stateChained;
Expand All @@ -246,14 +278,21 @@ class _Future<T> implements Future<T> {
onError = _registerErrorHandler(onError, currentZone);
}
}
return _thenNoZoneRegistration<R>(f, onError);
_Future<R> result = new _Future<R>();
_addListener(new _FutureListener<T, R>.then(result, f, onError));
return result;
}

// This method is used by async/await.
Future<E> _thenNoZoneRegistration<E>(
/// Registers a system created result and error continuation.
///
/// Used by the implementation of `await` to listen to a future.
/// The system created liseners are not registered in the zone,
/// and the listener is marked as being from an `await`.
/// This marker is used in [_continuationFunctions].
Future<E> _thenAwait<E>(
FutureOr<E> f(T value), Function onError) {
_Future<E> result = new _Future<E>();
_addListener(new _FutureListener<T, E>.then(result, f, onError));
_addListener(new _FutureListener<T, E>.thenAwait(result, f, onError));
return result;
}

Expand Down

0 comments on commit 9dcd726

Please sign in to comment.