Skip to content

Commit

Permalink
Merge pull request #1033 from benjchristensen/retry-1027
Browse files Browse the repository at this point in the history
Manual Merge #1027
  • Loading branch information
benjchristensen committed Apr 12, 2014
2 parents a9e7975 + 1e503b4 commit 8bb52a0
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 100 deletions.
6 changes: 5 additions & 1 deletion rxjava-core/src/main/java/rx/operators/OperatorRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
import rx.Observable.Operator;
import rx.Scheduler.Inner;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;

public class OperatorRetry<T> implements Operator<T, Observable<T>> {

Expand Down Expand Up @@ -82,7 +84,9 @@ public void call(final Inner inner) {
final Action1<Inner> _self = this;
attempts.incrementAndGet();

Subscriber<T> subscriber = new Subscriber<T>(child) {
// new subscription each time so if it unsubscribes itself it does not prevent retries
// by unsubscribing the child subscription
Subscriber<T> subscriber = new Subscriber<T>() {

@Override
public void onCompleted() {
Expand Down
285 changes: 186 additions & 99 deletions rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,30 @@
*/
package rx.operators;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

Expand Down Expand Up @@ -112,7 +119,7 @@ public void testInfiniteRetry() {
inOrder.verifyNoMoreInteractions();
}

public static class FuncWithErrors implements Observable.OnSubscribeFunc<String> {
public static class FuncWithErrors implements Observable.OnSubscribe<String> {

private final int numFailures;
private final AtomicInteger count = new AtomicInteger(0);
Expand All @@ -122,15 +129,14 @@ public static class FuncWithErrors implements Observable.OnSubscribeFunc<String>
}

@Override
public Subscription onSubscribe(Observer<? super String> o) {
public void call(Subscriber<? super String> o) {
o.onNext("beginningEveryTime");
if (count.incrementAndGet() <= numFailures) {
o.onError(new RuntimeException("forced failure: " + count.get()));
} else {
o.onNext("onSuccessOnly");
o.onCompleted();
}
return Subscriptions.empty();
}
}

Expand All @@ -150,101 +156,14 @@ public void call(Integer n) {
assertEquals(1, count.get());
}

public static class SlowFuncAlwaysFails implements Observable.OnSubscribe<String> {

final AtomicInteger nextSeq=new AtomicInteger();
final AtomicInteger activeSubs=new AtomicInteger();
final AtomicInteger concurrentSubs=new AtomicInteger();

public void call(final Subscriber<? super String> s)
{
final int seq=nextSeq.incrementAndGet();

int cur=activeSubs.incrementAndGet();
// Track concurrent subscriptions
concurrentSubs.set(Math.max(cur,concurrentSubs.get()));

// Use async error
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ignore
}
s.onError(new RuntimeException("Subscriber #"+seq+" fails"));
}
}).start();

// Track unsubscribes
s.add(new Subscription()
{
private boolean active=true;

public void unsubscribe()
{
if (active) {
activeSubs.decrementAndGet();
active=false;
}
}

public boolean isUnsubscribed()
{
return !active;
}
});
}
}

@Test
public void testUnsubscribeAfterError() {

final CountDownLatch check=new CountDownLatch(1);
final SlowFuncAlwaysFails sf=new SlowFuncAlwaysFails();

Observable
.create(sf)
.retry(4)
.subscribe(
new Action1<String>()
{
@Override
public void call(String v)
{
fail("Should never happen");
}
},
new Action1<Throwable>()
{
public void call(Throwable throwable)
{
check.countDown();
}
}
);

try
{
check.await(1, TimeUnit.SECONDS);
} catch (InterruptedException e)
{
fail("interrupted");
}

assertEquals("5 Subscribers created", 5, sf.nextSeq.get());
assertEquals("1 Active Subscriber", 1, sf.concurrentSubs.get());
}

@Test
public void testRetryAllowsSubscriptionAfterAllSubscriptionsUnsubsribed() throws InterruptedException {
public void testRetryAllowsSubscriptionAfterAllSubscriptionsUnsubscribed() throws InterruptedException {
final AtomicInteger subsCount = new AtomicInteger(0);
OnSubscribeFunc<String> onSubscribe = new OnSubscribeFunc<String>() {
OnSubscribe<String> onSubscribe = new OnSubscribe<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
public void call(Subscriber<? super String> s) {
subsCount.incrementAndGet();
return new Subscription() {
s.add(new Subscription() {
boolean unsubscribed = false;

@Override
Expand All @@ -257,7 +176,7 @@ public void unsubscribe() {
public boolean isUnsubscribed() {
return unsubscribed;
}
};
});
}
};
Observable<String> stream = Observable.create(onSubscribe);
Expand All @@ -269,4 +188,172 @@ public boolean isUnsubscribed() {
streamWithRetry.subscribe();
assertEquals(1, subsCount.get());
}

@Test
public void testSourceObservableCallsUnsubscribe() throws InterruptedException {
final AtomicInteger subsCount = new AtomicInteger(0);

final TestSubscriber<String> ts = new TestSubscriber<String>();

OnSubscribe<String> onSubscribe = new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> s) {
// if isUnsubscribed is true that means we have a bug such as https://github.com/Netflix/RxJava/issues/1024
if (!s.isUnsubscribed()) {
subsCount.incrementAndGet();
s.onError(new RuntimeException("failed"));
// it unsubscribes the child directly
// this simulates various error/completion scenarios that could occur
// or just a source that proactively triggers cleanup
s.unsubscribe();
}
}
};

Observable.create(onSubscribe).retry(3).subscribe(ts);
assertEquals(4, subsCount.get()); // 1 + 3 retries
}

class SlowObservable implements Observable.OnSubscribe<Long> {

private AtomicInteger efforts = new AtomicInteger(0);
private AtomicInteger active = new AtomicInteger(0), maxActive = new AtomicInteger(0);
private AtomicInteger nextBeforeFailure;

private final int emitDelay;

public SlowObservable(int emitDelay, int countNext) {
this.emitDelay = emitDelay;
this.nextBeforeFailure = new AtomicInteger(countNext);
}

public void call(final Subscriber<? super Long> subscriber) {
final AtomicBoolean terminate = new AtomicBoolean(false);
efforts.getAndIncrement();
active.getAndIncrement();
maxActive.set(Math.max(active.get(), maxActive.get()));
final Thread thread = new Thread() {
@Override
public void run() {
long nr = 0;
try {
while (!terminate.get()) {
Thread.sleep(emitDelay);
if (nextBeforeFailure.getAndDecrement() > 0) {
subscriber.onNext(nr++);
}
else {
subscriber.onError(new RuntimeException("expected-failed"));
}
}
}
catch (InterruptedException t) {
}
}
};
thread.start();
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
terminate.set(true);
active.decrementAndGet();
}
}));
}
}

