Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 52 additions & 8 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5801,8 +5801,19 @@ public final Observable<T> repeat(final long count, Scheduler scheduler) {
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
*/
public final Observable<T> repeatWhen(Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
return OnSubscribeRedo.repeat(this, notificationHandler, scheduler);
public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> dematerializedNotificationHandler = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Notification<?>> notifications) {
return notificationHandler.call(notifications.map(new Func1<Notification<?>, Void>() {
@Override
public Void call(Notification<?> notification) {
return null;
}
}));
}
};
return OnSubscribeRedo.repeat(this, dematerializedNotificationHandler, scheduler);
}

/**
Expand All @@ -5825,8 +5836,19 @@ public final Observable<T> repeatWhen(Func1<? super Observable<? extends Notific
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
*/
public final Observable<T> repeatWhen(Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
return OnSubscribeRedo.repeat(this, notificationHandler);
public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler) {
Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> dematerializedNotificationHandler = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Notification<?>> notifications) {
return notificationHandler.call(notifications.map(new Func1<Notification<?>, Void>() {
@Override
public Void call(Notification<?> notification) {
return null;
}
}));
}
};
return OnSubscribeRedo.repeat(this, dematerializedNotificationHandler);
}

/**
Expand Down Expand Up @@ -6541,8 +6563,19 @@ public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
* @return the source Observable modified with retry logic
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
*/
public final Observable<T> retryWhen(Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
return OnSubscribeRedo.<T> retry(this, notificationHandler);
public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler) {
Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> dematerializedNotificationHandler = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Notification<?>> notifications) {
return notificationHandler.call(notifications.map(new Func1<Notification<?>, Throwable>() {
@Override
public Throwable call(Notification<?> notification) {
return notification.getThrowable();
}
}));
}
};
return OnSubscribeRedo.<T> retry(this, dematerializedNotificationHandler);
}

/**
Expand All @@ -6566,8 +6599,19 @@ public final Observable<T> retryWhen(Func1<? super Observable<? extends Notifica
* @return the source Observable modified with retry logic
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
*/
public final Observable<T> retryWhen(Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
return OnSubscribeRedo.<T> retry(this, notificationHandler, scheduler);
public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> dematerializedNotificationHandler = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Notification<?>> notifications) {
return notificationHandler.call(notifications.map(new Func1<Notification<?>, Throwable>() {
@Override
public Throwable call(Notification<?> notification) {
return notification.getThrowable();
}
}));
}
};
return OnSubscribeRedo.<T> retry(this, dematerializedNotificationHandler, scheduler);
}

/**
Expand Down
57 changes: 28 additions & 29 deletions src/test/java/rx/internal/operators/OperatorRetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,12 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Notification;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
Expand All @@ -49,6 +43,11 @@
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class OperatorRetryTest {

@Test
Expand All @@ -73,15 +72,15 @@ public void call(Subscriber<? super String> t1) {

});
TestSubscriber<String> ts = new TestSubscriber<String>(consumer);
producer.retryWhen(new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
producer.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {

@Override
public Observable<?> call(Observable<? extends Notification<?>> attempts) {
public Observable<?> call(Observable<? extends Throwable> attempts) {
// Worker w = Schedulers.computation().createWorker();
return attempts
.map(new Func1<Notification<?>, Tuple>() {
.map(new Func1<Throwable, Tuple>() {
@Override
public Tuple call(Notification<?> n) {
public Tuple call(Throwable n) {
return new Tuple(new Long(1), n);
}})
.scan(new Func2<Tuple, Tuple, Tuple>(){
Expand All @@ -94,7 +93,7 @@ public Tuple call(Tuple t, Tuple n) {
public Observable<Long> call(Tuple t) {
System.out.println("Retry # "+t.count);
return t.count > 20 ?
Observable.<Long>error(t.n.getThrowable()) :
Observable.<Long>error(t.n) :
Observable.timer(t.count *1L, TimeUnit.MILLISECONDS);
}});
}
Expand All @@ -112,9 +111,9 @@ public Observable<Long> call(Tuple t) {

public static class Tuple {
Long count;
Notification<?> n;
Throwable n;

Tuple(Long c, Notification<?> n) {
Tuple(Long c, Throwable n) {
count = c;
this.n = n;
}
Expand Down Expand Up @@ -147,15 +146,15 @@ public void testSchedulingNotificationHandler() {
int NUM_RETRIES = 2;
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
TestSubscriber<String> subscriber = new TestSubscriber<String>(observer);
origin.retryWhen(new Func1<Observable<? extends Notification<?>>, Observable<? extends Notification<?>>>() {
origin.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> t1) {
return t1.observeOn(Schedulers.computation()).map(new Func1<Notification<?>, Notification<?>>() {
public Observable<?> call(Observable<? extends Throwable> t1) {
return t1.observeOn(Schedulers.computation()).map(new Func1<Throwable, Void>() {
@Override
public Notification<?> call(Notification<?> t1) {
return Notification.createOnNext(null);
public Void call(Throwable t1) {
return null;
}
}).startWith(Notification.createOnNext(null));
}).startWith((Void) null);
}
}).subscribe(subscriber);

Expand All @@ -178,16 +177,16 @@ public void testOnNextFromNotificationHandler() {
Observer<String> observer = mock(Observer.class);
int NUM_RETRIES = 2;
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
origin.retryWhen(new Func1<Observable<? extends Notification<?>>, Observable<? extends Notification<?>>>() {
origin.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> t1) {
return t1.map(new Func1<Notification<?>, Notification<?>>() {
public Observable<?> call(Observable<? extends Throwable> t1) {
return t1.map(new Func1<Throwable, Void>() {

@Override
public Notification<?> call(Notification<?> t1) {
return Notification.createOnNext(null);
public Void call(Throwable t1) {
return null;
}
}).startWith(Notification.createOnNext(null));
}).startWith((Void) null);
}
}).subscribe(observer);

Expand All @@ -209,9 +208,9 @@ public void testOnCompletedFromNotificationHandler() {
Observer<String> observer = mock(Observer.class);
Observable<String> origin = Observable.create(new FuncWithErrors(1));
TestSubscriber<String> subscriber = new TestSubscriber<String>(observer);
origin.retryWhen(new Func1<Observable<? extends Notification<?>>, Observable<? extends Notification<?>>>() {
origin.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> t1) {
public Observable<?> call(Observable<? extends Throwable> t1) {
return Observable.empty();
}
}).subscribe(subscriber);
Expand All @@ -229,9 +228,9 @@ public void testOnErrorFromNotificationHandler() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Observable<String> origin = Observable.create(new FuncWithErrors(2));
origin.retryWhen(new Func1<Observable<? extends Notification<?>>, Observable<? extends Notification<?>>>() {
origin.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> t1) {
public Observable<?> call(Observable<? extends Throwable> t1) {
return Observable.error(new RuntimeException());
}
}).subscribe(observer);
Expand Down