Skip to content

Commit

Permalink
Make IsolateChannel use StreamChannelCompleter.
Browse files Browse the repository at this point in the history
This exposed some lingering bugs in GuaranteeChannel as well.

R=tjblasi@google.com

Review URL: https://codereview.chromium.org//1671763002 .
  • Loading branch information
nex3 committed Feb 9, 2016
1 parent 571eb86 commit 89c732f
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 122 deletions.
10 changes: 9 additions & 1 deletion lib/src/guarantee_channel.dart
Expand Up @@ -127,6 +127,14 @@ class _GuaranteeSink<T> implements StreamSink<T> {
}
if (_disconnected) return;

_addError(error, stackTrace);
}

/// Like [addError], but doesn't check to ensure that an error can be added.
///
/// This is called from [addStream], so it shouldn't fail if a stream is being
/// added.
void _addError(error, [StackTrace stackTrace]) {
if (_allowErrors) {
_inner.addError(error, stackTrace);
return;
Expand All @@ -153,7 +161,7 @@ class _GuaranteeSink<T> implements StreamSink<T> {
_addStreamCompleter = new Completer.sync();
_addStreamSubscription = stream.listen(
_inner.add,
onError: _inner.addError,
onError: _addError,
onDone: _addStreamCompleter.complete);
return _addStreamCompleter.future.then((_) {
_addStreamCompleter = null;
Expand Down
22 changes: 14 additions & 8 deletions lib/src/isolate_channel.dart
Expand Up @@ -9,7 +9,6 @@ import 'package:async/async.dart';
import 'package:stack_trace/stack_trace.dart';

import '../stream_channel.dart';
import 'isolate_channel/send_port_sink.dart';

/// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
/// presumably with another isolate.
Expand Down Expand Up @@ -54,10 +53,13 @@ class IsolateChannel<T> extends StreamChannelMixin<T> {
var subscription;
subscription = receivePort.listen((message) {
if (message is SendPort) {
streamCompleter.setSourceStream(
new SubscriptionStream<T>(subscription));
sinkCompleter.setDestinationSink(
new SendPortSink<T>(receivePort, message));
var controller = new StreamChannelController(allowForeignErrors: false);
new SubscriptionStream<T>(subscription).pipe(controller.local.sink);
controller.local.stream.listen(message.send,
onDone: receivePort.close);

streamCompleter.setSourceStream(controller.foreign.stream);
sinkCompleter.setDestinationSink(controller.foreign.sink);
return;
}

Expand Down Expand Up @@ -88,9 +90,13 @@ class IsolateChannel<T> extends StreamChannelMixin<T> {

/// Creates a stream channel that receives messages from [receivePort] and
/// sends them over [sendPort].
IsolateChannel(ReceivePort receivePort, SendPort sendPort)
: stream = new StreamView<T>(receivePort),
sink = new SendPortSink<T>(receivePort, sendPort);
factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) {
var controller = new StreamChannelController(allowForeignErrors: false);
receivePort.pipe(controller.local.sink);
controller.local.stream.listen(sendPort.send, onDone: receivePort.close);
return new IsolateChannel._(
controller.foreign.stream, controller.foreign.sink);
}

IsolateChannel._(this.stream, this.sink);
}
111 changes: 0 additions & 111 deletions lib/src/isolate_channel/send_port_sink.dart

This file was deleted.

4 changes: 2 additions & 2 deletions lib/src/stream_channel_controller.dart
Expand Up @@ -15,13 +15,13 @@ import '../stream_channel.dart';
///
/// ```dart
/// StreamChannel isolateChannel(ReceivePort receivePort, SendPort sendPort) {
/// var controller = new StreamChannelController();
/// var controller = new StreamChannelController(allowForeignErrors: false);
///
/// // Pipe all events from the receive port into the local sink...
/// receivePort.pipe(controller.local.sink);
///
/// // ...and all events from the local stream into the send port.
/// controller.local.listen(sendPort.add, onDone: receivePort.close);
/// controller.local.stream.listen(sendPort.send, onDone: receivePort.close);
///
/// // Then return the foreign controller for your users to use.
/// return controller.foreign;
Expand Down
43 changes: 43 additions & 0 deletions test/with_guarantees_test.dart
Expand Up @@ -114,6 +114,29 @@ void main() {
streamController.close();
});

test("events can't be added to an explicitly-closed sink", () {
sinkController.stream.listen(null); // Work around sdk#19095.

expect(channel.sink.close(), completes);
expect(() => channel.sink.add(1), throwsStateError);
expect(() => channel.sink.addError("oh no"), throwsStateError);
expect(() => channel.sink.addStream(new Stream.fromIterable([])),
throwsStateError);
});

test("events can't be added while a stream is being added", () {
var controller = new StreamController();
channel.sink.addStream(controller.stream);

expect(() => channel.sink.add(1), throwsStateError);
expect(() => channel.sink.addError("oh no"), throwsStateError);
expect(() => channel.sink.addStream(new Stream.fromIterable([])),
throwsStateError);
expect(() => channel.sink.close(), throwsStateError);

controller.close();
});

group("with allowSinkErrors: false", () {
setUp(() {
streamController = new StreamController();
Expand Down Expand Up @@ -146,5 +169,25 @@ void main() {
expect(channel.sink.done, throwsA("oh no"));
expect(sinkController.stream.toList(), completion(isEmpty));
});

test("adding an error via via addStream causes the stream to emit a done "
"event", () async {
var canceled = false;
var controller = new StreamController(onCancel: () {
canceled = true;
});

// This future shouldn't get the error, because it's sent to [Sink.done].
expect(channel.sink.addStream(controller.stream), completes);

controller.addError("oh no");
expect(channel.sink.done, throwsA("oh no"));
await pumpEventQueue();
expect(canceled, isTrue);

// Even though the sink is closed, this shouldn't throw an error because
// the user didn't explicitly close it.
channel.sink.add(1);
});
});
}

0 comments on commit 89c732f

Please sign in to comment.