diff --git a/lib/src/guarantee_channel.dart b/lib/src/guarantee_channel.dart index 1047e14..a874799 100644 --- a/lib/src/guarantee_channel.dart +++ b/lib/src/guarantee_channel.dart @@ -127,6 +127,14 @@ class _GuaranteeSink implements StreamSink { } if (_disconnected) return; + _addError(error, stackTrace); + } + + /// Like [addError], but doesn't check to ensure that an error can be added. + /// + /// This is called from [addStream], so it shouldn't fail if a stream is being + /// added. + void _addError(error, [StackTrace stackTrace]) { if (_allowErrors) { _inner.addError(error, stackTrace); return; @@ -153,7 +161,7 @@ class _GuaranteeSink implements StreamSink { _addStreamCompleter = new Completer.sync(); _addStreamSubscription = stream.listen( _inner.add, - onError: _inner.addError, + onError: _addError, onDone: _addStreamCompleter.complete); return _addStreamCompleter.future.then((_) { _addStreamCompleter = null; diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel.dart index a466d87..3466d19 100644 --- a/lib/src/isolate_channel.dart +++ b/lib/src/isolate_channel.dart @@ -9,7 +9,6 @@ import 'package:async/async.dart'; import 'package:stack_trace/stack_trace.dart'; import '../stream_channel.dart'; -import 'isolate_channel/send_port_sink.dart'; /// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair, /// presumably with another isolate. @@ -54,10 +53,13 @@ class IsolateChannel extends StreamChannelMixin { var subscription; subscription = receivePort.listen((message) { if (message is SendPort) { - streamCompleter.setSourceStream( - new SubscriptionStream(subscription)); - sinkCompleter.setDestinationSink( - new SendPortSink(receivePort, message)); + var controller = new StreamChannelController(allowForeignErrors: false); + new SubscriptionStream(subscription).pipe(controller.local.sink); + controller.local.stream.listen(message.send, + onDone: receivePort.close); + + streamCompleter.setSourceStream(controller.foreign.stream); + sinkCompleter.setDestinationSink(controller.foreign.sink); return; } @@ -88,9 +90,13 @@ class IsolateChannel extends StreamChannelMixin { /// Creates a stream channel that receives messages from [receivePort] and /// sends them over [sendPort]. - IsolateChannel(ReceivePort receivePort, SendPort sendPort) - : stream = new StreamView(receivePort), - sink = new SendPortSink(receivePort, sendPort); + factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) { + var controller = new StreamChannelController(allowForeignErrors: false); + receivePort.pipe(controller.local.sink); + controller.local.stream.listen(sendPort.send, onDone: receivePort.close); + return new IsolateChannel._( + controller.foreign.stream, controller.foreign.sink); + } IsolateChannel._(this.stream, this.sink); } diff --git a/lib/src/isolate_channel/send_port_sink.dart b/lib/src/isolate_channel/send_port_sink.dart deleted file mode 100644 index d98f1da..0000000 --- a/lib/src/isolate_channel/send_port_sink.dart +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import 'dart:async'; -import 'dart:isolate'; - -/// The sink for [IsolateChannel]. -/// -/// [SendPort] doesn't natively implement any sink API, so this adds that API as -/// a wrapper. Closing this just closes the [ReceivePort]. -class SendPortSink implements StreamSink { - /// The port that produces incoming messages. - /// - /// This is wrapped in a [StreamView] to produce [stream]. - final ReceivePort _receivePort; - - /// The port that sends outgoing messages. - final SendPort _sendPort; - - Future get done => _doneCompleter.future; - final _doneCompleter = new Completer(); - - /// Whether [done] has been completed. - /// - /// This is distinct from [_closed] because [done] can complete with an error - /// without the user explicitly calling [close]. - bool get _isDone => _doneCompleter.isCompleted; - - /// Whether the user has called [close]. - bool _closed = false; - - /// Whether we're currently adding a stream with [addStream]. - bool _inAddStream = false; - - SendPortSink(this._receivePort, this._sendPort); - - void add(T data) { - if (_closed) throw new StateError("Cannot add event after closing."); - if (_inAddStream) { - throw new StateError("Cannot add event while adding stream."); - } - if (_isDone) return; - - _add(data); - } - - /// A helper for [add] that doesn't check for [StateError]s. - /// - /// This is called from [addStream], so it shouldn't check [_inAddStream]. - void _add(T data) { - _sendPort.send(data); - } - - void addError(error, [StackTrace stackTrace]) { - if (_closed) throw new StateError("Cannot add event after closing."); - if (_inAddStream) { - throw new StateError("Cannot add event while adding stream."); - } - - _close(error, stackTrace); - } - - Future close() { - if (_inAddStream) { - throw new StateError("Cannot close sink while adding stream."); - } - - _closed = true; - return _close(); - } - - /// A helper for [close] that doesn't check for [StateError]s. - /// - /// This is called from [addStream], so it shouldn't check [_inAddStream]. It - /// also forwards [error] and [stackTrace] to [done] if they're passed. - Future _close([error, StackTrace stackTrace]) { - if (_isDone) return done; - - _receivePort.close(); - - if (error != null) { - _doneCompleter.completeError(error, stackTrace); - } else { - _doneCompleter.complete(); - } - - return done; - } - - Future addStream(Stream stream) { - if (_closed) throw new StateError("Cannot add stream after closing."); - if (_inAddStream) { - throw new StateError("Cannot add stream while adding stream."); - } - if (_isDone) return new Future.value(); - - _inAddStream = true; - var completer = new Completer.sync(); - stream.listen(_add, - onError: (error, stackTrace) { - _close(error, stackTrace); - completer.complete(); - }, - onDone: completer.complete, - cancelOnError: true); - return completer.future.then((_) { - _inAddStream = false; - }); - } -} diff --git a/lib/src/stream_channel_controller.dart b/lib/src/stream_channel_controller.dart index ad78323..45b2865 100644 --- a/lib/src/stream_channel_controller.dart +++ b/lib/src/stream_channel_controller.dart @@ -15,13 +15,13 @@ import '../stream_channel.dart'; /// /// ```dart /// StreamChannel isolateChannel(ReceivePort receivePort, SendPort sendPort) { -/// var controller = new StreamChannelController(); +/// var controller = new StreamChannelController(allowForeignErrors: false); /// /// // Pipe all events from the receive port into the local sink... /// receivePort.pipe(controller.local.sink); /// /// // ...and all events from the local stream into the send port. -/// controller.local.listen(sendPort.add, onDone: receivePort.close); +/// controller.local.stream.listen(sendPort.send, onDone: receivePort.close); /// /// // Then return the foreign controller for your users to use. /// return controller.foreign; diff --git a/test/with_guarantees_test.dart b/test/with_guarantees_test.dart index 38afefa..409b28f 100644 --- a/test/with_guarantees_test.dart +++ b/test/with_guarantees_test.dart @@ -114,6 +114,29 @@ void main() { streamController.close(); }); + test("events can't be added to an explicitly-closed sink", () { + sinkController.stream.listen(null); // Work around sdk#19095. + + expect(channel.sink.close(), completes); + expect(() => channel.sink.add(1), throwsStateError); + expect(() => channel.sink.addError("oh no"), throwsStateError); + expect(() => channel.sink.addStream(new Stream.fromIterable([])), + throwsStateError); + }); + + test("events can't be added while a stream is being added", () { + var controller = new StreamController(); + channel.sink.addStream(controller.stream); + + expect(() => channel.sink.add(1), throwsStateError); + expect(() => channel.sink.addError("oh no"), throwsStateError); + expect(() => channel.sink.addStream(new Stream.fromIterable([])), + throwsStateError); + expect(() => channel.sink.close(), throwsStateError); + + controller.close(); + }); + group("with allowSinkErrors: false", () { setUp(() { streamController = new StreamController(); @@ -146,5 +169,25 @@ void main() { expect(channel.sink.done, throwsA("oh no")); expect(sinkController.stream.toList(), completion(isEmpty)); }); + + test("adding an error via via addStream causes the stream to emit a done " + "event", () async { + var canceled = false; + var controller = new StreamController(onCancel: () { + canceled = true; + }); + + // This future shouldn't get the error, because it's sent to [Sink.done]. + expect(channel.sink.addStream(controller.stream), completes); + + controller.addError("oh no"); + expect(channel.sink.done, throwsA("oh no")); + await pumpEventQueue(); + expect(canceled, isTrue); + + // Even though the sink is closed, this shouldn't throw an error because + // the user didn't explicitly close it. + channel.sink.add(1); + }); }); }