Skip to content

Commit

Permalink
Prep version 0.18.1 - add retryWhen
Browse files Browse the repository at this point in the history
  • Loading branch information
brianegan committed Jul 3, 2018
1 parent d3eda9e commit 3fcd4ca
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 24 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## 0.18.1

* Add `retryWhen` operator. Thanks to Razvan Lung (@long1eu)! This can be used for custom retry logic.

## 0.18.0

* Breaking Change: remove `retype` method, deprecated as part of Dart 2.
Expand Down
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -82,6 +82,7 @@ var myObservable = new Observable(myStream);
- [periodic](https://www.dartdocs.org/documentation/rxdart/latest/rx/Observable/Observable.periodic.html)
- [race](https://www.dartdocs.org/documentation/rxdart/latest/rx/Observable/Observable.race.html) / [RaceStream](https://www.dartdocs.org/documentation/rxdart/latest/rx/RaceStream-class.html)
- [retry](https://www.dartdocs.org/documentation/rxdart/latest/rx/Observable/Observable.retry.html) / [RetryStream](https://www.dartdocs.org/documentation/rxdart/latest/rx/RetryStream-class.html)
- [retryWhen](https://www.dartdocs.org/documentation/rxdart/latest/rx/Observable/Observable.retryWhen.html) / [RetryWhenStream](https://www.dartdocs.org/documentation/rxdart/latest/rx/RetryWhenStream-class.html)
- [switchLatest](https://wwwSwitchLatestStreamdartdocs.org/documentation/rxdart/latest/rx/Observable/Observable.switchLatest.html) / [SwitchLatestStream](https://www.dartdocs.org/documentation/rxdart/latest/rx/SwitchLatestStream-class.html)
- [timer](https://www.dartdocs.org/documentation/rxdart/latest/rx/Observable/Observable.timer.html) / [TimerStream](https://www.dartdocs.org/documentation/rxdart/latest/rx/TimerStream-class.html)

Expand Down
21 changes: 9 additions & 12 deletions lib/src/streams/retry_when.dart
@@ -1,7 +1,3 @@
// File created by
// Lung Razvan <long1eu>
// on 29/06/2018

import 'dart:async';

import 'package:rxdart/src/streams/utils.dart';
Expand Down Expand Up @@ -75,18 +71,19 @@ class RetryWhenStream<T> extends Stream<T> {

@override
StreamSubscription<T> listen(
void onData(T event), {
Function onError,
void onDone(),
bool cancelOnError,
}) {
void onData(T event), {
Function onError,
void onDone(),
bool cancelOnError,
}) {
if (_isUsed) throw new StateError('Stream has already been listened to.');
_isUsed = true;

controller = new StreamController<T>(
sync: true,
onListen: retry,
onPause: ([Future<dynamic> resumeSignal]) => subscription.pause(resumeSignal),
onPause: ([Future<dynamic> resumeSignal]) =>
subscription.pause(resumeSignal),
onResume: () => subscription.resume(),
onCancel: () => subscription.cancel(),
);
Expand All @@ -107,7 +104,7 @@ class RetryWhenStream<T> extends Stream<T> {

StreamSubscription<void> sub;
sub = retryWhenFactory(e, s).listen(
(dynamic event) {
(dynamic event) {
sub.cancel();
_errors.add(new ErrorAndStacktrace(e, s));
retry();
Expand All @@ -126,4 +123,4 @@ class RetryWhenStream<T> extends Stream<T> {
cancelOnError: false,
);
}
}
}
2 changes: 1 addition & 1 deletion pubspec.yaml
@@ -1,5 +1,5 @@
name: rxdart
version: 0.18.0
version: 0.18.1
authors:
- Frank Pepermans <frank@igindo.com>
- Brian Egan <brian@brianegan.com>
Expand Down
4 changes: 2 additions & 2 deletions test/streams/retry_test.dart
Expand Up @@ -80,7 +80,7 @@ void main() {

test('RetryStream.error.capturesErrors', () async {
Stream<int> observableWithError =
new RetryStream<int>(_getRetryStream(3), 2);
new RetryStream<int>(_getRetryStream(3), 2);

await expectLater(
observableWithError,
Expand All @@ -89,7 +89,7 @@ void main() {
predicate<RetryError>((RetryError a) {
return a.errors.length == 3 &&
a.errors.every((ErrorAndStacktrace es) =>
es.error != null && es.stacktrace != null);
es.error != null && es.stacktrace != null);
}),
),
emitsDone,
Expand Down
18 changes: 9 additions & 9 deletions test/streams/retry_when_test.dart
Expand Up @@ -12,28 +12,28 @@ void main() {
);
});

test('RetryStream', () {
test('RetryWhenStream', () {
expect(
new RetryWhenStream<int>(_sourceStream(3), _alwaysThrow),
emitsInOrder(<dynamic>[0, 1, 2, emitsDone]),
);
});

test('RetryStream.onDone', () {
test('RetryWhenStream.onDone', () {
expect(
new RetryWhenStream<int>(_sourceStream(3), _alwaysThrow),
emitsInOrder(<dynamic>[0, 1, 2, emitsDone]),
);
});

test('RetryStream.infinite.retries', () {
test('RetryWhenStream.infinite.retries', () {
expect(
new RetryWhenStream<int>(_sourceStream(1000, 2), _neverThrow).take(6),
emitsInOrder(<dynamic>[0, 1, 0, 1, 0, 1, emitsDone]),
);
});

test('RetryStream.emits.original.items', () {
test('RetryWhenStream.emits.original.items', () {
final int retries = 3;

expect(
Expand All @@ -43,7 +43,7 @@ void main() {
);
});

test('RetryStream.single.subscription', () {
test('RetryWhenStream.single.subscription', () {
Stream<int> stream =
new RetryWhenStream<int>(_sourceStream(3), _neverThrow);
try {
Expand All @@ -54,7 +54,7 @@ void main() {
}
});

test('RetryStream.asBroadcastStream', () {
test('RetryWhenStream.asBroadcastStream', () {
Stream<int> stream = new RetryWhenStream<int>(_sourceStream(3), _neverThrow)
.asBroadcastStream();

Expand All @@ -65,7 +65,7 @@ void main() {
expect(stream.isBroadcast, isTrue);
});

test('RetryStream.error.shouldThrow', () {
test('RetryWhenStream.error.shouldThrow', () {
Stream<int> observableWithError =
new RetryWhenStream<int>(_sourceStream(3, 0), _alwaysThrow);

Expand All @@ -75,7 +75,7 @@ void main() {
<dynamic>[emitsError(new TypeMatcher<RetryError>()), emitsDone]));
});

test('RetryStream.error.capturesErrors', () async {
test('RetryWhenStream.error.capturesErrors', () async {
Stream<int> observableWithError =
new RetryWhenStream<int>(_sourceStream(3, 0), _alwaysThrow);

Expand All @@ -93,7 +93,7 @@ void main() {
]));
});

test('RetryStream.pause.resume', () async {
test('RetryWhenStream.pause.resume', () async {
StreamSubscription<int> subscription;

subscription = new RetryWhenStream<int>(_sourceStream(3), _neverThrow)
Expand Down

0 comments on commit 3fcd4ca

Please sign in to comment.