-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
stream.dart
2580 lines (2475 loc) · 97.4 KB
/
stream.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dart.async;
// -------------------------------------------------------------------
// Core Stream types
// -------------------------------------------------------------------
/// Signature of a callback that takes no arguments and returns nothing.
typedef _TimerCallback = void Function();
/// A source of asynchronous data events.
///
/// A Stream provides a way to receive a sequence of events.
/// Each event is either a data event, also called an *element* of the stream,
/// or an error event, which is a notification that something has failed.
/// When a stream has emitted all its events,
/// a single "done" event notifies the listener that the end has been reached.
///
/// You produce a stream by calling an `async*` function, which then returns
/// a stream. Consuming that stream will lead the function to emit events
/// until it ends, and the stream closes.
/// You consume a stream either using an `await for` loop, which is available
/// inside an `async` or `async*` function, or by forwarding its events directly
/// using `yield*` inside an `async*` function.
/// Example:
/// ```dart
/// Stream<T> optionalMap<T>(
/// Stream<T> source, [T Function(T)? convert]) async* {
/// if (convert == null) {
/// yield* source;
/// } else {
/// await for (var event in source) {
/// yield convert(event);
/// }
/// }
/// }
/// ```
/// When this function is called, it immediately returns a `Stream<T>` object.
/// Then nothing further happens until someone tries to consume that stream.
/// At that point, the body of the `async*` function starts running.
/// If the `convert` function was omitted, the `yield*` will listen to the
/// `source` stream and forward all events, data and errors, to the returned
/// stream. When the `source` stream closes, the `yield*` is done,
/// and the `optionalMap` function body ends too. This closes the returned
/// stream.
/// If a `convert` *is* supplied, the function instead listens on the source
/// stream and enters an `await for` loop which
/// repeatedly waits for the next data event.
/// On a data event, it calls `convert` with the value and emits the result
/// on the returned stream.
/// If no error events are emitted by the `source` stream,
/// the loop ends when the `source` stream does,
/// then the `optionalMap` function body completes,
/// which closes the returned stream.
/// On an error event from the `source` stream,
/// the `await for` re-throws that error, which breaks the loop.
/// The error then reaches the end of the `optionalMap` function body,
/// since it's not caught.
/// That makes the error be emitted on the returned stream, which then closes.
///
/// The `Stream` class also provides functionality which allows you to
/// manually listen for events from a stream, or to convert a stream
/// into another stream or into a future.
///
/// The [forEach] function corresponds to the `await for` loop,
/// just as [Iterable.forEach] corresponds to a normal `for`/`in` loop.
/// Like the loop, it will call a function for each data event and break on an
/// error.
///
/// The more low-level [listen] method is what every other method is based on.
/// You call `listen` on a stream to tell it that you want to receive
/// events, and to register the callbacks which will receive those events.
/// When you call `listen`, you receive a [StreamSubscription] object
/// which is the active object providing the events,
/// and which can be used to stop listening again,
/// or to temporarily pause events from the subscription.
///
/// There are two kinds of streams: "Single-subscription" streams and
/// "broadcast" streams.
///
/// *A single-subscription stream* allows only a single listener during the whole
/// lifetime of the stream.
/// It doesn't start generating events until it has a listener,
/// and it stops sending events when the listener is unsubscribed,
/// even if the source of events could still provide more.
/// The stream created by an `async*` function is a single-subscription stream,
/// but each call to the function creates a new such stream.
///
/// Listening twice on a single-subscription stream is not allowed, even after
/// the first subscription has been canceled.
///
/// Single-subscription streams are generally used for streaming chunks of
/// larger contiguous data, like file I/O.
///
/// *A broadcast stream* allows any number of listeners, and it fires
/// its events when they are ready, whether there are listeners or not.
///
/// Broadcast streams are used for independent events/observers.
///
/// If several listeners want to listen to a single-subscription stream,
/// use [asBroadcastStream] to create a broadcast stream on top of the
/// non-broadcast stream.
///
/// On either kind of stream, stream transformations, such as [where] and
/// [skip], return the same type of stream as the one the method was called on,
/// unless otherwise noted.
///
/// When an event is fired, the listener(s) at that time will receive the event.
/// If a listener is added to a broadcast stream while an event is being fired,
/// that listener will not receive the event currently being fired.
/// If a listener is canceled, it immediately stops receiving events.
/// Listening on a broadcast stream can be treated as listening on a new stream
/// containing only the events that have not yet been emitted when the [listen]
/// call occurs.
/// For example the [first] getter listens to the stream, then returns the first
/// event that listener receives.
/// This is not necessarily the first event emitted by the stream, but the first
/// of the *remaining* events of the broadcast stream.
///
/// When the "done" event is fired, subscribers are unsubscribed before
/// receiving the event. After the event has been sent, the stream has no
/// subscribers. Adding new subscribers to a broadcast stream after this point
/// is allowed, but they will just receive a new "done" event as soon
/// as possible.
///
/// Stream subscriptions always respect "pause" requests. If necessary they need
/// to buffer their input, but often, and preferably they can simply request
/// their input to pause too.
///
/// The default implementation of [isBroadcast] returns false.
/// A broadcast stream inheriting from [Stream] must override [isBroadcast]
/// to return `true` if it wants to signal that it behaves like a broadcast
/// stream.
abstract class Stream<T> {
/// Creates a stream.
///
/// [Stream] is abstract, so this generative constructor can only be
/// invoked from the constructor of a subclass.
Stream();
/// Internal use only. We do not want to promise that Stream stays const.
///
/// This library-private `const` constructor allows a subclass declared in
/// this library to have a `const` constructor of its own, while the public
/// unnamed [Stream] constructor stays non-`const`.
///
/// If mixins become compatible with const constructors, we may use a
/// stream mixin instead of extending Stream from a const class.
/// (They now are compatible. We still consider, but it's not urgent.)
const Stream._internal();
/// Creates an empty broadcast stream.
///
/// This is a stream which does nothing except sending a done event
/// when it's listened to.
///
/// The constructor is `const`, so the stream can be created as a
/// compile-time constant, as in the example below.
///
/// Example:
/// ```dart
/// const stream = Stream.empty();
/// stream.listen(
/// (value) {
/// throw "Unreachable";
/// },
/// onDone: () {
/// print('Done');
/// },
/// );
/// ```
const factory Stream.empty() = _EmptyStream<T>;
/// Creates a stream which emits a single data event before closing.
///
/// This stream emits a single data event of [value]
/// and then closes with a done event.
///
/// Example:
/// ```dart
/// Future<void> printThings(Stream<String> data) async {
/// await for (var x in data) {
/// print(x);
/// }
/// }
/// printThings(Stream<String>.value('ok')); // prints "ok".
/// ```
///
/// The returned stream is effectively equivalent to one created by
/// `(() async* { yield value; } ())` or `Future<T>.value(value).asStream()`.
@Since("2.5")
factory Stream.value(T value) {
  // Queue the single data event and the done event up front; they are
  // held by the controller until its stream is used.
  final controller = _AsyncStreamController<T>(null, null, null, null);
  controller._add(value);
  controller._closeUnchecked();
  return controller.stream;
}
/// Creates a stream which emits a single error event before completing.
///
/// This stream emits a single error event of [error] and [stackTrace]
/// and then completes with a done event.
///
/// Example:
/// ```dart
/// Future<void> tryThings(Stream<int> data) async {
/// try {
/// await for (var x in data) {
/// print('Data: $x');
/// }
/// } catch (e) {
/// print(e);
/// }
/// }
/// tryThings(Stream<int>.error('Error')); // prints "Error".
/// ```
/// The returned stream is effectively equivalent to one created by
/// `Future<T>.error(error, stackTrace).asStream()`, or by
/// `(() async* { throw error; } ())`, except that you can control the
/// stack trace as well.
@Since("2.5")
factory Stream.error(Object error, [StackTrace? stackTrace]) {
  // TODO(40614): Remove once non-nullability is sound.
  checkNotNullable(error, "error");
  final controller = _AsyncStreamController<T>(null, null, null, null);
  // Fall back to the default stack trace for `error` when the caller
  // did not supply one.
  controller._addError(
      error, stackTrace ?? AsyncError.defaultStackTrace(error));
  controller._closeUnchecked();
  return controller.stream;
}
/// Creates a new single-subscription stream from the future.
///
/// When the future completes, the stream will fire one event, either
/// data or error, and then close with a done-event.
///
/// Example:
/// ```dart
/// Future<String> futureTask() async {
/// await Future.delayed(const Duration(seconds: 5));
/// return 'Future complete';
/// }
///
/// final stream = Stream<String>.fromFuture(futureTask());
/// stream.listen(print,
/// onDone: () => print('Done'), onError: print);
///
/// // Outputs:
/// // "Future complete" after 'futureTask' finished.
/// // "Done" when stream completed.
/// ```
factory Stream.fromFuture(Future<T> future) {
  // The future is observed right away and its result is handed to the
  // controller, which buffers it until the stream gets a listener.
  // For a single value it is not worth deferring the `then` call until
  // somebody listens.
  final _StreamController<T> sink =
      _SyncStreamController<T>(null, null, null, null);
  future.then((result) {
    sink
      .._add(result)
      .._closeUnchecked();
  }, onError: (failure, trace) {
    sink
      .._addError(failure, trace)
      .._closeUnchecked();
  });
  return sink.stream;
}
/// Create a single-subscription stream from a group of futures.
///
/// The stream reports the results of the futures on the stream in the order
/// in which the futures complete.
/// Each future provides either a data event or an error event,
/// depending on how the future completes.
///
/// If some futures have already completed when `Stream.fromFutures` is called,
/// their results will be emitted in some unspecified order.
///
/// When all futures have completed, the stream is closed.
///
/// If [futures] is empty, the stream closes as soon as possible.
///
/// Example:
/// ```dart
/// Future<int> waitTask() async {
/// await Future.delayed(const Duration(seconds: 2));
/// return 10;
/// }
///
/// Future<String> doneTask() async {
/// await Future.delayed(const Duration(seconds: 5));
/// return 'Future complete';
/// }
///
/// final stream = Stream<Object>.fromFutures([doneTask(), waitTask()]);
/// stream.listen(print, onDone: () => print('Done'), onError: print);
///
/// // Outputs:
/// // 10 after 'waitTask' finished.
/// // "Future complete" after 'doneTask' finished.
/// // "Done" when stream completed.
/// ```
factory Stream.fromFutures(Iterable<Future<T>> futures) {
  final _StreamController<T> sink =
      _SyncStreamController<T>(null, null, null, null);
  // Number of futures that have been subscribed to but have not yet
  // reported a result.
  var pending = 0;

  // Closes the stream once the last outstanding future has reported.
  void markOneDone() {
    pending -= 1;
    if (pending == 0) sink._closeUnchecked();
  }

  // The two handlers below are local functions shared by every future,
  // so no additional closure is written per future.
  void deliverValue(T result) {
    if (sink.isClosed) return;
    sink._add(result);
    markOneDone();
  }

  void deliverError(Object failure, StackTrace trace) {
    if (sink.isClosed) return;
    sink._addError(failure, trace);
    markOneDone();
  }

  // The futures are already running, so start listening to them immediately
  // (instead of waiting for the stream to be listened on).
  // If we wait, we might not catch errors in the futures in time.
  for (final pendingFuture in futures) {
    pending += 1;
    pendingFuture.then(deliverValue, onError: deliverError);
  }
  // Nothing to wait for: close, but asynchronously, because the controller
  // is a synchronous one.
  if (pending == 0) scheduleMicrotask(sink.close);
  return sink.stream;
}
/// Creates a single-subscription stream that gets its data from [elements].
///
/// The iterable is iterated when the stream receives a listener, and stops
/// iterating if the listener cancels the subscription, or if the
/// [Iterator.moveNext] method returns `false` or throws.
/// Iteration is suspended while the stream subscription is paused.
///
/// If calling [Iterator.moveNext] on `elements.iterator` throws,
/// the stream emits that error and then it closes.
/// If reading [Iterator.current] on `elements.iterator` throws,
/// the stream emits that error, but keeps iterating.
///
/// Example:
/// ```dart
/// final numbers = [1, 2, 3, 5, 6, 7];
/// final stream = Stream.fromIterable(numbers);
/// ```
factory Stream.fromIterable(Iterable<T> elements) =>
    // The callback defers creating the pending-events object for [elements]
    // until the generated stream asks for it.
    _GeneratedStreamImpl<T>(() => _IterablePendingEvents<T>(elements));
/// Creates a multi-subscription stream.
///
/// Each time the created stream is listened to,
/// the [onListen] callback is invoked with a new [MultiStreamController],
/// which forwards events to the [StreamSubscription]
/// returned by that [listen] call.
///
/// This allows each listener to be treated as an individual stream.
///
/// The [MultiStreamController] does not support reading its
/// [StreamController.stream]. Setting its [StreamController.onListen]
/// has no effect since the [onListen] callback is called instead,
/// and the [StreamController.onListen] won't be called later.
/// The controller acts like an asynchronous controller,
/// but provides extra methods for delivering events synchronously.
///
/// If [isBroadcast] is set to `true`, the returned stream's
/// [Stream.isBroadcast] will be `true`.
/// This has no effect on the stream behavior,
/// it is up to the [onListen] function
/// to act like a broadcast stream if it claims to be one.
///
/// A multi-subscription stream can behave like any other stream.
/// If the [onListen] callback throws on every call after the first,
/// the stream behaves like a single-subscription stream.
/// If the stream emits the same events to all current listeners,
/// it behaves like a broadcast stream.
///
/// It can also choose to emit different events to different listeners.
/// For example, a stream which repeats the most recent
/// non-`null` event to new listeners, could be implemented as this example:
/// ```dart
/// extension StreamRepeatLatestExtension<T extends Object> on Stream<T> {
/// Stream<T> repeatLatest() {
/// var done = false;
/// T? latest = null;
/// var currentListeners = <MultiStreamController<T>>{};
/// this.listen((event) {
/// latest = event;
/// for (var listener in [...currentListeners]) listener.addSync(event);
/// }, onError: (Object error, StackTrace stack) {
/// for (var listener in [...currentListeners]) listener.addErrorSync(error, stack);
/// }, onDone: () {
/// done = true;
/// latest = null;
/// for (var listener in currentListeners) listener.closeSync();
/// currentListeners.clear();
/// });
/// return Stream.multi((controller) {
/// if (done) {
/// controller.close();
/// return;
/// }
/// currentListeners.add(controller);
/// var latestValue = latest;
/// if (latestValue != null) controller.add(latestValue);
/// controller.onCancel = () {
/// currentListeners.remove(controller);
/// };
/// });
/// }
/// }
/// ```
@Since("2.9")
factory Stream.multi(void Function(MultiStreamController<T>) onListen,
        {bool isBroadcast = false}) =>
    _MultiStream<T>(onListen, isBroadcast);
/// Creates a stream that repeatedly emits events at [period] intervals.
///
/// The event values are computed by invoking [computation]. The argument to
/// this callback is an integer that starts with 0 and is incremented for
/// every event.
///
/// The [period] must be a non-negative [Duration].
///
/// If [computation] is omitted, the event values will all be `null`.
///
/// The [computation] must not be omitted if the event type [T] does not
/// allow `null` as a value. An [ArgumentError] is thrown in that case.
///
/// If [computation] throws, the error is emitted as an error event instead
/// of a data event, and the stream keeps running.
///
/// While the subscription is paused no events are emitted, and the time
/// spent paused does not count towards the period: after a resume, the next
/// event is emitted once the remainder of the interrupted period has passed.
///
/// Example:
/// ```dart
/// final stream =
/// Stream<int>.periodic(const Duration(
/// seconds: 1), (count) => count * count).take(5);
///
/// stream.forEach(print); // Outputs event values 0,1,4,9,16.
/// ```
factory Stream.periodic(Duration period,
    [T computation(int computationCount)?]) {
  if (computation == null && !typeAcceptsNull<T>()) {
    throw ArgumentError.value(null, "computation",
        "Must not be omitted when the event type is non-nullable");
  }
  var controller = _SyncStreamController<T>(null, null, null, null);
  // Measures how much of the current period has passed while the stream was
  // running (and not paused). It is reset to zero on every event.
  Stopwatch watch = Stopwatch();
  controller.onListen = () {
    int computationCount = 0;
    void sendEvent(_) {
      // A new period begins with every event.
      watch.reset();
      if (computation != null) {
        T event;
        try {
          event = computation(computationCount++);
        } catch (e, s) {
          controller.addError(e, s);
          return;
        }
        controller.add(event);
      } else {
        controller.add(null as T); // We have checked that null is T.
      }
    }

    // A `Stopwatch` is created stopped, and `reset` does not start it, so it
    // must be started explicitly. Without this, a pause before the first
    // resume would always observe zero elapsed time, and that resume would
    // wait a full period instead of the remainder of the current one.
    watch.start();
    Timer timer = Timer.periodic(period, sendEvent);
    controller
      ..onCancel = () {
        timer.cancel();
        return Future._nullFuture;
      }
      ..onPause = () {
        // Freeze the elapsed time of the current period while paused.
        watch.stop();
        timer.cancel();
      }
      ..onResume = () {
        Duration elapsed = watch.elapsed;
        watch.start();
        // Wait out the remainder of the interrupted period with a one-shot
        // timer, then go back to a regular periodic timer.
        timer = Timer(period - elapsed, () {
          timer = Timer.periodic(period, sendEvent);
          sendEvent(null);
        });
      };
  };
  return controller.stream;
}
/// Creates a stream where all events of an existing stream are piped through
/// a sink-transformation.
///
/// The given [mapSink] closure is invoked when the returned stream is
/// listened to. All events from the [source] are added into the event sink
/// that is returned from the invocation. The transformation puts all
/// transformed events into the sink the [mapSink] closure received during
/// its invocation. Conceptually the [mapSink] creates a transformation pipe
/// with the input sink being the returned [EventSink] and the output sink
/// being the sink it received.
///
/// This constructor is frequently used to build transformers.
///
/// Example use for a duplicating transformer:
/// ```dart
/// class DuplicationSink implements EventSink<String> {
/// final EventSink<String> _outputSink;
/// DuplicationSink(this._outputSink);
///
/// void add(String data) {
/// _outputSink.add(data);
/// _outputSink.add(data);
/// }
///
/// void addError(e, [st]) { _outputSink.addError(e, st); }
/// void close() { _outputSink.close(); }
/// }
///
/// class DuplicationTransformer extends StreamTransformerBase<String, String> {
/// // Some generic types omitted for brevity.
/// Stream bind(Stream stream) => Stream<String>.eventTransformed(
/// stream,
/// (EventSink sink) => DuplicationSink(sink));
/// }
///
/// stringStream.transform(DuplicationTransformer());
/// ```
/// The resulting stream is a broadcast stream if [source] is.
factory Stream.eventTransformed(Stream<dynamic> source,
        EventSink<dynamic> mapSink(EventSink<T> sink)) =>
    _BoundSinkStream(source, mapSink);
/// Adapts [source] to be a `Stream<T>`.
///
/// This allows [source] to be used at the new type, but at run-time it
/// must satisfy the requirements of both the new type and its original type.
///
/// Data events created by the source stream must also be instances of [T].
static Stream<T> castFrom<S, T>(Stream<S> source) {
  return CastStream<S, T>(source);
}
/// Whether this stream is a broadcast stream.
bool get isBroadcast {
  // Default answer; a subclass that behaves like a broadcast stream
  // overrides this getter to return `true`.
  return false;
}
/// Returns a multi-subscription stream that produces the same events as this.
///
/// The returned stream will subscribe to this stream when its first
/// subscriber is added, and will stay subscribed until this stream ends,
/// or a callback cancels the subscription.
///
/// If [onListen] is provided, it is called with a subscription-like object
/// that represents the underlying subscription to this stream. It is
/// possible to pause, resume or cancel the subscription during the call
/// to [onListen]. It is not possible to change the event handlers, including
/// using [StreamSubscription.asFuture].
///
/// If [onCancel] is provided, it is called in a similar way to [onListen]
/// when the returned stream stops having listeners. If it later gets
/// a new listener, the [onListen] function is called again.
///
/// Use the callbacks, for example, for pausing the underlying subscription
/// while having no subscribers to prevent losing events, or canceling the
/// subscription when there are no listeners.
///
/// Cancelling is intended to be used when there are no current subscribers.
/// If the subscription passed to `onListen` or `onCancel` is cancelled,
/// then no further events are ever emitted by current subscriptions on
/// the returned broadcast stream, not even a done event.
///
/// Example:
/// ```dart
/// final stream =
/// Stream<int>.periodic(const Duration(seconds: 1), (count) => count)
/// .take(10);
///
/// final broadcastStream = stream.asBroadcastStream(
/// onCancel: (subscription) {
/// print('Stream paused');
/// subscription.pause();
/// },
/// onListen: (subscription) async {
/// if (subscription.isPaused) {
/// print('Stream resumed');
/// subscription.resume();
/// }
/// },
/// );
///
/// final oddNumberStream = broadcastStream.where((event) => event.isOdd);
/// final oddNumberListener = oddNumberStream.listen(
/// (event) {
/// print('Odd: $event');
/// },
/// onDone: () => print('Done'),
/// );
///
/// final evenNumberStream = broadcastStream.where((event) => event.isEven);
/// final evenNumberListener = evenNumberStream.listen((event) {
/// print('Even: $event');
/// }, onDone: () => print('Done'));
///
/// await Future.delayed(const Duration(milliseconds: 3500)); // 3.5 seconds
/// // Outputs:
/// // Even: 0
/// // Odd: 1
/// // Even: 2
/// oddNumberListener.cancel(); // Nothing printed.
/// evenNumberListener.cancel(); // "Stream paused"
/// await Future.delayed(const Duration(seconds: 2));
/// print(await broadcastStream.first); // "Stream resumed"
/// // Outputs:
/// // 3
/// ```
Stream<T> asBroadcastStream(
        {void onListen(StreamSubscription<T> subscription)?,
        void onCancel(StreamSubscription<T> subscription)?}) =>
    _AsBroadcastStream<T>(this, onListen, onCancel);
/// Adds a subscription to this stream.
///
/// Returns a [StreamSubscription] which handles events from this stream using
/// the provided [onData], [onError] and [onDone] handlers.
/// The handlers can be changed on the subscription, but they start out
/// as the provided functions.
///
/// On each data event from this stream, the subscriber's [onData] handler
/// is called. If [onData] is `null`, nothing happens.
///
/// On errors from this stream, the [onError] handler is called with the
/// error object and possibly a stack trace.
///
/// The [onError] callback must be of type `void Function(Object error)` or
/// `void Function(Object error, StackTrace)`.
/// The function type determines whether [onError] is invoked with a stack
/// trace argument: a function accepting two arguments is called with the
/// error object and a stack trace,
/// otherwise it is called with just the error object.
/// The stack trace argument may be [StackTrace.empty] if this stream received
/// an error without a stack trace.
///
/// If [onError] is omitted, any errors on this stream are considered unhandled,
/// and will be passed to the current [Zone]'s error handler.
/// By default unhandled async errors are treated
/// as if they were uncaught top-level errors.
///
/// If this stream closes and sends a done event, the [onDone] handler is
/// called. If [onDone] is `null`, nothing happens.
///
/// If [cancelOnError] is `true`, the subscription is automatically canceled
/// when the first error event is delivered. The default is `false`.
///
/// While a subscription is paused, or when it has been canceled,
/// the subscription doesn't receive events and none of the
/// event handler functions are called.
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
/// Creates a new stream from this stream that discards some elements.
///
/// The new stream sends the same error and done events as this stream,
/// but it only sends the data events that satisfy the [test].
///
/// If the [test] function throws, the data event is dropped and the
/// error is emitted on the returned stream instead.
///
/// The returned stream is a broadcast stream if this stream is.
/// If a broadcast stream is listened to more than once, each subscription
/// will individually perform the `test`.
///
/// Example:
/// ```dart
/// final stream =
/// Stream<int>.periodic(const Duration(seconds: 1), (count) => count)
/// .take(10);
///
/// final customStream = stream.where((event) => event > 3 && event <= 6);
/// customStream.listen(print); // Outputs event values: 4,5,6.
/// ```
Stream<T> where(bool test(T event)) => _WhereStream<T>(this, test);
/// Transforms each element of this stream into a new stream event.
///
/// Creates a new stream that converts each element of this stream
/// to a new value using the [convert] function, and emits the result.
///
/// For each data event, `o`, in this stream, the returned stream
/// provides a data event with the value `convert(o)`.
/// If [convert] throws, the returned stream reports it as an error
/// event instead.
///
/// Error and done events are passed through unchanged to the returned stream.
///
/// The returned stream is a broadcast stream if this stream is.
/// The [convert] function is called once per data event per listener.
/// If a broadcast stream is listened to more than once, each subscription
/// will individually call [convert] on each data event.
///
/// Unlike [transform], this method does not treat the stream as
/// chunks of a single value. Instead each event is converted independently
/// of the previous and following events, which may not always be correct.
/// For example, UTF-8 encoding, or decoding, will give wrong results
/// if a surrogate pair, or a multibyte UTF-8 encoding, is split into
/// separate events, and those events are then encoded or decoded
/// independently.
///
/// Example:
/// ```dart
/// final stream =
/// Stream<int>.periodic(const Duration(seconds: 1), (count) => count)
/// .take(5);
///
/// final calculationStream =
/// stream.map<String>((event) => 'Square: ${event * event}');
/// calculationStream.forEach(print);
/// // Square: 0
/// // Square: 1
/// // Square: 4
/// // Square: 9
/// // Square: 16
/// ```
Stream<S> map<S>(S convert(T event)) => _MapStream<T, S>(this, convert);
/// Creates a new stream with each data event of this stream asynchronously
/// mapped to a new event.
///
/// This acts like [map], except that [convert] may return a [Future],
/// and in that case, this stream waits for that future to complete before
/// continuing with its result.
///
/// The returned stream is a broadcast stream if this stream is.
Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) {
// The returned stream is backed by a synchronous controller of the same
// kind (broadcast or single-subscription) as this stream.
_StreamControllerBase<E> controller;
if (isBroadcast) {
controller = _SyncBroadcastStreamController<E>(null, null);
} else {
controller = _SyncStreamController<E>(null, null, null, null);
}
// This stream is only listened to once the returned stream is.
controller.onListen = () {
// Listen without a data handler first; the handler is installed below,
// because it needs to refer to `subscription` itself.
StreamSubscription<T> subscription = this.listen(null,
onError: controller._addError, // Avoid Zone error replacement.
onDone: controller.close);
// Forwards the result of a completed `convert` future to the controller.
FutureOr<Null> add(E value) {
controller.add(value);
}
// Tear-offs stored in locals and reused for every event below.
final addError = controller._addError;
final resume = subscription.resume;
subscription.onData((T event) {
FutureOr<E> newValue;
try {
newValue = convert(event);
} catch (e, s) {
// A throwing `convert` is reported as an error event; the source event
// produces no data event.
controller.addError(e, s);
return;
}
if (newValue is Future<E>) {
// Pause the source until the future has completed, then emit its
// value or error. `resume` runs in either case and undoes this pause.
subscription.pause();
newValue.then(add, onError: addError).whenComplete(resume);
} else {
// TODO(40014): Remove cast when type promotion works.
controller.add(newValue as dynamic);
}
});
// Cancelling the returned stream cancels the source subscription.
controller.onCancel = subscription.cancel;
// Pause and resume are forwarded to the source only for a
// non-broadcast stream.
if (!isBroadcast) {
controller
..onPause = subscription.pause
..onResume = resume;
}
};
return controller.stream;
}
/// Transforms each element into a sequence of asynchronous events.
///
/// Returns a new stream and, for each event of this stream,
/// does the following:
///
/// * If the event is an error event or a done event, it is emitted directly
/// by the returned stream.
/// * Otherwise it is an element. Then the [convert] function is called
/// with the element as argument to produce a convert-stream for the element.
/// * If that call throws, the error is emitted on the returned stream.
/// * If the call returns `null`, no further action is taken for the element.
/// * Otherwise, this stream is paused and convert-stream is listened to.
/// Every data and error event of the convert-stream is emitted on the returned
/// stream in the order it is produced.
/// When the convert-stream ends, this stream is resumed.
///
/// The returned stream is a broadcast stream if this stream is.
Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) {
  // Pick the controller flavour that matches whether this stream is a
  // broadcast stream.
  _StreamControllerBase<E> controller;
  if (!isBroadcast) {
    controller = _SyncStreamController<E>(null, null, null, null);
  } else {
    controller = _SyncBroadcastStreamController<E>(null, null);
  }

  // Subscribing to this stream is deferred to the controller's `onListen`
  // callback.
  controller.onListen = () {
    final StreamSubscription<T> source = this.listen(null,
        onError: controller._addError, // Avoid Zone error replacement.
        onDone: controller.close);

    source.onData((T event) {
      Stream<E>? inner;
      try {
        inner = convert(event);
      } catch (error, stackTrace) {
        // A throwing [convert] becomes an error event on the output.
        controller.addError(error, stackTrace);
        return;
      }
      // A `null` result means there is nothing to emit for this element.
      if (inner == null) return;

      // Hold back the source while the inner stream is piped to the output,
      // and resume once that has completed.
      source.pause();
      controller.addStream(inner).whenComplete(source.resume);
    });

    controller.onCancel = source.cancel;
    if (!isBroadcast) {
      // Pause/resume are only wired through for single-subscription streams.
      controller.onPause = source.pause;
      controller.onResume = source.resume;
    }
  };
  return controller.stream;
}
/// Creates a wrapper Stream that intercepts some errors from this stream.
///
/// If this stream sends an error that matches [test], then it is intercepted
/// by the [onError] function.
///
/// The [onError] callback must be of type `void Function(Object error)` or
/// `void Function(Object error, StackTrace)`.
/// The function type determines whether [onError] is invoked with a stack
/// trace argument.
/// The stack trace argument may be [StackTrace.empty] if this stream received
/// an error without a stack trace.
///
/// An asynchronous error `error` is matched by a test function if
/// `test(error)` returns true. If [test] is omitted, every error is
/// considered matching.
///
/// If the error is intercepted, the [onError] function can decide what to do
/// with it. It can throw if it wants to raise a new (or the same) error,
/// or simply return to make this stream forget the error.
/// If the received `error` value is thrown again by the [onError] function,
/// it acts like a `rethrow` and it is emitted along with its original
/// stack trace, not the stack trace of the `throw` inside [onError].
///
/// If you need to transform an error into a data event, use the more generic
/// [Stream.transform] to handle the event by writing a data event to
/// the output sink.
///
/// The returned stream is a broadcast stream if this stream is.
/// If a broadcast stream is listened to more than once, each subscription
/// will individually perform the `test` and handle the error.
///
/// Example:
/// ```dart
/// Stream.periodic(const Duration(seconds: 1), (count) {
/// if (count == 2) {
/// throw Exception('Exceptional event');
/// }
/// return count;
/// }).take(4).handleError(print).forEach(print);
///
/// // Outputs:
/// // 0
/// // 1
/// // Exception: Exceptional event
/// // 3
/// // 4
/// ```
Stream<T> handleError(Function onError, {bool test(error)?}) {
  // Only the two documented callback shapes are accepted; anything else is
  // rejected eagerly, before a wrapper stream is created.
  if (onError is void Function(Object, StackTrace) ||
      onError is void Function(Object)) {
    return _HandleErrorStream<T>(this, onError, test);
  }
  throw ArgumentError.value(
      onError,
      "onError",
      "Error handler must accept one Object or one Object and a StackTrace"
          " as arguments.");
}
/// Transforms each element of this stream into a sequence of elements.
///
/// Returns a new stream where each element of this stream is replaced
/// by zero or more data events.
/// The event values are provided as an [Iterable] by a call to [convert]
/// with the element as argument, and the elements of that iterable are
/// emitted in iteration order.
/// If calling [convert] throws, or if the iteration of the returned values
/// throws, the error is emitted on the returned stream and iteration ends
/// for that element of this stream.
///
/// Error events and the done event of this stream are forwarded directly
/// to the returned stream.
///
/// The returned stream is a broadcast stream if this stream is.
/// If a broadcast stream is listened to more than once, each subscription
/// will individually call `convert` and expand the events.
Stream<S> expand<S>(Iterable<S> convert(T element)) =>
    // Delegates to the library-private forwarding wrapper around this stream.
    _ExpandStream<T, S>(this, convert);
