From de8e39af182f49f54819198d0afe83b561b560ee Mon Sep 17 00:00:00 2001 From: creativecreatorormaybenot <19204050+creativecreatorormaybenot@users.noreply.github.com> Date: Wed, 27 May 2020 23:15:58 +0000 Subject: [PATCH] Improve docs for switchMap (#109) - Expand the docs for `switchMap` and add caveats. - Crosslink between `switchMap` and `concurrentAsyncMap`. - Add test for erroring convert callback in `switchMap` --- CHANGELOG.md | 2 ++ lib/src/merge.dart | 11 +++++++++-- lib/src/switch.dart | 27 +++++++++++++++++++++++++-- pubspec.yaml | 2 +- test/switch_test.dart | 29 ++++++++++++++++++++--------- 5 files changed, 57 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a5ea2a..73fc368 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## 1.2.1-dev +* Improve tests of `switchMap` and improve documentation with links and clarification. + ## 1.2.0 - Add support for emitting the "leading" event in `debounce`. diff --git a/lib/src/merge.dart b/lib/src/merge.dart index f654d35..afc2da4 100644 --- a/lib/src/merge.dart +++ b/lib/src/merge.dart @@ -4,6 +4,8 @@ import 'dart:async'; +import 'package:stream_transform/src/switch.dart'; + /// Utilities to interleave events from multiple streams. extension Merge on Stream { /// Returns a stream which emits values and errors from the source stream and @@ -56,7 +58,7 @@ extension Merge on Stream { Stream mergeAll(Iterable> others) => transform(_Merge(others)); /// Like [asyncExpand] but the [convert] callback may be called for an element - /// before the Stream emitted by the previous element has closed. + /// before the [Stream] emitted by the previous element has closed. /// /// Events on the result stream will be emitted in the order they are emitted /// by the sub streams, which may not match the order of the original stream. @@ -67,7 +69,7 @@ extension Merge on Stream { /// The result stream will not close until the source stream closes and all /// sub streams have closed. /// - /// If the source stream is a broadcast stream the result will be as well, + /// If the source stream is a broadcast stream, the result will be as well, /// regardless of the types of streams created by [convert]. In this case, /// some care should be taken: /// - If [convert] returns a single subscription stream it may be listened to @@ -76,6 +78,11 @@ extension Merge on Stream { /// stream, any sub streams from previously emitted events will be ignored, /// regardless of whether they emit further events after a listener is added /// back. + /// + /// See also: + /// + /// * [switchMap], which cancels subscriptions to the previous sub + /// stream instead of concurrently emitting events from all sub streams. Stream concurrentAsyncExpand(Stream Function(T) convert) => map(convert).transform(_MergeExpanded()); } diff --git a/lib/src/switch.dart b/lib/src/switch.dart index 5125d57..dcd2431 100644 --- a/lib/src/switch.dart +++ b/lib/src/switch.dart @@ -4,6 +4,8 @@ import 'dart:async'; +import 'package:stream_transform/src/merge.dart'; + /// A utility to take events from the most recent sub stream returned by a /// callback. extension Switch on Stream { @@ -12,9 +14,30 @@ extension Switch on Stream { /// /// When the source emits a value it will be converted to a [Stream] using /// [convert] and the output will switch to emitting events from that result. + /// Like [asyncExpand] but the [Stream] emitted by a previous element + /// will be ignored as soon as the source stream emits a new event. /// - /// If the source stream is a broadcast stream, the result stream will be as - /// well, regardless of the types of the streams produced by [convert]. + /// This means that the source stream is not paused until a sub stream + /// returned from the [convert] callback is done. Instead, the subscription + /// to the sub stream is canceled as soon as the source stream emits a new event. + /// + /// Errors from [convert], the source stream, or any of the sub streams are + /// forwarded to the result stream. + /// + /// The result stream will not close until the source stream closes and + /// the current sub stream have closed. + /// + /// If the source stream is a broadcast stream, the result will be as well, + /// regardless of the types of streams created by [convert]. In this case, + /// some care should be taken: + /// + /// * If [convert] returns a single subscription stream it may be listened to + /// and never canceled. + /// + /// See also: + /// + /// * [concurrentAsyncExpand], which emits events from all sub streams + /// concurrently instead of cancelling subscriptions to previous subs streams. Stream switchMap(Stream Function(T) convert) { return map(convert).switchLatest(); } diff --git a/pubspec.yaml b/pubspec.yaml index 8279c44..c566c16 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: stream_transform description: A collection of utilities to transform and manipulate streams. -homepage: https://www.github.com/dart-lang/stream_transform +homepage: https://github.com/dart-lang/stream_transform version: 1.2.1-dev environment: diff --git a/test/switch_test.dart b/test/switch_test.dart index 40030f0..16deb4b 100644 --- a/test/switch_test.dart +++ b/test/switch_test.dart @@ -145,18 +145,29 @@ void main() { expect(transformed.isBroadcast, true); }); - }); - test('handles null response from cancel', () async { - var outer = StreamController>(); - var inner = StreamController(); + test('handles null response from cancel', () async { + var outer = StreamController>(); + var inner = StreamController(); + + var subscription = + NullOnCancelStream(outer.stream).switchLatest().listen(null); - var subscription = - NullOnCancelStream(outer.stream).switchLatest().listen(null); + outer.add(NullOnCancelStream(inner.stream)); + await Future(() {}); - outer.add(NullOnCancelStream(inner.stream)); - await Future(() {}); + await subscription.cancel(); + }); - await subscription.cancel(); + test('forwards errors from the convert callback', () async { + var errors = []; + var source = Stream.fromIterable([1, 2, 3]); + source.switchMap((i) { + // ignore: only_throw_errors + throw 'Error: $i'; + }).listen((_) {}, onError: errors.add); + await Future(() {}); + expect(errors, ['Error: 1', 'Error: 2', 'Error: 3']); + }); }); }