diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d86e48..6e3a6ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,14 @@ +## 1.5.0 + +* Add `new StreamChannel.withCloseGuarantee()` to provide the specific guarantee + that closing the sink causes the stream to close before it emits any more + events. This is the only guarantee that isn't automatically preserved when + transforming a channel. + +* `StreamChannelTransformer`s provided by the `stream_channel` package now + properly provide the guarantee that closing the sink causes the stream to + close before it emits any more events + ## 1.4.0 * Add `StreamChannel.cast()`, which soundly coerces the generic type of a diff --git a/lib/src/close_guarantee_channel.dart b/lib/src/close_guarantee_channel.dart new file mode 100644 index 0000000..a2c69bc --- /dev/null +++ b/lib/src/close_guarantee_channel.dart @@ -0,0 +1,86 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:async/async.dart'; + +import '../stream_channel.dart'; + +/// A [StreamChannel] that specifically enforces the stream channel guarantee +/// that closing the sink causes the stream to close before it emits any more +/// events +/// +/// This is exposed via [new StreamChannel.withCloseGuarantee]. +class CloseGuaranteeChannel extends StreamChannelMixin { + Stream get stream => _stream; + _CloseGuaranteeStream _stream; + + StreamSink get sink => _sink; + _CloseGuaranteeSink _sink; + + /// The subscription to the inner stream. + StreamSubscription _subscription; + + /// Whether the sink has closed, causing the underlying channel to disconnect. + bool _disconnected = false; + + CloseGuaranteeChannel(Stream innerStream, StreamSink innerSink) { + _sink = new _CloseGuaranteeSink(innerSink, this); + _stream = new _CloseGuaranteeStream(innerStream, this); + } +} + +/// The stream for [CloseGuaranteeChannel]. +/// +/// This wraps the inner stream to save the subscription on the channel when +/// [listen] is called. +class _CloseGuaranteeStream extends Stream { + /// The inner stream this is delegating to. + final Stream _inner; + + /// The [CloseGuaranteeChannel] this belongs to. + final CloseGuaranteeChannel _channel; + + _CloseGuaranteeStream(this._inner, this._channel); + + StreamSubscription listen(void onData(T event), + {Function onError, void onDone(), bool cancelOnError}) { + // If the channel is already disconnected, we shouldn't dispatch anything + // but a done event. + if (_channel._disconnected) { + onData = null; + onError = null; + } + + var subscription = _inner.listen(onData, + onError: onError, onDone: onDone, cancelOnError: cancelOnError); + if (!_channel._disconnected) { + _channel._subscription = subscription; + } + return subscription; + } +} + +/// The sink for [CloseGuaranteeChannel]. +/// +/// This wraps the inner sink to cancel the stream subscription when the sink is +/// canceled. +class _CloseGuaranteeSink extends DelegatingStreamSink { + /// The [CloseGuaranteeChannel] this belongs to. + final CloseGuaranteeChannel _channel; + + _CloseGuaranteeSink(StreamSink inner, this._channel) : super(inner); + + Future close() { + var done = super.close(); + _channel._disconnected = true; + if (_channel._subscription != null) { + // Don't dispatch anything but a done event. + _channel._subscription.onData(null); + _channel._subscription.onError(null); + } + return done; + } +} diff --git a/lib/src/json_document_transformer.dart b/lib/src/json_document_transformer.dart index c62c597..86b9ae7 100644 --- a/lib/src/json_document_transformer.dart +++ b/lib/src/json_document_transformer.dart @@ -40,6 +40,6 @@ class JsonDocumentTransformer var sink = new StreamSinkTransformer.fromHandlers(handleData: (data, sink) { sink.add(_codec.encode(data)); }).bind(channel.sink); - return new StreamChannel(stream, sink); + return new StreamChannel.withCloseGuarantee(stream, sink); } } diff --git a/lib/src/stream_channel_transformer.dart b/lib/src/stream_channel_transformer.dart index ca09ea1..46232d7 100644 --- a/lib/src/stream_channel_transformer.dart +++ b/lib/src/stream_channel_transformer.dart @@ -17,7 +17,14 @@ import 'transformer/typed.dart'; /// [StreamSinkTransformer]. Each transformer defines a [bind] method that takes /// in the original [StreamChannel] and returns the transformed version. /// -/// Transformers must be able to have `bind` called multiple times. +/// Transformers must be able to have [bind] called multiple times. If a +/// subclass implements [bind] explicitly, it should be sure that the returned +/// stream follows the second stream channel guarantee: closing the sink causes +/// the stream to close before it emits any more events. This guarantee is +/// invalidated when an asynchronous gap is added between the original stream's +/// event dispatch and the returned stream's, for example by transforming it +/// with a [StreamTransformer]. The guarantee can be easily preserved using [new +/// StreamChannel.withCloseGuarantee]. class StreamChannelTransformer { /// The transformer to use on the channel's stream. final StreamTransformer _streamTransformer; @@ -63,7 +70,7 @@ class StreamChannelTransformer { /// `channel.straem`, the transformer will transform them and pass the /// transformed versions to the returned channel's stream. StreamChannel bind(StreamChannel channel) => - new StreamChannel( + new StreamChannel.withCloseGuarantee( channel.stream.transform(_streamTransformer), _sinkTransformer.bind(channel.sink)); } diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart index 3615d21..16323b1 100644 --- a/lib/stream_channel.dart +++ b/lib/stream_channel.dart @@ -7,6 +7,7 @@ import 'dart:async'; import 'package:async/async.dart'; import 'src/guarantee_channel.dart'; +import 'src/close_guarantee_channel.dart'; import 'src/stream_channel_transformer.dart'; export 'src/delegating_stream_channel.dart'; @@ -87,6 +88,19 @@ abstract class StreamChannel { {bool allowSinkErrors: true}) => new GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors); + /// Creates a new [StreamChannel] that communicates over [stream] and [sink]. + /// + /// This specifically enforces the second guarantee: closing the sink causes + /// the stream to close before it emits any more events. This guarantee is + /// invalidated when an asynchronous gap is added between the original + /// stream's event dispatch and the returned stream's, for example by + /// transforming it with a [StreamTransformer]. This is a lighter-weight way + /// of preserving that guarantee in particular than + /// [StreamChannel.withGuarantees]. + factory StreamChannel.withCloseGuarantee(Stream stream, + StreamSink sink) => + new CloseGuaranteeChannel(stream, sink); + /// Connects [this] to [other], so that any values emitted by either are sent /// directly to the other. void pipe(StreamChannel other); @@ -148,10 +162,10 @@ abstract class StreamChannelMixin implements StreamChannel { changeSink(transformer.bind); StreamChannel changeStream(Stream change(Stream stream)) => - new StreamChannel(change(stream), sink); + new StreamChannel.withCloseGuarantee(change(stream), sink); StreamChannel changeSink(StreamSink change(StreamSink sink)) => - new StreamChannel(stream, change(sink)); + new StreamChannel.withCloseGuarantee(stream, change(sink)); StreamChannel/**/ cast/**/() => new StreamChannel( DelegatingStream.typed(stream), DelegatingStreamSink.typed(sink)); diff --git a/test/with_close_guarantee_test.dart b/test/with_close_guarantee_test.dart new file mode 100644 index 0000000..caf48cf --- /dev/null +++ b/test/with_close_guarantee_test.dart @@ -0,0 +1,66 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:async/async.dart'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +import 'utils.dart'; + +final _delayTransformer = new StreamTransformer.fromHandlers( + handleData: (data, sink) => new Future.microtask(() => sink.add(data)), + handleDone: (sink) => new Future.microtask(() => sink.close())); + +final _delaySinkTransformer = + new StreamSinkTransformer.fromStreamTransformer(_delayTransformer); + +void main() { + var controller; + var channel; + setUp(() { + controller = new StreamChannelController(); + + // Add a bunch of layers of asynchronous dispatch between the channel and + // the underlying controllers. + var stream = controller.foreign.stream; + var sink = controller.foreign.sink; + for (var i = 0; i < 10; i++) { + stream = stream.transform(_delayTransformer); + sink = _delaySinkTransformer.bind(sink); + } + + channel = new StreamChannel.withCloseGuarantee(stream, sink); + }); + + test("closing the event sink causes the stream to close before it emits any " + "more events", () async { + controller.local.sink.add(1); + controller.local.sink.add(2); + controller.local.sink.add(3); + + expect(channel.stream.listen(expectAsync((event) { + if (event == 2) channel.sink.close(); + }, count: 2)).asFuture(), completes); + + await pumpEventQueue(); + }); + + test("closing the event sink before events are emitted causes the stream to " + "close immediately", () async { + channel.sink.close(); + channel.stream.listen( + expectAsync((_) {}, count: 0), + onError: expectAsync((_, __) {}, count: 0), + onDone: expectAsync(() {})); + + controller.local.sink.add(1); + controller.local.sink.add(2); + controller.local.sink.add(3); + controller.local.sink.close(); + + await pumpEventQueue(); + }); +}