/// Pipes the events of this stream into [streamConsumer].
///
/// All events of this stream are added to `streamConsumer` using
/// [StreamConsumer.addStream].
/// The `streamConsumer` is closed when this stream has been successfully added
/// to it - when the future returned by `addStream` completes without an error.
///
/// Returns a future which completes when this stream has been consumed
/// and the consumer has been closed.
///
/// The returned future completes with the same result as the future returned
/// by [StreamConsumer.close].
/// If the call to [StreamConsumer.addStream] fails in some way, this
/// method fails in the same way.
Future pipe(StreamConsumer<T> streamConsumer) {
  // First hand every event of this stream to the consumer ...
  final Future added = streamConsumer.addStream(this);
  // ... and close the consumer only if that completed without an error.
  return added.then((_) => streamConsumer.close());
}
/// Applies [streamTransformer] to this stream.
///
/// Returns the transformed stream,
/// that is, the result of `streamTransformer.bind(this)`.
/// This method simply allows writing the call to `streamTransformer.bind`
/// in a chained fashion, like
/// ```dart
/// stream.map(mapping).transform(transformation).toList()
/// ```
/// which can be more convenient than calling `bind` directly.
///
/// The [streamTransformer] can return any stream.
/// Whether the returned stream is a broadcast stream or not,
/// and which elements it will contain,
/// is entirely up to the transformation.
///
/// This method should always be used for transformations which treat
/// the entire stream as representing a single value
/// which has perhaps been split into several parts for transport,
/// like a file being read from disk or being fetched over a network.
/// The transformation will then produce a new stream which
/// transforms the stream's value incrementally (perhaps using
/// [Converter.startChunkedConversion]). The resulting stream
/// may again be chunks of the result, but does not have to
/// correspond to specific events from the source stream.
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) =>
    // Chaining-friendly spelling of `streamTransformer.bind(this)`.
    streamTransformer.bind(this);