/** Observer for listener on seperate thread */
class AsyncObserver<T> implements Observer<T> {

protected CountDownLatch latch = new CountDownLatch(1);

protected Observer<T> target;

/** Wrap existing Observer */
public AsyncObserver(Observer<T> target) {
this.target = target;
}

/** Wait */
public void await() {
try {
latch.await();
} catch (InterruptedException e) {
fail("Test interrupted");
}
}

// Observer implementation

@Override
public void onCompleted() {
target.onCompleted();
latch.countDown();
}

@Override
public void onError(Throwable t) {
target.onError(t);
latch.countDown();
}

@Override
public void onNext(T v) {
target.onNext(v);
}
}

@Test(timeout = 1000)
public void testUnsubscribeAfterError() {

@SuppressWarnings("unchecked")
Observer<Long> observer = mock(Observer.class);

// Observable that always fails after 100ms
SlowObservable so = new SlowObservable(100, 0);
Observable<Long> o = Observable
.create(so)
.retry(5);

AsyncObserver<Long> async = new AsyncObserver<Long>(observer);

o.subscribe(async);

async.await();

InOrder inOrder = inOrder(observer);
// Should fail once
inOrder.verify(observer, times(1)).onError(any(Throwable.class));
inOrder.verify(observer, never()).onCompleted();

assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
assertEquals("Only 1 active subscription", 1, so.maxActive.get());
}

@Test(timeout = 1000)
public void testTimeoutWithRetry() {

@SuppressWarnings("unchecked")
Observer<Long> observer = mock(Observer.class);

// Observable that sends every 100ms (timeout fails instead)
SlowObservable so = new SlowObservable(100, 10);
Observable<Long> o = Observable
.create(so)
.timeout(80, TimeUnit.MILLISECONDS)
.retry(5);

AsyncObserver<Long> async = new AsyncObserver<Long>(observer);

o.subscribe(async);

async.await();

InOrder inOrder = inOrder(observer);
// Should fail once
inOrder.verify(observer, times(1)).onError(any(Throwable.class));
inOrder.verify(observer, never()).onCompleted();

assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
}
}

0 comments on commit 8bb52a0

Please sign in to comment.