Skip to content

Commit

Permalink
3.x: Fix toFlowable(ERROR) not cancelling on MBE (#7083)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Sep 23, 2020
1 parent fe71f6a commit 0690c7c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void onNext(T t) {
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
upstream.cancel();
onError(new MissingBackpressureException("could not emit value due to lack of requests"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@

package io.reactivex.rxjava3.internal.operators.flowable;

import static org.junit.Assert.*;

import org.junit.Test;
import org.reactivestreams.Publisher;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;

public class FlowableOnBackpressureErrorTest extends RxJavaTest {
Expand Down Expand Up @@ -51,4 +56,20 @@ public Object apply(Flowable<Integer> f) throws Exception {
}
}, false, 1, 1, 1);
}

@Test
public void overflowCancels() {
PublishSubject<Integer> ps = PublishSubject.create();

TestSubscriber<Integer> ts = ps.toFlowable(BackpressureStrategy.ERROR)
.test(0L);

assertTrue(ps.hasObservers());

ps.onNext(1);

assertFalse(ps.hasObservers());

ts.assertFailure(MissingBackpressureException.class);
}
}

0 comments on commit 0690c7c

Please sign in to comment.