/// Combines a sequence of values by repeatedly applying [combine].
///
/// Similar to [Iterable.reduce], this function maintains a value,
/// starting with the first element of this stream
/// and updated for each further element of this stream.
/// For each element after the first,
/// the value is updated to the result of calling [combine]
/// with the previous value and the element.
///
/// When this stream is done, the returned future is completed with
/// the value at that time.
///
/// If this stream is empty, the returned future is completed with
/// an error.
/// If this stream emits an error, or the call to [combine] throws,
/// the returned future is completed with that error,
/// and processing is stopped.
///
/// Example:
/// ```dart
/// final result = await Stream.fromIterable([2, 6, 10, 8, 2])
/// .reduce((previous, element) => previous + element);
/// print(result); // 28
/// ```
Future<T> reduce(T combine(T previous, T element)) {
_Future<T> result = new _Future<T>();
bool seenFirst = false;
late T value;
StreamSubscription<T> subscription =
this.listen(null, onError: result._completeError, onDone: () {
if (!seenFirst) {
try {
// Throw and recatch, instead of just doing
// _completeWithErrorCallback, e, theError, StackTrace.current),
// to ensure that the stackTrace is set on the error.
throw IterableElementError.noElement();
} catch (e, s) {
_completeWithErrorCallback(result, e, s);
}
} else {
result._complete(value);
}
}, cancelOnError: true);
subscription.onData((T element) {
if (seenFirst) {
_runUserCode(() => combine(value, element), (T newValue) {
value = newValue;
}, _cancelAndErrorClosure(subscription, result));
} else {