Skip to content

Commit

Permalink
Add StreamChannel.withGuarantees and StreamChannelController.
Browse files Browse the repository at this point in the history
These APIs make it easier for users to create their own stream channels
that follow the StreamChannel guarantees.

R=tjblasi@google.com

Review URL: https://codereview.chromium.org//1662773003 .
  • Loading branch information
nex3 committed Feb 4, 2016
1 parent 32c090b commit 66a45ff
Show file tree
Hide file tree
Showing 7 changed files with 439 additions and 2 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## 1.2.0

* Add `new StreamChannel.withGuarantees()`, which creates a channel with extra
wrapping to ensure that it obeys the stream channel guarantees.

* Add `StreamChannelController`, which can be used to create custom
`StreamChannel` objects.

## 1.1.1

* Fix the type annotation for `StreamChannel.transform()`'s parameter.
Expand Down
163 changes: 163 additions & 0 deletions lib/src/guarantee_channel.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:async/async.dart';

import '../stream_channel.dart';

/// A [StreamChannel] that enforces the stream channel guarantees.
///
/// This is exposed via [new StreamChannel.withGuarantees].
class GuaranteeChannel<T> extends StreamChannelMixin<T> {
Stream<T> get stream => _streamController.stream;

StreamSink<T> get sink => _sink;
_GuaranteeSink<T> _sink;

/// The controller for [stream].
///
/// This intermediate controller allows us to continue listening for a done
/// event even after the user has canceled their subscription, and to send our
/// own done event when the sink is closed.
StreamController<T> _streamController;

/// The subscription to the inner stream.
StreamSubscription<T> _subscription;

/// Whether the sink has closed, causing the underlying channel to disconnect.
bool _disconnected = false;

GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) {
_sink = new _GuaranteeSink<T>(innerSink, this);

// Enforce the single-subscription guarantee by changing a broadcast stream
// to single-subscription.
if (innerStream.isBroadcast) {
innerStream = innerStream.transform(
const SingleSubscriptionTransformer());
}

_streamController = new StreamController<T>(onListen: () {
// If the sink has disconnected, we've already called
// [_streamController.close].
if (_disconnected) return;

_subscription = innerStream.listen(_streamController.add,
onError: _streamController.addError,
onDone: () {
_sink._onStreamDisconnected();
_streamController.close();
});
}, sync: true);
}

/// Called by [_GuaranteeSink] when the user closes it.
///
/// The sink closing indicates that the connection is closed, so the stream
/// should stop emitting events.
void _onSinkDisconnected() {
_disconnected = true;
if (_subscription != null) _subscription.cancel();
_streamController.close();
}
}

/// The sink for [GuaranteeChannel].
///
/// This wraps the inner sink to ignore events and cancel any in-progress
/// [addStream] calls when the underlying channel closes.
class _GuaranteeSink<T> implements StreamSink<T> {
/// The inner sink being wrapped.
final StreamSink<T> _inner;

/// The [GuaranteeChannel] this belongs to.
final GuaranteeChannel<T> _channel;

Future get done => _inner.done;

/// Whether the stream has emitted a done event, causing the underlying
/// channel to disconnect.
bool _disconnected = false;

/// Whether the user has called [close].
bool _closed = false;

/// The subscription to the stream passed to [addStream], if a stream is
/// currently being added.
StreamSubscription<T> _addStreamSubscription;

/// The completer for the future returned by [addStream], if a stream is
/// currently being added.
Completer _addStreamCompleter;

/// Whether we're currently adding a stream with [addStream].
bool get _inAddStream => _addStreamSubscription != null;

_GuaranteeSink(this._inner, this._channel);

void add(T data) {
if (_closed) throw new StateError("Cannot add event after closing.");
if (_inAddStream) {
throw new StateError("Cannot add event while adding stream.");
}
if (_disconnected) return;

_inner.add(data);
}

void addError(error, [StackTrace stackTrace]) {
if (_closed) throw new StateError("Cannot add event after closing.");
if (_inAddStream) {
throw new StateError("Cannot add event while adding stream.");
}
if (_disconnected) return;

_inner.addError(error, stackTrace);
}

Future addStream(Stream<T> stream) {
if (_closed) throw new StateError("Cannot add stream after closing.");
if (_inAddStream) {
throw new StateError("Cannot add stream while adding stream.");
}
if (_disconnected) return new Future.value();

_addStreamCompleter = new Completer.sync();
_addStreamSubscription = stream.listen(
_inner.add,
onError: _inner.addError,
onDone: _addStreamCompleter.complete);
return _addStreamCompleter.future.then((_) {
_addStreamCompleter = null;
_addStreamSubscription = null;
});
}

Future close() {
if (_inAddStream) {
throw new StateError("Cannot close sink while adding stream.");
}

_closed = true;
if (_disconnected) return new Future.value();

_channel._onSinkDisconnected();
return _inner.close();
}

/// Called by [GuaranteeChannel] when the stream emits a done event.
///
/// The stream being done indicates that the connection is closed, so the
/// sink should stop forwarding events.
void _onStreamDisconnected() {
_disconnected = true;
if (!_inAddStream) return;

_addStreamCompleter.complete(_addStreamSubscription.cancel());
_addStreamCompleter = null;
_addStreamSubscription = null;
}
}
58 changes: 58 additions & 0 deletions lib/src/stream_channel_controller.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import '../stream_channel.dart';

