Skip to content

Commit

Permalink
2.x: Fix switchMap not canceling properly during onNext-cancel races
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Feb 27, 2020
1 parent e2b7816 commit 7980c85
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ void drain() {
for (;;) {

if (cancelled) {
active.lazySet(null);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
package io.reactivex.internal.operators.flowable;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.*;

import org.junit.*;
import org.mockito.InOrder;
Expand Down Expand Up @@ -1229,4 +1230,74 @@ public Publisher<Integer> apply(Integer v)
.test()
.assertResult(10, 20);
}

@Test
public void cancellationShouldTriggerInnerCancellationRace() throws Throwable {
final AtomicInteger outer = new AtomicInteger();
final AtomicInteger inner = new AtomicInteger();

int n = 10000;
for (int i = 0; i < n; i++) {
Flowable.<Integer>create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> it)
throws Exception {
it.onNext(0);
}
}, BackpressureStrategy.MISSING)
.switchMap(new Function<Integer, Publisher<? extends Object>>() {
@Override
public Publisher<? extends Object> apply(Integer v)
throws Exception {
return createFlowable(inner);
}
})
.observeOn(Schedulers.computation())
.doFinally(new Action() {
@Override
public void run() throws Exception {
outer.incrementAndGet();
}
})
.take(1)
.blockingSubscribe(Functions.emptyConsumer(), new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
e.printStackTrace();
}
});
}

Thread.sleep(100);
assertEquals(inner.get(), outer.get());
assertEquals(n, inner.get());
}

Flowable<Integer> createFlowable(final AtomicInteger inner) {
return Flowable.<Integer>unsafeCreate(new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> s) {
final SerializedSubscriber<Integer> it = new SerializedSubscriber<Integer>(s);
it.onSubscribe(new BooleanSubscription());
Schedulers.io().scheduleDirect(new Runnable() {
@Override
public void run() {
it.onNext(1);
}
}, 0, TimeUnit.MILLISECONDS);
Schedulers.io().scheduleDirect(new Runnable() {
@Override
public void run() {
it.onNext(2);
}
}, 0, TimeUnit.MILLISECONDS);
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
inner.incrementAndGet();
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.observers.TestObserver;
import io.reactivex.observers.*;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.*;
import io.reactivex.subjects.PublishSubject;
Expand Down Expand Up @@ -1222,4 +1222,74 @@ public Observable<Integer> apply(Integer v)
.test()
.assertResult(10, 20);
}

@Test
public void cancellationShouldTriggerInnerCancellationRace() throws Throwable {
final AtomicInteger outer = new AtomicInteger();
final AtomicInteger inner = new AtomicInteger();

int n = 10000;
for (int i = 0; i < n; i++) {
Observable.<Integer>create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> it)
throws Exception {
it.onNext(0);
}
})
.switchMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer v)
throws Exception {
return createObservable(inner);
}
})
.observeOn(Schedulers.computation())
.doFinally(new Action() {
@Override
public void run() throws Exception {
outer.incrementAndGet();
}
})
.take(1)
.blockingSubscribe(Functions.emptyConsumer(), new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
e.printStackTrace();
}
});
}

Thread.sleep(100);
assertEquals(inner.get(), outer.get());
assertEquals(n, inner.get());
}

Observable<Integer> createObservable(final AtomicInteger inner) {
return Observable.<Integer>unsafeCreate(new ObservableSource<Integer>() {
@Override
public void subscribe(Observer<? super Integer> observer) {
final SerializedObserver<Integer> it = new SerializedObserver<Integer>(observer);
it.onSubscribe(Disposables.empty());
Schedulers.io().scheduleDirect(new Runnable() {
@Override
public void run() {
it.onNext(1);
}
}, 0, TimeUnit.MILLISECONDS);
Schedulers.io().scheduleDirect(new Runnable() {
@Override
public void run() {
it.onNext(2);
}
}, 0, TimeUnit.MILLISECONDS);
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
inner.incrementAndGet();
}
});
}
}

0 comments on commit 7980c85

Please sign in to comment.