Skip to content

Commit

Permalink
Add StreamQueue.fork and ForkableStream.
Browse files Browse the repository at this point in the history
StramQueue.fork is a very useful operation for creating complex and composable
user-defined operations over stream queues. It allows arbitrary lookahead to be
performed without modifying the semantics of the original stream, providing for
higher-order operations like "check for this sequence of values or, if they
don't exist, this other distinct sequence".

Review URL: https://codereview.chromium.org//1241723003 .
  • Loading branch information
nex3 committed Jul 15, 2015
1 parent 5f90846 commit 312d396
Show file tree
Hide file tree
Showing 6 changed files with 1,083 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -10,6 +10,9 @@
- Added `SubscriptionStream` which creates a single-subscription stream
from an existing stream subscription.

- Added `ForkableStream` which wraps a stream and allows independent forks to be
created that emit the same events as the original.

- Added a `ResultFuture` class for synchronously accessing the result of a
wrapped future.

Expand Down
1 change: 1 addition & 0 deletions lib/async.dart
Expand Up @@ -12,6 +12,7 @@ export "src/delegate/sink.dart";
export "src/delegate/stream_consumer.dart";
export "src/delegate/stream_sink.dart";
export "src/delegate/stream_subscription.dart";
export "src/forkable_stream.dart";
export "src/future_group.dart";
export "src/result_future.dart";
export "src/stream_completer.dart";
Expand Down
166 changes: 166 additions & 0 deletions lib/src/forkable_stream.dart
@@ -0,0 +1,166 @@
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

library async.forkable_stream;

import 'dart:async';

import 'stream_completer.dart';

/// A single-subscription stream from which other streams may be forked off at
/// the current position.
///
/// This adds an operation, [fork], which produces a new stream that
/// independently emits the same events as this stream. Unlike the branches
/// produced by [StreamSplitter], a fork only emits events that arrive *after*
/// the call to [fork].
///
/// Each fork can be paused or canceled independently of one another and of this
/// stream. The underlying stream will be listened to once any branch is
/// listened to. It will be paused when all branches are paused or not yet
/// listened to. It will be canceled when all branches have been listened to and
/// then canceled.
class ForkableStream<T> extends StreamView<T> {
/// The underlying stream.
final Stream _sourceStream;

/// The subscription to [_sourceStream].
///
/// This will be `null` until this stream or any of its forks are listened to.
StreamSubscription _subscription;

/// Whether this has been cancelled and no more forks may be created.
bool _isCanceled = false;

/// The controllers for any branches that have not yet been canceled.
///
/// This includes a controller for this stream, until that has been cancelled.
final _controllers = new Set<StreamController<T>>();

/// Creates a new forkable stream wrapping [sourceStream].
ForkableStream(Stream sourceStream)
// Use a completer here so that we can provide its stream to the
// superclass constructor while also adding the stream controller to
// [_controllers].
: this._(sourceStream, new StreamCompleter());

ForkableStream._(this._sourceStream, StreamCompleter completer)
: super(completer.stream) {
completer.setSourceStream(_fork(primary: true));
}

/// Creates a new fork of this stream.
///
/// From this point forward, the fork will emit the same events as this
/// stream. It will *not* emit any events that have already been emitted by
/// this stream. The fork is independent of this stream, which means each one
/// may be paused or canceled without affecting the other.
///
/// If this stream is done or its subscription has been canceled, this returns
/// an empty stream.
Stream<T> fork() => _fork(primary: false);

/// Creates a stream forwarding [_sourceStream].
///
/// If [primary] is true, this is the stream underlying this object;
/// otherwise, it's a fork. The only difference is that when the primary
/// stream is canceled, [fork] starts throwing [StateError]s.
Stream<T> _fork({bool primary: false}) {
if (_isCanceled) {
var controller = new StreamController<T>()..close();
return controller.stream;
}

var controller;
controller = new StreamController<T>(
onListen: () => _onListenOrResume(controller),
onCancel: () => _onCancel(controller, primary: primary),
onPause: () => _onPause(controller),
onResume: () => _onListenOrResume(controller),
sync: true);

_controllers.add(controller);

return controller.stream;
}

/// The callback called when `onListen` or `onResume` is called for the branch
/// managed by [controller].
///
/// This ensures that we're subscribed to [_sourceStream] and that the
/// subscription isn't paused.
void _onListenOrResume(StreamController<T> controller) {
if (controller.isClosed) return;
if (_subscription == null) {
_subscription =
_sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
} else {
_subscription.resume();
}
}

/// The callback called when `onCancel` is called for the branch managed by
/// [controller].
///
/// This cancels or pauses the underlying subscription as necessary. If
/// [primary] is true, it also ensures that future calls to [fork] throw
/// [StateError]s.
Future _onCancel(StreamController<T> controller, {bool primary: false}) {
if (primary) _isCanceled = true;

if (controller.isClosed) return null;
_controllers.remove(controller);

if (_controllers.isEmpty) return _subscription.cancel();

_onPause(controller);
return null;
}

/// The callback called when `onPause` is called for the branch managed by
/// [controller].
///
/// This pauses the underlying subscription if necessary.
void _onPause(StreamController<T> controller) {
if (controller.isClosed) return;
if (_subscription.isPaused) return;
if (_controllers.any((controller) =>
controller.hasListener && !controller.isPaused)) {
return;
}

_subscription.pause();
}

/// Forwards data events to all branches.
void _onData(value) {
// Don't iterate directly over the set because [controller.add] might cause
// it to be modified synchronously.
for (var controller in _controllers.toList()) {
controller.add(value);
}
}

/// Forwards error events to all branches.
void _onError(error, StackTrace stackTrace) {
// Don't iterate directly over the set because [controller.addError] might
// cause it to be modified synchronously.
for (var controller in _controllers.toList()) {
controller.addError(error, stackTrace);
}
}

/// Forwards close events to all branches.
void _onDone() {
_isCanceled = true;

// Don't iterate directly over the set because [controller.close] might
// cause it to be modified synchronously.
for (var controller in _controllers.toList()) {
controller.close();
}
_controllers.clear();
}
}

