-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
io_sink.dart
296 lines (261 loc) · 9.19 KB
/
io_sink.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dart.io;
/// A combined byte and text output.
///
/// An [IOSink] combines a [StreamSink] of bytes with a [StringSink],
/// and allows easy output of both bytes and text.
///
/// Writing text ([write]) and adding bytes ([add]) may be interleaved freely.
///
/// While a stream is being added using [addStream], any further attempts
/// to add or write to the [IOSink] will fail until the [addStream] completes.
///
/// It is an error to add data to the [IOSink] after the sink is closed.
abstract class IOSink implements StreamSink<List<int>>, StringSink {
/// Create an [IOSink] that outputs to a [target] [StreamConsumer] of bytes.
///
/// Text written to [StreamSink] methods is encoded to bytes using [encoding]
/// before being output on [target].
factory IOSink(StreamConsumer<List<int>> target,
{Encoding encoding = utf8}) =>
new _IOSinkImpl(target, encoding);
/// The [Encoding] used when writing strings.
///
/// Depending on the underlying consumer, this property might be mutable.
late Encoding encoding;
/// Adds byte [data] to the target consumer, ignoring [encoding].
///
/// The [encoding] does not apply to this method, and the [data] list is passed
/// directly to the target consumer as a stream event.
///
/// This function must not be called when a stream is currently being added
/// using [addStream].
///
/// This operation is non-blocking. See [flush] or [done] for how to get any
/// errors generated by this call.
///
/// The data list should not be modified after it has been passed to `add`.
void add(List<int> data);
/// Converts [object] to a String by invoking [Object.toString] and
/// [add]s the encoding of the result to the target consumer.
///
/// This operation is non-blocking. See [flush] or [done] for how to get any
/// errors generated by this call.
void write(Object? object);
/// Iterates over the given [objects] and [write]s them in sequence.
///
/// If [separator] is provided, a `write` with the `separator` is performed
/// between any two elements of objects.
///
/// This operation is non-blocking. See [flush] or [done] for how to get any
/// errors generated by this call.
void writeAll(Iterable objects, [String separator = ""]);
/// Converts [object] to a String by invoking [Object.toString] and
/// writes the result to `this`, followed by a newline.
///
/// This operation is non-blocking. See [flush] or [done] for how to get any
/// errors generated by this call.
void writeln([Object? object = ""]);
/// Writes the character of [charCode].
///
/// This method is equivalent to `write(String.fromCharCode(charCode))`.
///
/// This operation is non-blocking. See [flush] or [done] for how to get any
/// errors generated by this call.
void writeCharCode(int charCode);
/// Passes the error to the target consumer as an error event.
///
/// This function must not be called when a stream is currently being added
/// using [addStream].
///
/// This operation is non-blocking. See [flush] or [done] for how to get any
/// errors generated by this call.
void addError(error, [StackTrace? stackTrace]);
/// Adds all elements of the given [stream].
///
/// Returns a [Future] that completes when
/// all elements of the given [stream] have been added.
///
/// If the stream contains an error, the `addStream` ends at the error,
/// and the returned future completes with that error.
///
/// This function must not be called when a stream is currently being added
/// using this function.
Future addStream(Stream<List<int>> stream);
/// Returns a [Future] that completes once all buffered data is accepted by the
/// underlying [StreamConsumer].
///
/// This method must not be called while an [addStream] is incomplete.
///
/// NOTE: This is not necessarily the same as the data being flushed by the
/// operating system.
Future flush();
/// Close the target consumer.
///
/// NOTE: Writes to the [IOSink] may be buffered, and may not be flushed by
/// a call to `close()`. To flush all buffered writes, call `flush()` before
/// calling `close()`.
Future close();
/// A future that will complete when the consumer closes, or when an
/// error occurs.
///
/// This future is identical to the future returned by [close].
Future get done;
}
class _StreamSinkImpl<T> implements StreamSink<T> {
final StreamConsumer<T> _target;
final Completer _doneCompleter = new Completer();
StreamController<T>? _controllerInstance;
Completer? _controllerCompleter;
bool _isClosed = false;
bool _isBound = false;
bool _hasError = false;
_StreamSinkImpl(this._target);
void add(T data) {
if (_isClosed) {
throw StateError("StreamSink is closed");
}
_controller.add(data);
}
void addError(error, [StackTrace? stackTrace]) {
if (_isClosed) {
throw StateError("StreamSink is closed");
}
_controller.addError(error, stackTrace);
}
Future addStream(Stream<T> stream) {
if (_isBound) {
throw new StateError("StreamSink is already bound to a stream");
}
if (_hasError) return done;
_isBound = true;
var future = _controllerCompleter == null
? _target.addStream(stream)
: _controllerCompleter!.future.then((_) => _target.addStream(stream));
_controllerInstance?.close();
// Wait for any pending events in [_controller] to be dispatched before
// adding [stream].
return future.whenComplete(() {
_isBound = false;
});
}
Future flush() {
if (_isBound) {
throw new StateError("StreamSink is bound to a stream");
}
if (_controllerInstance == null) return new Future.value(this);
// Adding an empty stream-controller will return a future that will complete
// when all data is done.
_isBound = true;
var future = _controllerCompleter!.future;
_controllerInstance!.close();
return future.whenComplete(() {
_isBound = false;
});
}
Future close() {
if (_isBound) {
throw new StateError("StreamSink is bound to a stream");
}
if (!_isClosed) {
_isClosed = true;
if (_controllerInstance != null) {
_controllerInstance!.close();
} else {
_closeTarget();
}
}
return done;
}
void _closeTarget() {
_target.close().then(_completeDoneValue, onError: _completeDoneError);
}
Future get done => _doneCompleter.future;
void _completeDoneValue(value) {
if (!_doneCompleter.isCompleted) {
_doneCompleter.complete(value);
}
}
void _completeDoneError(error, StackTrace? stackTrace) {
if (!_doneCompleter.isCompleted) {
_hasError = true;
_doneCompleter.completeError(error, stackTrace);
}
}
StreamController<T> get _controller {
if (_isBound) {
throw new StateError("StreamSink is bound to a stream");
}
if (_isClosed) {
throw new StateError("StreamSink is closed");
}
if (_controllerInstance == null) {
_controllerInstance = new StreamController<T>(sync: true);
_controllerCompleter = new Completer();
_target.addStream(_controller.stream).then((_) {
if (_isBound) {
// A new stream takes over - forward values to that stream.
_controllerCompleter!.complete(this);
_controllerCompleter = null;
_controllerInstance = null;
} else {
// No new stream, .close was called. Close _target.
_closeTarget();
}
}, onError: (error, stackTrace) {
if (_isBound) {
// A new stream takes over - forward errors to that stream.
_controllerCompleter!.completeError(error, stackTrace);
_controllerCompleter = null;
_controllerInstance = null;
} else {
// No new stream. No need to close target, as it has already
// failed.
_completeDoneError(error, stackTrace);
}
});
}
return _controllerInstance!;
}
}
class _IOSinkImpl extends _StreamSinkImpl<List<int>> implements IOSink {
Encoding _encoding;
bool _encodingMutable = true;
_IOSinkImpl(StreamConsumer<List<int>> target, this._encoding) : super(target);
Encoding get encoding => _encoding;
void set encoding(Encoding value) {
if (!_encodingMutable) {
throw new StateError("IOSink encoding is not mutable");
}
_encoding = value;
}
void write(Object? obj) {
String string = '$obj';
if (string.isEmpty) return;
add(_encoding.encode(string));
}
void writeAll(Iterable objects, [String separator = ""]) {
Iterator iterator = objects.iterator;
if (!iterator.moveNext()) return;
if (separator.isEmpty) {
do {
write(iterator.current);
} while (iterator.moveNext());
} else {
write(iterator.current);
while (iterator.moveNext()) {
write(separator);
write(iterator.current);
}
}
}
void writeln([Object? object = ""]) {
write(object);
write("\n");
}
void writeCharCode(int charCode) {
write(new String.fromCharCode(charCode));
}
}