Skip to content

Commit

Permalink
Add Isolate.run.
Browse files Browse the repository at this point in the history
Change-Id: I049f6b1bf684ed28b6f14d380adfadf9f4e97fcb
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/217008
Reviewed-by: Nate Bosch <nbosch@google.com>
Reviewed-by: Michael Thomsen <mit@google.com>
Commit-Queue: Lasse Nielsen <lrn@google.com>
  • Loading branch information
lrhn authored and Commit Bot committed Mar 3, 2022
1 parent c8950a9 commit 05322f2
Show file tree
Hide file tree
Showing 4 changed files with 399 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
- Deprecate `SecureSocket.renegotiate` and `RawSecureSocket.renegotiate`,
which were no-ops.

#### `dart:isolate`

- Add `Isolate.run` to run a function in a new isolate.

### Tools

#### Dart command line
Expand Down
167 changes: 163 additions & 4 deletions sdk/lib/isolate/isolate.dart
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,106 @@ class Isolate {
/// inspect the isolate and see uncaught errors or when it terminates.
Isolate(this.controlPort, {this.pauseCapability, this.terminateCapability});

/// Runs [computation] in a new isolate and returns the result.
///
/// ```dart
/// int slowFib(int n) =>
/// n <= 1 ? 1 : slowFib(n - 1) + slowFib(n - 2);
///
/// // Compute without blocking current isolate.
/// var fib40 = await Isolate.run(() => slowFib(40));
/// ```
///
/// If [computation] is asynchronous (returns a `Future<R>`) then
/// that future is awaited in the new isolate, completing the entire
/// asynchronous computation, before returning the result.
///
/// ```dart
/// int slowFib(int n) =>
/// n <= 1 ? 1 : slowFib(n - 1) + slowFib(n - 2);
/// Stream<int> fibStream() async* {
/// for (var i = 0;; i++) yield slowFib(i);
/// }
///
/// // Returns `Future<int>`.
/// var fib40 = await Isolate.run(() => fibStream().elementAt(40));
/// ```
///
/// If [computation] throws, the isolate is terminated and this
/// function throws the same error.
///
/// ```dart import:convert
/// Future<int> eventualError() async {
/// await Future.delayed(const Duration(seconds: 1));
/// throw StateError("In a bad state!");
/// }
///
/// try {
/// await Isolate.run(eventualError);
/// } on StateError catch (e, s) {
/// print(e.message); // In a bad state!
/// print(LineSplitter.split("$s").first); // Contains "eventualError"
/// }
/// ```
/// Any uncaught asynchronous errors will terminate the computation as well,
/// but will be reported as a [RemoteError] because [addErrorListener]
/// does not provide the original error object.
///
/// The result is sent using [exit], which means it's sent to this
/// isolate without copying.
///
/// The [computation] function and its result (or error) must be
/// sendable between isolates.
///
/// The [debugName] is only used to name the new isolate for debugging.
@Since("2.17")
static Future<R> run<R>(FutureOr<R> computation(), {String? debugName}) {
var result = Completer<R>();
var resultPort = RawReceivePort();
resultPort.handler = (response) {
resultPort.close();
if (response == null) {
// onExit handler message, isolate terminated without sending result.
result.completeError(
RemoteError("Computation ended without result", ""),
StackTrace.empty);
return;
}
var list = response as List<Object?>;
if (list.length == 2) {
var remoteError = list[0];
var remoteStack = list[1];
if (remoteStack is StackTrace) {
// Typed error.
result.completeError(remoteError!, remoteStack);
} else {
// onError handler message, uncaught async error.
// Both values are strings, so calling `toString` is efficient.
var error =
RemoteError(remoteError.toString(), remoteStack.toString());
result.completeError(error, error.stackTrace);
}
} else {
assert(list.length == 1);
result.complete(list[0] as R);
}
};
try {
Isolate.spawn(_RemoteRunner._remoteExecute,
_RemoteRunner<R>(computation, resultPort.sendPort),
onError: resultPort.sendPort,
onExit: resultPort.sendPort,
errorsAreFatal: true,
debugName: debugName)
.then<void>((_) {}, onError: result.completeError);
} on Object {
// Sending the computation failed.
resultPort.close();
rethrow;
}
return result.future;
}

/// An [Isolate] object representing the current isolate.
///
/// The current isolate for code using [current]
Expand Down Expand Up @@ -345,7 +445,7 @@ class Isolate {
/// of the isolate identified by [controlPort],
/// the pause request is ignored by the receiving isolate.
Capability pause([Capability? resumeCapability]) {
resumeCapability ??= new Capability();
resumeCapability ??= Capability();
_pause(resumeCapability);
return resumeCapability;
}
Expand Down Expand Up @@ -533,12 +633,12 @@ class Isolate {
var listMessage = message as List<Object?>;
var errorDescription = listMessage[0] as String;
var stackDescription = listMessage[1] as String;
var error = new RemoteError(errorDescription, stackDescription);
var error = RemoteError(errorDescription, stackDescription);
controller.addError(error, error.stackTrace);
}

controller.onListen = () {
RawReceivePort receivePort = new RawReceivePort(handleError);
RawReceivePort receivePort = RawReceivePort(handleError);
port = receivePort;
this.addErrorListener(receivePort.sendPort);
};
Expand Down Expand Up @@ -765,7 +865,7 @@ class RemoteError implements Error {
final StackTrace stackTrace;
RemoteError(String description, String stackDescription)
: _description = description,
stackTrace = new StackTrace.fromString(stackDescription);
stackTrace = StackTrace.fromString(stackDescription);
String toString() => _description;
}

Expand Down Expand Up @@ -795,3 +895,62 @@ abstract class TransferableTypedData {
/// transferable bytes, even if the calls occur in different isolates.
ByteBuffer materialize();
}

/// Parameter object used by [Isolate.run].
///
/// The [_remoteExecute] function is run in a new isolate with a
/// [_RemoteRunner] object as argument.
class _RemoteRunner<R> {
/// User computation to run.
final FutureOr<R> Function() computation;

/// Port to send isolate computation result on.
///
/// Only one object is ever sent on this port.
/// If the value is `null`, it is sent by the isolate's "on-exit" handler
/// when the isolate terminates without otherwise sending value.
/// If the value is a list with one element,
/// then it is the result value of the computation.
/// Otherwise it is a list with two elements representing an error.
/// If the error is sent by the isolate's "on-error" uncaught error handler,
/// then the list contains two strings. This also terminates the isolate.
/// If sent manually by this class, after capturing the error,
/// the list contains one non-`null` [Object] and one [StackTrace].
final SendPort resultPort;

_RemoteRunner(this.computation, this.resultPort);

/// Run in a new isolate to get the result of [computation].
///
/// The result is sent back on [resultPort] as a single-element list.
/// A two-element list sent on the same port is an error result.
/// When sent by this function, it's always an object and a [StackTrace].
/// (The same port listens on uncaught errors from the isolate, which
/// sends two-element lists containing [String]s instead).
static void _remoteExecute(_RemoteRunner<Object?> runner) {
runner._run();
}

void _run() async {
R result;
try {
var potentiallyAsyncResult = computation();
if (potentiallyAsyncResult is Future<R>) {
result = await potentiallyAsyncResult;
} else {
result = potentiallyAsyncResult as R;
}
} catch (e, s) {
// If sending fails, the error becomes an uncaught error.
Isolate.exit(resultPort, _list2(e, s));
}
Isolate.exit(resultPort, _list1(result));
}

/// Helper function to create a one-element non-growable list.
static List<Object?> _list1(Object? value) => List.filled(1, value);

/// Helper function to create a two-element non-growable list.
static List<Object?> _list2(Object? value1, Object? value2) =>
List.filled(2, value1)..[1] = value2;
}
115 changes: 115 additions & 0 deletions tests/lib/isolate/isolate_run_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:isolate';
import 'dart:async';
import 'package:async_helper/async_helper.dart';
import 'package:expect/expect.dart';

void main() async {
asyncStart();
// Sending result back.
await testValue();
await testAsyncValue();
// Sending error from computation back.
await testError();
await testAsyncError();
// Sending uncaught async error back.
await testUncaughtError();
// Not sending anything back before isolate dies.
await testIsolateHangs();
await testIsolateKilled();
await testIsolateExits();
asyncEnd();
}

final StackTrace stack = StackTrace.fromString("Known Stacktrace");
final ArgumentError error = ArgumentError.value(42, "name");

var variable = 0;

Future<void> testValue() async {
var value = await Isolate.run<int>(() {
variable = 1; // Changed in other isolate!
Expect.equals(1, variable);
return 42;
});
Expect.equals(42, value);
Expect.equals(0, variable);
}

Future<void> testAsyncValue() async {
var value = await Isolate.run<int>(() async {
variable = 1;
return 42;
});
Expect.equals(42, value);
Expect.equals(0, variable);
}

Future<void> testError() async {
var e = await asyncExpectThrows<ArgumentError>(Isolate.run<int>(() {
variable = 1;
Error.throwWithStackTrace(error, stack);
}));
Expect.equals(42, e.invalidValue);
Expect.equals("name", e.name);
Expect.equals(0, variable);
}

Future<void> testAsyncError() async {
var e = await asyncExpectThrows<ArgumentError>(Isolate.run<int>(() async {
variable = 1;
Error.throwWithStackTrace(error, stack);
}));
Expect.equals(42, e.invalidValue);
Expect.equals("name", e.name);
Expect.equals(0, variable);
}

Future<void> testUncaughtError() async {
var e = await asyncExpectThrows<RemoteError>(Isolate.run<int>(() async {
variable = 1;
unawaited(Future.error(error, stack)); // Uncaught error
await Completer().future; // Never completes.
return -1;
}));

Expect.type<RemoteError>(e);
Expect.equals(error.toString(), e.toString());
Expect.equals(0, variable);
}

Future<void> testIsolateHangs() async {
var e = await asyncExpectThrows<RemoteError>(Isolate.run<int>(() async {
variable = 1;
await Completer<Never>().future; // Never completes.
// Isolate should end while hanging here, because its event loop is empty.
}));
Expect.type<RemoteError>(e);
Expect.equals("Computation ended without result", e.toString());
Expect.equals(0, variable);
}

Future<void> testIsolateKilled() async {
var e = await asyncExpectThrows<RemoteError>(Isolate.run<int>(() async {
variable = 1;
Isolate.current.kill(); // Send kill request.
await Completer<Never>().future; // Never completes.
// Isolate should get killed while hanging here.
}));
Expect.type<RemoteError>(e);
Expect.equals("Computation ended without result", e.toString());
Expect.equals(0, variable);
}

Future<void> testIsolateExits() async {
var e = await asyncExpectThrows<RemoteError>(Isolate.run<int>(() async {
variable = 1;
Isolate.exit(); // Dies here without sending anything back.
}));
Expect.type<RemoteError>(e);
Expect.equals("Computation ended without result", e.toString());
Expect.equals(0, variable);
}
Loading

0 comments on commit 05322f2

Please sign in to comment.