76 changes: 72 additions & 4 deletions lib/src/stream_queue.dart
Expand Up @@ -7,6 +7,7 @@ library async.stream_events;
import 'dart:async';
import 'dart:collection';

import "forkable_stream.dart";
import "subscription_stream.dart";
import "stream_completer.dart";
import "../result.dart";
Expand Down Expand Up @@ -78,7 +79,7 @@ class StreamQueue<T> {
// by the content of the fifth event.

/// Source of events.
final Stream _sourceStream;
final ForkableStream _sourceStream;

/// Subscription on [_sourceStream] while listening for events.
///
Expand All @@ -104,7 +105,9 @@ class StreamQueue<T> {

/// Create a `StreamQueue` of the events of [source].
StreamQueue(Stream source)
: _sourceStream = source;
: _sourceStream = source is ForkableStream
? source
: new ForkableStream(source);

/// Asks if the stream has any more events.
///
Expand Down Expand Up @@ -216,6 +219,22 @@ class StreamQueue<T> {
throw _failClosed();
}

/// Creates a new stream queue in the same position as this one.
///
/// The fork is subscribed to the same underlying stream as this queue, but
/// it's otherwise wholly independent. If requests are made on one, they don't
/// move the other forward; if one is closed, the other is still open.
///
/// The underlying stream will only be paused when all forks have no
/// outstanding requests, and only canceled when all forks are canceled.
StreamQueue<T> fork() {
if (_isClosed) throw _failClosed();

var request = new _ForkRequest<T>(this);
_addRequest(request);
return request.queue;
}

/// Cancels the underlying stream subscription.
///
/// If [immediate] is `false` (the default), the cancel operation waits until
Expand All @@ -236,14 +255,15 @@ class StreamQueue<T> {
if (_isClosed) throw _failClosed();
_isClosed = true;

if (_isDone) return new Future.value();
if (_subscription == null) _subscription = _sourceStream.listen(null);

if (!immediate) {
var request = new _CancelRequest(this);
_addRequest(request);
return request.future;
}

if (_isDone) return new Future.value();
if (_subscription == null) _subscription = _sourceStream.listen(null);
var future = _subscription.cancel();
_onDone();
return future;
Expand Down Expand Up @@ -333,6 +353,7 @@ class StreamQueue<T> {
return;
}
}

if (!_isDone) {
_subscription.pause();
}
Expand Down Expand Up @@ -628,3 +649,50 @@ class _HasNextRequest<T> implements _EventRequest {
_completer.complete(false);
}
}

/// Request for a [StreamQueue.fork] call.
class _ForkRequest<T> implements _EventRequest {
/// Completer for the stream used by the queue by the `fork` call.
StreamCompleter _completer;

StreamQueue<T> queue;

/// The [StreamQueue] object that has this request queued.
final StreamQueue _streamQueue;

_ForkRequest(this._streamQueue) {
_completer = new StreamCompleter<T>();
queue = new StreamQueue<T>(_completer.stream);
}

bool addEvents(Queue<Result> events) {
_completeStream(events);
return true;
}

void close(Queue<Result> events) {
_completeStream(events);
}

void _completeStream(Queue<Result> events) {
if (events.isEmpty) {
if (_streamQueue._isDone) {
_completer.setEmpty();
} else {
_completer.setSourceStream(_streamQueue._sourceStream.fork());
}
} else {
// There are prefetched events which need to be added before the
// remaining stream.
var controller = new StreamController<T>();
for (var event in events) {
event.addTo(controller);
}

var fork = _streamQueue._sourceStream.fork();
controller.addStream(fork, cancelOnError: false)
.whenComplete(controller.close);
_completer.setSourceStream(controller.stream);
}
}
}

0 comments on commit 312d396

Please sign in to comment.