Skip to content

Commit

Permalink
Add StreamChannel.withCloseGuarantee.
Browse files Browse the repository at this point in the history
This also properly enforces the close guarantee for transformers
provided by this package.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//2041983003 .
  • Loading branch information
nex3 committed Jun 6, 2016
1 parent b74a376 commit f155869
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 5 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,14 @@
## 1.5.0

* Add `new StreamChannel.withCloseGuarantee()` to provide the specific guarantee
that closing the sink causes the stream to close before it emits any more
events. This is the only guarantee that isn't automatically preserved when
transforming a channel.

* `StreamChannelTransformer`s provided by the `stream_channel` package now
properly provide the guarantee that closing the sink causes the stream to
close before it emits any more events

## 1.4.0

* Add `StreamChannel.cast()`, which soundly coerces the generic type of a
Expand Down
86 changes: 86 additions & 0 deletions lib/src/close_guarantee_channel.dart
@@ -0,0 +1,86 @@
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:async/async.dart';

import '../stream_channel.dart';

/// A [StreamChannel] that specifically enforces the stream channel guarantee
/// that closing the sink causes the stream to close before it emits any more
/// events
///
/// This is exposed via [new StreamChannel.withCloseGuarantee].
class CloseGuaranteeChannel<T> extends StreamChannelMixin<T> {
Stream<T> get stream => _stream;
_CloseGuaranteeStream<T> _stream;

StreamSink<T> get sink => _sink;
_CloseGuaranteeSink<T> _sink;

/// The subscription to the inner stream.
StreamSubscription<T> _subscription;

/// Whether the sink has closed, causing the underlying channel to disconnect.
bool _disconnected = false;

CloseGuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) {
_sink = new _CloseGuaranteeSink<T>(innerSink, this);
_stream = new _CloseGuaranteeStream<T>(innerStream, this);
}
}

/// The stream for [CloseGuaranteeChannel].
///
/// This wraps the inner stream to save the subscription on the channel when
/// [listen] is called.
class _CloseGuaranteeStream<T> extends Stream<T> {
/// The inner stream this is delegating to.
final Stream<T> _inner;

/// The [CloseGuaranteeChannel] this belongs to.
final CloseGuaranteeChannel<T> _channel;

_CloseGuaranteeStream(this._inner, this._channel);

StreamSubscription<T> listen(void onData(T event),
{Function onError, void onDone(), bool cancelOnError}) {
// If the channel is already disconnected, we shouldn't dispatch anything
// but a done event.
if (_channel._disconnected) {
onData = null;
onError = null;
}

var subscription = _inner.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
if (!_channel._disconnected) {
_channel._subscription = subscription;
}
return subscription;
}
}

