/
send_port_sink.dart
111 lines (90 loc) · 3.14 KB
/
send_port_sink.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:isolate';
/// The sink for [IsolateChannel].
///
/// [SendPort] doesn't natively implement any sink API, so this adds that API as
/// a wrapper. Closing this just closes the [ReceivePort].
class SendPortSink<T> implements StreamSink<T> {
/// The port that produces incoming messages.
///
/// This is wrapped in a [StreamView] to produce [stream].
final ReceivePort _receivePort;
/// The port that sends outgoing messages.
final SendPort _sendPort;
Future get done => _doneCompleter.future;
final _doneCompleter = new Completer();
/// Whether [done] has been completed.
///
/// This is distinct from [_closed] because [done] can complete with an error
/// without the user explicitly calling [close].
bool get _isDone => _doneCompleter.isCompleted;
/// Whether the user has called [close].
bool _closed = false;
/// Whether we're currently adding a stream with [addStream].
bool _inAddStream = false;
SendPortSink(this._receivePort, this._sendPort);
void add(T data) {
if (_closed) throw new StateError("Cannot add event after closing.");
if (_inAddStream) {
throw new StateError("Cannot add event while adding stream.");
}
if (_isDone) return;
_add(data);
}
/// A helper for [add] that doesn't check for [StateError]s.
///
/// This is called from [addStream], so it shouldn't check [_inAddStream].
void _add(T data) {
_sendPort.send(data);
}
void addError(error, [StackTrace stackTrace]) {
if (_closed) throw new StateError("Cannot add event after closing.");
if (_inAddStream) {
throw new StateError("Cannot add event while adding stream.");
}
_close(error, stackTrace);
}
Future close() {
if (_inAddStream) {
throw new StateError("Cannot close sink while adding stream.");
}
_closed = true;
return _close();
}
/// A helper for [close] that doesn't check for [StateError]s.
///
/// This is called from [addStream], so it shouldn't check [_inAddStream]. It
/// also forwards [error] and [stackTrace] to [done] if they're passed.
Future _close([error, StackTrace stackTrace]) {
if (_isDone) return done;
_receivePort.close();
if (error != null) {
_doneCompleter.completeError(error, stackTrace);
} else {
_doneCompleter.complete();
}
return done;
}
Future addStream(Stream<T> stream) {
if (_closed) throw new StateError("Cannot add stream after closing.");
if (_inAddStream) {
throw new StateError("Cannot add stream while adding stream.");
}
if (_isDone) return new Future.value();
_inAddStream = true;
var completer = new Completer.sync();
stream.listen(_add,
onError: (error, stackTrace) {
_close(error, stackTrace);
completer.complete();
},
onDone: completer.complete,
cancelOnError: true);
return completer.future.then((_) {
_inAddStream = false;
});
}
}