Skip to content

Commit

Permalink
Observable/Flowable/Completable/Single.delay should always call onErr…
Browse files Browse the repository at this point in the history
…or on the provided Scheduler (#4522)

* Provide failing test case for Observable/Flowable/Completable/Single.delay

* Call Observable/Flowable/Completable/Single onError on proper scheduler

* Fix CompletableTest.delayErrorImmediately
  • Loading branch information
nsk-mironov authored and akarnokd committed Sep 9, 2016
1 parent 63c4451 commit 1330373
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,12 @@ public void run() {

@Override
public void onError(final Throwable e) {
if (delayError) {
set.add(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
s.onError(e);
}
}, delay, unit));
} else {
s.onError(e);
}
set.add(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
s.onError(e);
}
}, delayError ? delay : 0, unit));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,16 @@ public void run() {

@Override
public void onError(final Throwable t) {
if (delayError) {
w.schedule(new Runnable() {
@Override
public void run() {
try {
actual.onError(t);
} finally {
w.dispose();
}
w.schedule(new Runnable() {
@Override
public void run() {
try {
actual.onError(t);
} finally {
w.dispose();
}
}, delay, unit);
} else {
actual.onError(t);
}
}
}, delayError ? delay : 0, unit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,16 @@ public void run() {

@Override
public void onError(final Throwable t) {
if (delayError) {
w.schedule(new Runnable() {
@Override
public void run() {
try {
actual.onError(t);
} finally {
w.dispose();
}
w.schedule(new Runnable() {
@Override
public void run() {
try {
actual.onError(t);
} finally {
w.dispose();
}
}, delay, unit);
} else {
actual.onError(t);
}
}
}, delayError ? delay : 0, unit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,13 @@ public void run() {
}

@Override
public void onError(Throwable e) {
s.onError(e);
public void onError(final Throwable e) {
sd.replace(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
s.onError(e);
}
}, 0, unit));
}

});
Expand Down
9 changes: 5 additions & 4 deletions src/test/java/io/reactivex/completable/CompletableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1602,7 +1602,8 @@ public void onComplete() {

@Test(timeout = 1000)
public void delayErrorImmediately() throws InterruptedException {
Completable c = error.completable.delay(250, TimeUnit.MILLISECONDS);
final TestScheduler scheduler = new TestScheduler();
final Completable c = error.completable.delay(250, TimeUnit.MILLISECONDS, scheduler);

final AtomicBoolean done = new AtomicBoolean();
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
Expand All @@ -1624,14 +1625,14 @@ public void onComplete() {
}
});

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

Assert.assertTrue(error.get().toString(), error.get() instanceof TestException);
Assert.assertFalse("Already done", done.get());

Thread.sleep(100);
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

Assert.assertFalse("Already done", done.get());

Thread.sleep(200);
}

@Test(timeout = 1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@

package io.reactivex.internal.operators.completable;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.functions.Consumer;
import org.junit.Test;

import io.reactivex.Completable;
import io.reactivex.schedulers.Schedulers;

import static org.junit.Assert.assertNotEquals;

public class CompletableDelayTest {

@Test
Expand All @@ -30,4 +35,27 @@ public void delayCustomScheduler() {
.test()
.assertResult();
}

@Test
public void testOnErrorCalledOnScheduler() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Thread> thread = new AtomicReference<Thread>();

Completable.<String>error(new Exception())
.delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
thread.set(Thread.currentThread());
latch.countDown();
}
})
.onErrorComplete()
.subscribe();

latch.await();

assertNotEquals(Thread.currentThread(), thread.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package io.reactivex.internal.operators.flowable;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.*;
import org.mockito.InOrder;
Expand Down Expand Up @@ -911,4 +913,26 @@ public void testDelaySubscriptionDisposeBeforeTime() {
verify(o, never()).onError(any(Throwable.class));
}

@Test
public void testOnErrorCalledOnScheduler() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Thread> thread = new AtomicReference<Thread>();

Flowable.<String>error(new Exception())
.delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
thread.set(Thread.currentThread());
latch.countDown();
}
})
.onErrorResumeNext(Flowable.<String>empty())
.subscribe();

latch.await();

assertNotEquals(Thread.currentThread(), thread.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package io.reactivex.internal.operators.observable;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.*;
import org.mockito.InOrder;
Expand Down Expand Up @@ -858,4 +860,27 @@ public void delayWithTimeDelayError() throws Exception {
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class, 1);
}

@Test
public void testOnErrorCalledOnScheduler() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Thread> thread = new AtomicReference<Thread>();

Observable.<String>error(new Exception())
.delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
thread.set(Thread.currentThread());
latch.countDown();
}
})
.onErrorResumeNext(Observable.<String>empty())
.subscribe();

latch.await();

assertNotEquals(Thread.currentThread(), thread.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
package io.reactivex.internal.operators.single;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.functions.Consumer;
import org.junit.Test;

import io.reactivex.*;
Expand Down Expand Up @@ -103,4 +107,26 @@ public void delaySubscriptionTimeCustomScheduler() throws Exception {
.assertResult(1);
}

@Test
public void testOnErrorCalledOnScheduler() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Thread> thread = new AtomicReference<Thread>();

Single.<String>error(new Exception())
.delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
thread.set(Thread.currentThread());
latch.countDown();
}
})
.onErrorResumeNext(Single.just(""))
.subscribe();

latch.await();

assertNotEquals(Thread.currentThread(), thread.get());
}

}

0 comments on commit 1330373

Please sign in to comment.