Skip to content

Commit

Permalink
2.x: fix LambdaObserver calling dispose when terminating (#4957)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Jan 4, 2017
1 parent 19fac95 commit 479f89f
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void onNext(T t) {
@Override
public void onError(Throwable t) {
if (!isDisposed()) {
dispose();
lazySet(DisposableHelper.DISPOSED);
try {
onError.accept(t);
} catch (Throwable e) {
Expand All @@ -80,7 +80,7 @@ public void onError(Throwable t) {
@Override
public void onComplete() {
if (!isDisposed()) {
dispose();
lazySet(DisposableHelper.DISPOSED);
try {
onComplete.run();
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import static org.junit.Assert.*;

import java.util.List;
import java.util.*;

import org.junit.Test;
import org.reactivestreams.*;
Expand Down Expand Up @@ -438,4 +438,84 @@ public void onComplete() {

assertEquals(1, calls);
}

@Test
public void eventOrdering() {
final List<String> list = new ArrayList<String>();

Flowable.error(new TestException())
.doOnCancel(new Action() {
@Override
public void run() throws Exception {
list.add("cancel");
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
list.add("finally");
}
})
.subscribe(
new Consumer<Object>() {
@Override
public void accept(Object v) throws Exception {
list.add("onNext");
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
list.add("onError");
}
},
new Action() {
@Override
public void run() throws Exception {
list.add("onComplete");
}
});

assertEquals(Arrays.asList("onError", "finally"), list);
}

@Test
public void eventOrdering2() {
final List<String> list = new ArrayList<String>();

Flowable.just(1)
.doOnCancel(new Action() {
@Override
public void run() throws Exception {
list.add("cancel");
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
list.add("finally");
}
})
.subscribe(
new Consumer<Object>() {
@Override
public void accept(Object v) throws Exception {
list.add("onNext");
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
list.add("onError");
}
},
new Action() {
@Override
public void run() throws Exception {
list.add("onComplete");
}
});

assertEquals(Arrays.asList("onNext", "onComplete", "finally"), list);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public void testUnsubscribesFromUpstreamFlowable() {
public void run() {
unsub.set(true);
}})
.ignoreElements()
.toFlowable()
.subscribe().dispose();

assertTrue(unsub.get());
Expand Down Expand Up @@ -207,6 +209,7 @@ public void testUnsubscribesFromUpstream() {
public void run() {
unsub.set(true);
}})
.ignoreElements()
.subscribe().dispose();

assertTrue(unsub.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@

import static org.junit.Assert.*;

import java.util.List;
import java.util.*;

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.TestHelper;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
Expand Down Expand Up @@ -442,4 +444,86 @@ public void onComplete() {

assertEquals(1, calls);
}


@Test
public void eventOrdering() {
final List<String> list = new ArrayList<String>();

Observable.error(new TestException())
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
list.add("dispose");
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
list.add("finally");
}
})
.subscribe(
new Consumer<Object>() {
@Override
public void accept(Object v) throws Exception {
list.add("onNext");
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
list.add("onError");
}
},
new Action() {
@Override
public void run() throws Exception {
list.add("onComplete");
}
});

assertEquals(Arrays.asList("onError", "finally"), list);
}

@Test
public void eventOrdering2() {
final List<String> list = new ArrayList<String>();

Observable.just(1)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
list.add("dispose");
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
list.add("finally");
}
})
.subscribe(
new Consumer<Object>() {
@Override
public void accept(Object v) throws Exception {
list.add("onNext");
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
list.add("onError");
}
},
new Action() {
@Override
public void run() throws Exception {
list.add("onComplete");
}
});

assertEquals(Arrays.asList("onNext", "onComplete", "finally"), list);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,16 @@ public void testErrorReceivedObservable() {
@Test
public void testUnsubscribesFromUpstreamObservable() {
final AtomicBoolean unsub = new AtomicBoolean();
Observable.range(1, 10).doOnDispose(new Action() {
Observable.range(1, 10).concatWith(Observable.<Integer>never())
.doOnDispose(new Action() {
@Override
public void run() {
unsub.set(true);
}})
.subscribe();
.ignoreElements()
.toObservable()
.subscribe()
.dispose();
assertTrue(unsub.get());
}

Expand Down Expand Up @@ -145,12 +149,15 @@ public void testErrorReceived() {
@Test
public void testUnsubscribesFromUpstream() {
final AtomicBoolean unsub = new AtomicBoolean();
Observable.range(1, 10).doOnDispose(new Action() {
Observable.range(1, 10).concatWith(Observable.<Integer>never())
.doOnDispose(new Action() {
@Override
public void run() {
unsub.set(true);
}})
.subscribe();
.ignoreElements()
.subscribe()
.dispose();
assertTrue(unsub.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,13 @@ public void run() {
unsubscribed.set(true);
}
};
Observable.just(1).doOnDispose(unsubscribeAction)
.takeLast(1).subscribe();
Observable.just(1)
.concatWith(Observable.<Integer>never())
.doOnDispose(unsubscribeAction)
.takeLast(1)
.subscribe()
.dispose();

assertTrue(unsubscribed.get());
}

Expand Down

0 comments on commit 479f89f

Please sign in to comment.