/// A controller for exposing a new [StreamChannel].
///
/// This exposes two connected [StreamChannel]s, [local] and [foreign]. The
/// user's code should use [local] to emit and receive events. Then [foreign]
/// can be returned for others to use. For example, here's a simplified version
/// of the implementation of [new IsolateChannel]:
///
/// ```dart
/// StreamChannel isolateChannel(ReceivePort receivePort, SendPort sendPort) {
/// var controller = new StreamChannelController();
///
/// // Pipe all events from the receive port into the local sink...
/// receivePort.pipe(controller.local.sink);
///
/// // ...and all events from the local stream into the send port.
/// controller.local.listen(sendPort.add, onDone: receivePort.close);
///
/// // Then return the foreign controller for your users to use.
/// return controller.foreign;
/// }
/// ```
class StreamChannelController<T> {
/// The local channel.
///
/// This channel should be used directly by the creator of this
/// [StreamChannelController] to send and receive events.
StreamChannel<T> get local => _local;
StreamChannel<T> _local;

/// The foreign channel.
///
/// This channel should be returned to external users so they can communicate
/// with [local].
StreamChannel<T> get foreign => _foreign;
StreamChannel<T> _foreign;

/// Creates a [StreamChannelController].
///
/// If [sync] is true, events added to either channel's sink are synchronously
/// dispatched to the other channel's stream. This should only be done if the
/// source of those events is already asynchronous.
StreamChannelController({bool sync: false}) {
var localToForeignController = new StreamController<T>(sync: sync);
var foreignToLocalController = new StreamController<T>(sync: sync);
_local = new StreamChannel<T>.withGuarantees(
foreignToLocalController.stream, localToForeignController.sink);
_foreign = new StreamChannel<T>.withGuarantees(
localToForeignController.stream, foreignToLocalController.sink);
}
}
14 changes: 13 additions & 1 deletion lib/stream_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import 'dart:async';

import 'package:async/async.dart';

import 'src/guarantee_channel.dart';
import 'src/stream_channel_transformer.dart';

export 'src/delegating_stream_channel.dart';
export 'src/isolate_channel.dart';
export 'src/json_document_transformer.dart';
export 'src/multi_channel.dart';
export 'src/stream_channel_completer.dart';
export 'src/stream_channel_controller.dart';
export 'src/stream_channel_transformer.dart';

/// An abstract class representing a two-way communication channel.
Expand Down Expand Up @@ -65,10 +67,20 @@ abstract class StreamChannel<T> {
/// Creates a new [StreamChannel] that communicates over [stream] and [sink].
///
/// Note that this stream/sink pair must provide the guarantees listed in the
/// [StreamChannel] documentation.
/// [StreamChannel] documentation. If they don't do so natively, [new
/// StreamChannel.withGuarantees] should be used instead.
factory StreamChannel(Stream<T> stream, StreamSink<T> sink) =>
new _StreamChannel<T>(stream, sink);

/// Creates a new [StreamChannel] that communicates over [stream] and [sink].
///
/// Unlike [new StreamChannel], this enforces the guarantees listed in the
/// [StreamChannel] documentation. This makes it somewhat less efficient than
/// just wrapping a stream and a sink directly, so [new StreamChannel] should
/// be used when the guarantees are provided natively.
factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink) =>
new GuaranteeChannel(stream, sink);

/// Connects [this] to [other], so that any values emitted by either are sent
/// directly to the other.
void pipe(StreamChannel<T> other);
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: stream_channel
version: 1.1.1
version: 1.2.0-dev
description: An abstraction for two-way communication channels.
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/stream_channel
Expand Down
86 changes: 86 additions & 0 deletions test/stream_channel_controller_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';

import 'utils.dart';

void main() {
group("asynchronously", () {
var controller;
setUp(() {
controller = new StreamChannelController();
});

test("forwards events from the local sink to the foreign stream", () {
controller.local.sink..add(1)..add(2)..add(3)..close();
expect(controller.foreign.stream.toList(), completion(equals([1, 2, 3])));
});

test("forwards events from the foreign sink to the local stream", () {
controller.foreign.sink..add(1)..add(2)..add(3)..close();
expect(controller.local.stream.toList(), completion(equals([1, 2, 3])));
});
});

group("synchronously", () {
var controller;
setUp(() {
controller = new StreamChannelController(sync: true);
});

test("synchronously forwards events from the local sink to the foreign "
"stream", () {
var receivedEvent = false;
var receivedError = false;
var receivedDone = false;
controller.foreign.stream.listen(expectAsync((event) {
expect(event, equals(1));
receivedEvent = true;
}), onError: expectAsync((error) {
expect(error, equals("oh no"));
receivedError = true;
}), onDone: expectAsync(() {
receivedDone = true;
}));

controller.local.sink.add(1);
expect(receivedEvent, isTrue);

controller.local.sink.addError("oh no");
expect(receivedError, isTrue);

controller.local.sink.close();
expect(receivedDone, isTrue);
});

test("synchronously forwards events from the foreign sink to the local "
"stream", () {
var receivedEvent = false;
var receivedError = false;
var receivedDone = false;
controller.local.stream.listen(expectAsync((event) {
expect(event, equals(1));
receivedEvent = true;
}), onError: expectAsync((error) {
expect(error, equals("oh no"));
receivedError = true;
}), onDone: expectAsync(() {
receivedDone = true;
}));

controller.foreign.sink.add(1);
expect(receivedEvent, isTrue);

controller.foreign.sink.addError("oh no");
expect(receivedError, isTrue);

controller.foreign.sink.close();
expect(receivedDone, isTrue);
});
});
}
Loading

0 comments on commit 66a45ff

Please sign in to comment.