Skip to content

Commit

Permalink
Add a Disconnector class.
Browse files Browse the repository at this point in the history
R=tjblasi@google.com

Review URL: https://codereview.chromium.org//1679193002 .
  • Loading branch information
nex3 committed Feb 9, 2016
1 parent 89c732f commit 4556f5e
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 1 deletion.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
## 1.3.0

* Add `Disconnector`, a transformer that allows the caller to disconnect the
transformed channel.

## 1.2.0

* Add `new StreamChannel.withGuarantees()`, which creates a channel with extra
Expand Down
139 changes: 139 additions & 0 deletions lib/src/disconnector.dart
@@ -0,0 +1,139 @@
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import '../stream_channel.dart';

/// Allows the caller to force a channel to disconnect.
///
/// When [disconnect] is called, the channel (or channels) transformed by this
/// transformer will act as though the remote end had disconnected—the stream
/// will emit a done event, and the sink will ignore future inputs. The inner
/// sink will also be closed to notify the remote end of the disconnection.
///
/// If a channel is transformed after the [disconnect] has been called, it will
/// be disconnected immediately.
class Disconnector<T> implements StreamChannelTransformer<T, T> {
/// Whether [disconnect] has been called.
bool get isDisconnected => _isDisconnected;
var _isDisconnected = false;

/// The sinks for transformed channels.
///
/// Note that we assume that transformed channels provide the stream channel
/// guarantees. This allows us to only track sinks, because we know closing
/// the underlying sink will cause the stream to emit a done event.
final _sinks = <_DisconnectorSink<T>>[];

/// Disconnects all channels that have been transformed.
void disconnect() {
_isDisconnected = true;
for (var sink in _sinks) {
sink._disconnect();
}
_sinks.clear();
}

StreamChannel<T> bind(StreamChannel<T> channel) {
return channel.changeSink((innerSink) {
var sink = new _DisconnectorSink(innerSink);

if (_isDisconnected) {
sink._disconnect();
} else {
_sinks.add(sink);
}

return sink;
});
}
}

/// A sink wrapper that can force a disconnection.
class _DisconnectorSink<T> implements StreamSink<T> {
/// The inner sink.
final StreamSink<T> _inner;

Future get done => _inner.done;

/// Whether [Disconnector.disconnect] has been called.
var _isDisconnected = false;

/// Whether the user has called [close].
var _closed = false;

/// The subscription to the stream passed to [addStream], if a stream is
/// currently being added.
StreamSubscription<T> _addStreamSubscription;

/// The completer for the future returned by [addStream], if a stream is
/// currently being added.
Completer _addStreamCompleter;

/// Whether we're currently adding a stream with [addStream].
bool get _inAddStream => _addStreamSubscription != null;

_DisconnectorSink(this._inner);

void add(T data) {
if (_closed) throw new StateError("Cannot add event after closing.");
if (_inAddStream) {
throw new StateError("Cannot add event while adding stream.");
}
if (_isDisconnected) return;

_inner.add(data);
}

void addError(error, [StackTrace stackTrace]) {
if (_closed) throw new StateError("Cannot add event after closing.");
if (_inAddStream) {
throw new StateError("Cannot add event while adding stream.");
}
if (_isDisconnected) return;

_inner.addError(error, stackTrace);
}

Future addStream(Stream<T> stream) {
if (_closed) throw new StateError("Cannot add stream after closing.");
if (_inAddStream) {
throw new StateError("Cannot add stream while adding stream.");
}
if (_isDisconnected) return new Future.value();

_addStreamCompleter = new Completer.sync();
_addStreamSubscription = stream.listen(
_inner.add,
onError: _inner.addError,
onDone: _addStreamCompleter.complete);
return _addStreamCompleter.future.then((_) {
_addStreamCompleter = null;
_addStreamSubscription = null;
});
}

Future close() {
if (_inAddStream) {
throw new StateError("Cannot close sink while adding stream.");
}

_closed = true;
return _inner.close();
}

/// Disconnects this sink.
///
/// This closes the underlying sink and stops forwarding events.
void _disconnect() {
_isDisconnected = true;
_inner.close();

if (!_inAddStream) return;
_addStreamCompleter.complete(_addStreamSubscription.cancel());
_addStreamCompleter = null;
_addStreamSubscription = null;
}
}
1 change: 1 addition & 0 deletions lib/stream_channel.dart
Expand Up @@ -10,6 +10,7 @@ import 'src/guarantee_channel.dart';
import 'src/stream_channel_transformer.dart';

export 'src/delegating_stream_channel.dart';
export 'src/disconnector.dart';
export 'src/isolate_channel.dart';
export 'src/json_document_transformer.dart';
export 'src/multi_channel.dart';
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
@@ -1,5 +1,5 @@
name: stream_channel
version: 1.2.0
version: 1.3.0
description: An abstraction for two-way communication channels.
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/stream_channel
Expand Down
113 changes: 113 additions & 0 deletions test/disconnector_test.dart
@@ -0,0 +1,113 @@
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

import 'package:async/async.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';

import 'utils.dart';

void main() {
var streamController;
var sinkController;
var disconnector;
var channel;
setUp(() {
streamController = new StreamController();
sinkController = new StreamController();
disconnector = new Disconnector();
channel = new StreamChannel.withGuarantees(
streamController.stream, sinkController.sink)
.transform(disconnector);
});

group("before disconnection", () {
test("forwards events from the sink as normal", () {
channel.sink.add(1);
channel.sink.add(2);
channel.sink.add(3);
channel.sink.close();

expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
});

test("forwards events to the stream as normal", () {
streamController.add(1);
streamController.add(2);
streamController.add(3);
streamController.close();

expect(channel.stream.toList(), completion(equals([1, 2, 3])));
});

test("events can't be added when the sink is explicitly closed", () {
sinkController.stream.listen(null); // Work around sdk#19095.

expect(channel.sink.close(), completes);
expect(() => channel.sink.add(1), throwsStateError);
expect(() => channel.sink.addError("oh no"), throwsStateError);
expect(() => channel.sink.addStream(new Stream.fromIterable([])),
throwsStateError);
});

test("events can't be added while a stream is being added", () {
var controller = new StreamController();
channel.sink.addStream(controller.stream);

expect(() => channel.sink.add(1), throwsStateError);
expect(() => channel.sink.addError("oh no"), throwsStateError);
expect(() => channel.sink.addStream(new Stream.fromIterable([])),
throwsStateError);
expect(() => channel.sink.close(), throwsStateError);

controller.close();
});
});

test("cancels addStream when disconnected", () async {
var canceled = false;
var controller = new StreamController(onCancel: () {
canceled = true;
});
expect(channel.sink.addStream(controller.stream), completes);
disconnector.disconnect();

await pumpEventQueue();
expect(canceled, isTrue);
});

group("after disconnection", () {
setUp(() => disconnector.disconnect());

test("closes the inner sink and ignores events to the outer sink", () {
channel.sink.add(1);
channel.sink.add(2);
channel.sink.add(3);
channel.sink.close();

expect(sinkController.stream.toList(), completion(isEmpty));
});

test("closes the stream", () {
expect(channel.stream.toList(), completion(isEmpty));
});

test("completes done", () {
sinkController.stream.listen(null); // Work around sdk#19095.
expect(channel.sink.done, completes);
});

test("still emits state errors after explicit close", () {
sinkController.stream.listen(null); // Work around sdk#19095.
expect(channel.sink.close(), completes);

expect(() => channel.sink.add(1), throwsStateError);
expect(() => channel.sink.addError("oh no"), throwsStateError);
});
});
}

0 comments on commit 4556f5e

Please sign in to comment.