Skip to content

Commit

Permalink
Improve docs for switchMap (#109)
Browse files Browse the repository at this point in the history
- Expand the docs for `switchMap` and add caveats.
- Crosslink between `switchMap` and `concurrentAsyncMap`.
- Add test for erroring convert callback in `switchMap`
  • Loading branch information
creativecreatorormaybenot committed May 27, 2020
1 parent 06a0740 commit de8e39a
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,7 @@
## 1.2.1-dev

* Improve tests of `switchMap` and improve documentation with links and clarification.

## 1.2.0

- Add support for emitting the "leading" event in `debounce`.
Expand Down
11 changes: 9 additions & 2 deletions lib/src/merge.dart
Expand Up @@ -4,6 +4,8 @@

import 'dart:async';

import 'package:stream_transform/src/switch.dart';

/// Utilities to interleave events from multiple streams.
extension Merge<T> on Stream<T> {
/// Returns a stream which emits values and errors from the source stream and
Expand Down Expand Up @@ -56,7 +58,7 @@ extension Merge<T> on Stream<T> {
Stream<T> mergeAll(Iterable<Stream<T>> others) => transform(_Merge(others));

/// Like [asyncExpand] but the [convert] callback may be called for an element
/// before the Stream emitted by the previous element has closed.
/// before the [Stream] emitted by the previous element has closed.
///
/// Events on the result stream will be emitted in the order they are emitted
/// by the sub streams, which may not match the order of the original stream.
Expand All @@ -67,7 +69,7 @@ extension Merge<T> on Stream<T> {
/// The result stream will not close until the source stream closes and all
/// sub streams have closed.
///
/// If the source stream is a broadcast stream the result will be as well,
/// If the source stream is a broadcast stream, the result will be as well,
/// regardless of the types of streams created by [convert]. In this case,
/// some care should be taken:
/// - If [convert] returns a single subscription stream it may be listened to
Expand All @@ -76,6 +78,11 @@ extension Merge<T> on Stream<T> {
/// stream, any sub streams from previously emitted events will be ignored,
/// regardless of whether they emit further events after a listener is added
/// back.
///
/// See also:
///
/// * [switchMap], which cancels subscriptions to the previous sub
/// stream instead of concurrently emitting events from all sub streams.
Stream<S> concurrentAsyncExpand<S>(Stream<S> Function(T) convert) =>
map(convert).transform(_MergeExpanded());
}
Expand Down
27 changes: 25 additions & 2 deletions lib/src/switch.dart
Expand Up @@ -4,6 +4,8 @@

import 'dart:async';

import 'package:stream_transform/src/merge.dart';

/// A utility to take events from the most recent sub stream returned by a
/// callback.
extension Switch<T> on Stream<T> {
Expand All @@ -12,9 +14,30 @@ extension Switch<T> on Stream<T> {
///
/// When the source emits a value it will be converted to a [Stream] using
/// [convert] and the output will switch to emitting events from that result.
/// Like [asyncExpand] but the [Stream] emitted by a previous element
/// will be ignored as soon as the source stream emits a new event.
///
/// If the source stream is a broadcast stream, the result stream will be as
/// well, regardless of the types of the streams produced by [convert].
/// This means that the source stream is not paused until a sub stream
/// returned from the [convert] callback is done. Instead, the subscription
/// to the sub stream is canceled as soon as the source stream emits a new event.
///
/// Errors from [convert], the source stream, or any of the sub streams are
/// forwarded to the result stream.
///
/// The result stream will not close until the source stream closes and
/// the current sub stream have closed.
///
/// If the source stream is a broadcast stream, the result will be as well,
/// regardless of the types of streams created by [convert]. In this case,
/// some care should be taken:
///
/// * If [convert] returns a single subscription stream it may be listened to
/// and never canceled.
///
/// See also:
///
/// * [concurrentAsyncExpand], which emits events from all sub streams
/// concurrently instead of cancelling subscriptions to previous subs streams.
Stream<S> switchMap<S>(Stream<S> Function(T) convert) {
return map(convert).switchLatest();
}
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
@@ -1,6 +1,6 @@
name: stream_transform
description: A collection of utilities to transform and manipulate streams.
homepage: https://www.github.com/dart-lang/stream_transform
homepage: https://github.com/dart-lang/stream_transform
version: 1.2.1-dev

environment:
Expand Down
29 changes: 20 additions & 9 deletions test/switch_test.dart
Expand Up @@ -145,18 +145,29 @@ void main() {

expect(transformed.isBroadcast, true);
});
});

test('handles null response from cancel', () async {
var outer = StreamController<Stream<int>>();
var inner = StreamController<int>();
test('handles null response from cancel', () async {
var outer = StreamController<Stream<int>>();
var inner = StreamController<int>();

var subscription =
NullOnCancelStream(outer.stream).switchLatest().listen(null);

var subscription =
NullOnCancelStream(outer.stream).switchLatest().listen(null);
outer.add(NullOnCancelStream(inner.stream));
await Future<void>(() {});

outer.add(NullOnCancelStream(inner.stream));
await Future<void>(() {});
await subscription.cancel();
});

await subscription.cancel();
test('forwards errors from the convert callback', () async {
var errors = <String>[];
var source = Stream.fromIterable([1, 2, 3]);
source.switchMap((i) {
// ignore: only_throw_errors
throw 'Error: $i';
}).listen((_) {}, onError: errors.add);
await Future<void>(() {});
expect(errors, ['Error: 1', 'Error: 2', 'Error: 3']);
});
});
}

0 comments on commit de8e39a

Please sign in to comment.