Skip to content

Commit

Permalink
test(flatMap): fix flatMap tests (#658)
Browse files Browse the repository at this point in the history
  • Loading branch information
hoc081098 committed Jan 29, 2022
1 parent 480b6de commit c5daf4c
Showing 1 changed file with 29 additions and 13 deletions.
42 changes: 29 additions & 13 deletions test/transformers/flat_map_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -149,76 +149,92 @@ void main() {
const maxConcurrent = 2;
var activeCount = 0;

final stream = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).flatMap(
// 1 -> 500
// 2 -> 400
// 3 -> 500
// 4 -> 200
// -----1--4
// ----2-----3
// ----21--4-3
final stream = Stream.fromIterable([1, 2, 3, 4]).flatMap(
(value) {
return Rx.defer(() {
expect(++activeCount, lessThanOrEqualTo(maxConcurrent));

final ms = value.isOdd ? ((10 - value) * 200) : ((11 - value) * 400);
final ms = (value.isOdd ? 5 : 6 - value) * 100;
return Rx.timer(value, Duration(milliseconds: ms));
}).doOnDone(() => --activeCount);
},
maxConcurrent: maxConcurrent,
);

await expectLater(stream,
emitsInOrder(<Object>[1, 3, 2, 5, 4, 6, 7, 9, 10, 8, emitsDone]));
await expectLater(stream, emitsInOrder(<Object>[2, 1, 4, 3, emitsDone]));
});

test('Rx.flatMap(maxConcurrent: 3)', () async {
const maxConcurrent = 3;
var activeCount = 0;

final stream = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).flatMap(
// 1 -> 400
// 2 -> 300
// 3 -> 200
// 4 -> 200
// 5 -> 300
// 6 -> 400
// ----1----6
// ---2---5
// --3--4
// --3214-5-6
final stream = Stream.fromIterable([1, 2, 3, 4, 5, 6]).flatMap(
(value) {
return Rx.defer(() {
expect(++activeCount, lessThanOrEqualTo(maxConcurrent));

final ms = value.isOdd ? ((10 - value) * 240) : ((11 - value) * 360);
final ms = (value <= 3 ? 5 - value : value - 2) * 100;
return Rx.timer(value, Duration(milliseconds: ms));
}).doOnDone(() => --activeCount);
},
maxConcurrent: maxConcurrent,
);

await expectLater(stream,
emitsInOrder(<Object>[3, 1, 2, 5, 7, 4, 9, 10, 6, 8, emitsDone]));
await expectLater(
stream, emitsInOrder(<Object>[3, 2, 1, 4, 5, 6, emitsDone]));
});

test('Rx.flatMap.cancel', () {
_getStream()
.flatMap(_getOtherStream)
.listen(expectAsync1((data) {}, count: 0))
.cancel();
}, timeout: const Timeout(Duration(milliseconds: 500)));
}, timeout: const Timeout(Duration(milliseconds: 200)));

test('Rx.flatMap(maxConcurrent: 1).cancel', () {
_getStream()
.flatMap(_getOtherStream, maxConcurrent: 1)
.listen(expectAsync1((data) {}, count: 0))
.cancel();
}, timeout: const Timeout(Duration(milliseconds: 500)));
}, timeout: const Timeout(Duration(milliseconds: 200)));

test('Rx.flatMap.take.cancel', () {
_getStream()
.flatMap(_getOtherStream)
.take(1)
.listen(expectAsync1((data) => expect(data, 3), count: 1));
}, timeout: const Timeout(Duration(milliseconds: 500)));
}, timeout: const Timeout(Duration(milliseconds: 200)));

test('Rx.flatMap(maxConcurrent: 1).take.cancel', () {
_getStream()
.flatMap(_getOtherStream, maxConcurrent: 1)
.take(1)
.listen(expectAsync1((data) => expect(data, 1), count: 1));
}, timeout: const Timeout(Duration(milliseconds: 500)));
}, timeout: const Timeout(Duration(milliseconds: 200)));

test('Rx.flatMap(maxConcurrent: 2).take.cancel', () {
_getStream()
.flatMap(_getOtherStream, maxConcurrent: 2)
.take(1)
.listen(expectAsync1((data) => expect(data, 2), count: 1));
}, timeout: const Timeout(Duration(milliseconds: 500)));
}, timeout: const Timeout(Duration(milliseconds: 200)));
}

Stream<int> _getStream() => Stream.fromIterable(const [1, 2, 3]);
Expand Down

0 comments on commit c5daf4c

Please sign in to comment.