/// The sink for [CloseGuaranteeChannel].
///
/// This wraps the inner sink to cancel the stream subscription when the sink is
/// canceled.
class _CloseGuaranteeSink<T> extends DelegatingStreamSink<T> {
/// The [CloseGuaranteeChannel] this belongs to.
final CloseGuaranteeChannel<T> _channel;

_CloseGuaranteeSink(StreamSink<T> inner, this._channel) : super(inner);

Future close() {
var done = super.close();
_channel._disconnected = true;
if (_channel._subscription != null) {
// Don't dispatch anything but a done event.
_channel._subscription.onData(null);
_channel._subscription.onError(null);
}
return done;
}
}
2 changes: 1 addition & 1 deletion lib/src/json_document_transformer.dart
Expand Up @@ -40,6 +40,6 @@ class JsonDocumentTransformer
var sink = new StreamSinkTransformer.fromHandlers(handleData: (data, sink) {
sink.add(_codec.encode(data));
}).bind(channel.sink);
return new StreamChannel(stream, sink);
return new StreamChannel.withCloseGuarantee(stream, sink);
}
}
11 changes: 9 additions & 2 deletions lib/src/stream_channel_transformer.dart
Expand Up @@ -17,7 +17,14 @@ import 'transformer/typed.dart';
/// [StreamSinkTransformer]. Each transformer defines a [bind] method that takes
/// in the original [StreamChannel] and returns the transformed version.
///
/// Transformers must be able to have `bind` called multiple times.
/// Transformers must be able to have [bind] called multiple times. If a
/// subclass implements [bind] explicitly, it should be sure that the returned
/// stream follows the second stream channel guarantee: closing the sink causes
/// the stream to close before it emits any more events. This guarantee is
/// invalidated when an asynchronous gap is added between the original stream's
/// event dispatch and the returned stream's, for example by transforming it
/// with a [StreamTransformer]. The guarantee can be easily preserved using [new
/// StreamChannel.withCloseGuarantee].
class StreamChannelTransformer<S, T> {
/// The transformer to use on the channel's stream.
final StreamTransformer<T, S> _streamTransformer;
Expand Down Expand Up @@ -63,7 +70,7 @@ class StreamChannelTransformer<S, T> {
/// `channel.straem`, the transformer will transform them and pass the
/// transformed versions to the returned channel's stream.
StreamChannel<S> bind(StreamChannel<T> channel) =>
new StreamChannel<S>(
new StreamChannel<S>.withCloseGuarantee(
channel.stream.transform(_streamTransformer),
_sinkTransformer.bind(channel.sink));
}
18 changes: 16 additions & 2 deletions lib/stream_channel.dart
Expand Up @@ -7,6 +7,7 @@ import 'dart:async';
import 'package:async/async.dart';

import 'src/guarantee_channel.dart';
import 'src/close_guarantee_channel.dart';
import 'src/stream_channel_transformer.dart';

export 'src/delegating_stream_channel.dart';
Expand Down Expand Up @@ -87,6 +88,19 @@ abstract class StreamChannel<T> {
{bool allowSinkErrors: true}) =>
new GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors);

/// Creates a new [StreamChannel] that communicates over [stream] and [sink].
///
/// This specifically enforces the second guarantee: closing the sink causes
/// the stream to close before it emits any more events. This guarantee is
/// invalidated when an asynchronous gap is added between the original
/// stream's event dispatch and the returned stream's, for example by
/// transforming it with a [StreamTransformer]. This is a lighter-weight way
/// of preserving that guarantee in particular than
/// [StreamChannel.withGuarantees].
factory StreamChannel.withCloseGuarantee(Stream<T> stream,
StreamSink<T> sink) =>
new CloseGuaranteeChannel(stream, sink);

/// Connects [this] to [other], so that any values emitted by either are sent
/// directly to the other.
void pipe(StreamChannel<T> other);
Expand Down Expand Up @@ -148,10 +162,10 @@ abstract class StreamChannelMixin<T> implements StreamChannel<T> {
changeSink(transformer.bind);

StreamChannel<T> changeStream(Stream<T> change(Stream<T> stream)) =>
new StreamChannel(change(stream), sink);
new StreamChannel.withCloseGuarantee(change(stream), sink);

StreamChannel<T> changeSink(StreamSink<T> change(StreamSink<T> sink)) =>
new StreamChannel(stream, change(sink));
new StreamChannel.withCloseGuarantee(stream, change(sink));

StreamChannel/*<S>*/ cast/*<S>*/() => new StreamChannel(
DelegatingStream.typed(stream), DelegatingStreamSink.typed(sink));
Expand Down
66 changes: 66 additions & 0 deletions test/with_close_guarantee_test.dart
@@ -0,0 +1,66 @@
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:async/async.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';

import 'utils.dart';

final _delayTransformer = new StreamTransformer.fromHandlers(
handleData: (data, sink) => new Future.microtask(() => sink.add(data)),
handleDone: (sink) => new Future.microtask(() => sink.close()));

final _delaySinkTransformer =
new StreamSinkTransformer.fromStreamTransformer(_delayTransformer);

void main() {
var controller;
var channel;
setUp(() {
controller = new StreamChannelController();

// Add a bunch of layers of asynchronous dispatch between the channel and
// the underlying controllers.
var stream = controller.foreign.stream;
var sink = controller.foreign.sink;
for (var i = 0; i < 10; i++) {
stream = stream.transform(_delayTransformer);
sink = _delaySinkTransformer.bind(sink);
}

channel = new StreamChannel.withCloseGuarantee(stream, sink);
});

test("closing the event sink causes the stream to close before it emits any "
"more events", () async {
controller.local.sink.add(1);
controller.local.sink.add(2);
controller.local.sink.add(3);

expect(channel.stream.listen(expectAsync((event) {
if (event == 2) channel.sink.close();
}, count: 2)).asFuture(), completes);

await pumpEventQueue();
});

test("closing the event sink before events are emitted causes the stream to "
"close immediately", () async {
channel.sink.close();
channel.stream.listen(
expectAsync((_) {}, count: 0),
onError: expectAsync((_, __) {}, count: 0),
onDone: expectAsync(() {}));

controller.local.sink.add(1);
controller.local.sink.add(2);
controller.local.sink.add(3);
controller.local.sink.close();

await pumpEventQueue();
});
}

0 comments on commit f155869

Please sign in to comment.