
Convenience layer over isolates for function computation (e.g. streaming computations) #48579

Open
mkustermann opened this issue Mar 16, 2022 · 8 comments
Labels
area-vm Use area-vm for VM related issues, including code coverage, FFI, and the AOT and JIT backends. library-isolate

Comments

@mkustermann
Member

mkustermann commented Mar 16, 2022

Now that we have lightweight isolates (#36097) we should consider adding convenience abstraction layers on top of the bare-minimum Isolate class itself.

The change in 05322f2 added one specific abstraction, namely Isolate.run<T>() - allowing one to compute one function and get the result back.

One may want other kinds of helpers, e.g.

  • Run a computation in an isolate and stream the result back (e.g. Isolate.stream()), with cancellation support
  • Have typed communication channels for writer and reader (typed wrappers around ReceivePort / SendPort)
  • Have a way to specify buffer sizes
  • Have a nice way to receive individual messages (akin to StreamIterator but with a configurable buffer/pause signal)
  • Have a mechanism to set up a two-way communication channel
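As a sketch of the "typed communication channels" bullet above: a thin typed wrapper over SendPort/ReceivePort could look roughly like this (names such as TypedSendPort are hypothetical, not an existing API):

```dart
import 'dart:isolate';

/// Hypothetical typed wrapper around SendPort: only values of type T
/// can be sent, catching mismatches at compile time.
class TypedSendPort<T> {
  final SendPort _port;
  TypedSendPort(this._port);

  void send(T value) => _port.send(value);
}

/// Hypothetical typed wrapper around ReceivePort, exposing incoming
/// messages as a Stream<T>.
class TypedReceivePort<T> {
  final ReceivePort _port = ReceivePort();

  Stream<T> get stream => _port.cast<T>();

  TypedSendPort<T> get sendPort => TypedSendPort<T>(_port.sendPort);

  void close() => _port.close();
}
```

This only moves the type check to the sending side; the underlying port protocol is unchanged.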

The question is: would we want such convenience layers as additional methods on the Isolate class itself, or rather built on top, possibly in a package? If the latter: why have Isolate.run() as part of the core libraries?

/cc @mit-mit @lrhn

@mkustermann mkustermann added area-vm Use area-vm for VM related issues, including code coverage, FFI, and the AOT and JIT backends. library-isolate labels Mar 16, 2022
@lrhn
Member

lrhn commented Mar 16, 2022

We could resurrect package:isolate. We may want to if isolates become popular again.

I think Isolate.run is fine in the platform libraries, it really is the bare minimum API for running simple Dart code in another isolate. Everything else can be built on top of that, and I expect users to do so, rather than using Isolate.spawn directly (that's a very low-level API).
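For reference, using Isolate.run (available since Dart 2.19) really is a one-liner, which is what makes it the bare-minimum API described here:

```dart
import 'dart:convert';
import 'dart:isolate';

Future<void> main() async {
  // Runs the (potentially expensive) computation in a fresh isolate
  // and returns its result; the isolate exits when the function completes.
  final data = await Isolate.run(() => jsonDecode('{"answer": 42}'));
  print(data['answer']);
}
```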

We can do an Isolate.runStream function too. It's probably something we should do in the platform libraries, because handling back-pressure is non-trivial (using Isolate.pause to respond to Stream.pause is tricky). With those, we have the basic async abstractions (Future/Stream) covered, and people should be able to do the rest on top of that.

@lrhn
Member

lrhn commented Mar 16, 2022

I'd suggest something like

static Stream<T> stream<T>(Stream<T> Function() computation, {String? debugName}) => ...

which also forwards flow back pressure commands (pause/resume/cancel) in the other direction.

The Isolate.run and Isolate.stream functions would be the interface between the async/async* language features and isolates.

I don't want to include buffering in this function. I'd rather make a buffering abstraction that you can wrap each end in if you want to. (I don't think buffering is something that works in general for arbitrary streams, it needs specific knowledge about the relation between events. A Stream<List<int>> may want to do different chunking of the lists, a Stream<MyEvent> doesn't understand that.)

(It's possible to use Isolate.pause/Isolate.kill as back pressure instead, but that doesn't feel right if the computation returns a stream. In that case, the stream's pushback should be respected.)
If the API was not stream-based, say

static Stream<T> streamEvents<T>(void Function(void Function(T) emit) computation) => ...

then using isolate-based control would be reasonable. I don't think I'd want something like this in the SDK, though.

@mkustermann
Member Author

(It's possible to use Isolate.pause/Isolate.kill as back pressure instead, but that doesn't feel right if the computation returns a stream. In that case, the stream's pushback should be respected.)

Agree.

static Stream<T> stream<T>(Stream<T> Function() computation, {String? debugName}) => ...

👍

I don't want to include buffering in this function.

The reason I've mentioned it is that I think often one may stream elements that are small and frequent. Overhead may be dominated by communication (rather than message sizes themselves).

The pause signal would not be synchronous: the producer isolate forwards controller.add() elements to the consumer and would need to wait until the consumer has signaled back that it has received them and whether to pause or not. That is a rather high inter-isolate message overhead if done by default (2x the number of messages, plus significant latency). The frequent pause/resume would also bubble upstream to the Stream<T> computation() function.

On the other hand, if one doesn't wait for a received-signal from the consumer, there is an implicit buffer (namely the event loop), which we don't want to grow unboundedly.

So for performance it would be good to avoid frequent pauses/resumes, which are significantly more expensive between isolates than inside one, while at the same time avoiding unbounded growth of the event-loop buffer. That's why I think it may be a good idea to consider buffering in the design here.

The buffering should preferably be configurable, e.g. with support for specialized buffer sizes, so that if one has a Stream<Uint8List>, the size of the buffer isn't the number of elements but the number of bytes.
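One hypothetical way to make the buffer capacity unit-aware is a caller-supplied cost function; nothing like this exists in the SDK, the sketch below only illustrates the shape such an API could take:

```dart
import 'dart:typed_data';

/// Hypothetical: buffer capacity measured by a caller-supplied cost
/// function rather than by element count.
Stream<T> streamBuffered<T>(
  Stream<T> Function() computation, {
  int bufferCapacity = 16,
  int Function(T element)? cost, // defaults to a cost of 1 per element
}) =>
    throw UnimplementedError('sketch only');

// For a byte stream, capacity would then be measured in bytes:
//   streamBuffered<Uint8List>(produceChunks,
//       bufferCapacity: 64 * 1024, cost: (chunk) => chunk.length);
```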

@lrhn
Member

lrhn commented Mar 18, 2022

That makes sense. Buffering to prevent eager pausing/resuming can be built into this operation, so that if you pause, the remote end is not notified until at least n elements have been received.

That's still something that can be implemented entirely on the receiving side, like

extension StreamBuffer<T> on Stream<T> {
  /// Delays pause/resume requests on this stream by buffering events.
  ///
  /// When the returned stream is paused, the source stream is not immediately
  /// paused. Only when the buffer has filled up will the pause be forwarded to the source.
  /// After that, if the returned stream is resumed while there are still buffered events,
  /// the source stream is not resumed until all buffered events have been delivered.
  Stream<T> buffer(int size) { ... }
}

It doesn't have to be built into the Isolate.stream function. It can be (passing a non-zero buffer size just means we return .buffer(size) of the stream we'd otherwise have returned).
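Composed that way, a hypothetical usage (Isolate.stream as proposed in this thread, the buffer() extension sketched above; produceEvents and handle are stand-ins) might read:

```dart
// Hypothetical: neither Isolate.stream nor buffer() exists in the SDK today.
final events = Isolate.stream(() => produceEvents()).buffer(16);

await for (final event in events) {
  handle(event); // the source isolate is only paused once the buffer fills
}
```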

Or we can configure the buffering by introducing a Buffer interface, but that's a bigger design task (notifications in both ends on when you can add/remove again.)

@mmcdon20

Since it appears that Isolate.run is coming in 2.19, is there any chance we get an Isolate.stream as well?

@mkustermann
Member Author

(@kevmoo pointed to https://github.com/sourcegraph/conc for concurrency abstractions used in Go, we could use as an inspiration)

@lrhn
Member

lrhn commented Jan 12, 2023

I can definitely do an Isolate.stream like:

/// Runs [computation] in a new isolate and sends the stream events back.
///
/// Returns a stream which gets events from [computation], which is running in
/// a new isolate. Pausing, resuming and cancelling the stream subscription
/// is forwarded to the isolate running [computation].
/// The isolate terminates when the stream ends or the subscription is cancelled.
/// The returned stream also ends if the isolate terminates early,
/// and it emits extra errors if the isolate of [computation] has any uncaught 
/// errors.
Stream<T> stream(Stream<T> Function() computation);

I can see some possible variants of such a function:

  • Isolate is started when stream is called. (Dangerous, you must listen to returned stream.)
  • Isolate is started when returned stream is listened to. (Higher latency before first event.)

There are also different ways to handle pause/resume/cancel:

  • A pause/resume/cancel on the returned stream's subscription is forwarded to the other isolate using port messages.
  • A pause/resume/cancel is implemented by doing Isolate.pause/Isolate.resume/Isolate.kill.

I guess we can implement all combinations and provide flags to choose, or we can be opinionated and give you just one.

import "dart:isolate";
import "dart:async";

Stream<T> stream<T>(Stream<T> Function() computation) =>
    Stream<T>.multi((c) async {
      // Set if subscription.cancel is called.
      Completer<void>? cancelCompleter;
      var eventPort = RawReceivePort();
      SendPort? controlPortForCancel;
      c.onCancel = () {
        var completer = cancelCompleter = Completer<void>();
        controlPortForCancel?.send("cancel");
        return completer.future;
      };
      eventPort.handler = (firstMessage) {
        var controlPort = controlPortForCancel = firstMessage as SendPort;
        eventPort.handler = (message) {
          switch (message) {
            // Stream events.
            case (T value,):
              c.add(value);
            case (Object e, StackTrace s):
              c.addError(e, s);
            // Subscription cancel future results.
            case null:
              eventPort.close();
              cancelCompleter?.complete(null);
            case (Object e, StackTrace s, null):
              eventPort.close();
              cancelCompleter?.completeError(e, s);
            // Wat?
            default:
              c.addError(UnsupportedError("Unknown message: $message"));
          }
        };
        if (!c.hasListener) {
          assert(cancelCompleter != null);
          controlPort.send("cancel");
          return;
        }
        if (c.isPaused) {
          controlPort.send("pause");
        }
        c
          ..onPause = () {
            controlPort.send("pause");
          }
          ..onResume = () {
            controlPort.send("resume");
          }
          ..onCancel = () {
            var completer = cancelCompleter = Completer<void>();
            controlPort.send("cancel");
            return completer.future;
          };
      };
      try {
        // Remote run ends when stream completes or isolate dies.
        await Isolate.run(_remote<T>(computation, eventPort.sendPort));
      } catch (e, s) {
        // Isolate terminated with an error.
        c.addError(e, s);
      }
      eventPort.close();
      c.close();
    });

Future<void> Function() _remote<T>(
        Stream<T> Function() computation, SendPort eventPort) =>
    () {
      var done = Completer<void>();
      var stream = computation();
      var controlPort = RawReceivePort();
      eventPort.send(controlPort.sendPort);
      var subscription = stream.listen((value) {
        eventPort.send((value,));
      }, onError: (Object e, StackTrace s) {
        eventPort.send((e, s));
      }, onDone: () {
        controlPort.close();
        done.complete();
      });
      controlPort.handler = (message) {
        switch (message) {
          case "pause":
            subscription.pause();
          case "resume":
            subscription.resume();
          case "cancel":
            controlPort.close();
            unawaited(subscription.cancel().then<Never>((_) {
              Isolate.exit(eventPort, null);
            }, onError: (Object e, StackTrace s) {
              Isolate.exit(eventPort, (e, s, null));
            }));
        }
      };
      return done.future;
    };


void main() async {
  await for (var i in numbers()) {
    print(i);
    if (i == 5) break;
  }
}

Stream<int> numbers() => Stream.periodic(const Duration(seconds: 1), (i) => i);

It's not a deep problem, it's just about figuring out which messages to send and what to do with them.

@lrhn
Member

lrhn commented May 26, 2023

One feature which could make distributed message passing more viable would be receive ports which don't keep the isolate alive. (Maybe even GC'able while open, if nobody keeps a reference to them.)

A multi-isolate Channel communication primitive, with push/pull operations, is more viable if one of the isolates can just stop caring, without having to actively close its end. (But maybe it's possible to do something clever with Finalizer, to auto-close a "channel" if nobody will use it again.)
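The Finalizer idea could be sketched roughly as follows (hypothetical AutoClosingReceiver wrapper; dart:core's Finalizer is real, but note its callbacks are best-effort and, as discussed above, an open port keeps the isolate alive regardless):

```dart
import 'dart:isolate';

/// Sketch: close the underlying port automatically once the wrapper
/// becomes unreachable, so a forgotten channel end does not leak.
class AutoClosingReceiver {
  // The finalizer holds the port (not the wrapper), so collecting the
  // wrapper can trigger the callback that closes the port.
  static final Finalizer<RawReceivePort> _finalizer =
      Finalizer((port) => port.close());

  final RawReceivePort _port = RawReceivePort();

  AutoClosingReceiver() {
    _finalizer.attach(this, _port, detach: this);
  }

  SendPort get sendPort => _port.sendPort;

  /// Deterministic close, for callers that do care.
  void close() {
    _finalizer.detach(this);
    _port.close();
  }
}
```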

It'd probably still need one primary isolate to keep the buffer, and stay alive, to avoid multiple listeners consuming the same value.

Another useful feature would be to ask a send port whether its receive port is still listening, rather than waiting for a timeout. Again, maybe something can be built to keep a bunch of ports aware of each other. It just sounds like something which can be very fragile if assumptions change.
