Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Fix Flowable.concatMap backpressure w/ scalars #7089

Merged
merged 1 commit into from
Oct 5, 2020

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Oct 2, 2020

In concatMap, there is a shortcut for when the mapped Flowable turns out to be a scalar value and thus the full subscription process can be skipped. This used a so-called weak subscription that expected non-concurrent requesting to emits its single value.

Unfortunately, there is a race condition for when the downstream requests exactly when this weak subscription is activated, resulting in either double emission or no emission at all. The fix is to do the proper compareAndSet to ensure the emission happens exactly once.

Discovered while running the build matrix and a test failed with:

io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMapSchedulerTest > boundaryFusionDelayError FAILED
    java.lang.AssertionError: Error(s) present: [io.reactivex.rxjava3.exceptions.MissingBackpressureException: Queue is full?!] (latch = 0, values = 1, errors = 1, completions = 0)
        at io.reactivex.rxjava3.observers.BaseTestConsumer.fail(BaseTestConsumer.java:125)
        at io.reactivex.rxjava3.observers.BaseTestConsumer.assertNoErrors(BaseTestConsumer.java:212)
        at io.reactivex.rxjava3.observers.BaseTestConsumer.assertResult(BaseTestConsumer.java:525)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMapSchedulerTest.boundaryFusionDelayError(FlowableConcatMapSchedulerTest.java:94)
        Caused by:
        io.reactivex.rxjava3.exceptions.MissingBackpressureException: Queue is full?!
            at io.reactivex.rxjava3.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:114)
            at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMapScheduler$ConcatMapDelayed.innerNext(FlowableConcatMapScheduler.java:396)
            at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapInner.onNext(FlowableConcatMap.java:559)
            at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$WeakScalarSubscription.request(FlowableConcatMap.java:343)
            at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.setSubscription(SubscriptionArbiter.java:99)
            at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMapScheduler$ConcatMapDelayed.run(FlowableConcatMapScheduler.java:531)
            at io.reactivex.rxjava3.internal.schedulers.ImmediateThinScheduler$ImmediateThinWorker.schedule(ImmediateThinScheduler.java:89)
            at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMapScheduler$ConcatMapDelayed.schedule(FlowableConcatMapScheduler.java:431)
            at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMapScheduler$BaseConcatMapSubscriber.onNext(FlowableConcatMapScheduler.java:156)
            at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:69)
            at io.reactivex.rxjava3.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runSync(FlowableObserveOn.java:337)
            at io.reactivex.rxjava3.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:174)
            at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:65)
            at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:56)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
            at java.base/java.lang.Thread.run(Thread.java:832)

@akarnokd akarnokd added this to the 3.1 milestone Oct 2, 2020
@codecov
Copy link

codecov bot commented Oct 2, 2020

Codecov Report

Merging #7089 into 3.x will increase coverage by 0.00%.
The diff coverage is 100.00%.

Impacted file tree graph

@@            Coverage Diff            @@
##                3.x    #7089   +/-   ##
=========================================
  Coverage     99.52%   99.53%           
+ Complexity     6669     6666    -3     
=========================================
  Files           742      742           
  Lines         47276    47275    -1     
  Branches       6374     6374           
=========================================
+ Hits          47051    47054    +3     
+ Misses          102       99    -3     
+ Partials        123      122    -1     
Impacted Files Coverage Δ Complexity Δ
...internal/operators/flowable/FlowableConcatMap.java 100.00% <100.00%> (ø) 6.00 <0.00> (ø)
...operators/flowable/FlowableConcatMapScheduler.java 99.20% <100.00%> (ø) 4.00 <0.00> (ø)
.../operators/flowable/FlowableBlockingSubscribe.java 93.02% <0.00%> (-4.66%) 10.00% <0.00%> (-1.00%)
...l/operators/observable/ObservableFlatMapMaybe.java 93.66% <0.00%> (-2.12%) 2.00% <0.00%> (ø%)
...ternal/operators/observable/ObservablePublish.java 97.29% <0.00%> (-1.81%) 16.00% <0.00%> (-1.00%)
...ctivex/rxjava3/internal/util/QueueDrainHelper.java 98.61% <0.00%> (-1.39%) 57.00% <0.00%> (-1.00%)
...ernal/operators/flowable/FlowableFromIterable.java 97.91% <0.00%> (-1.05%) 5.00% <0.00%> (ø%)
...java3/internal/operators/flowable/FlowableZip.java 98.97% <0.00%> (-1.03%) 6.00% <0.00%> (ø%)
...nternal/operators/observable/ObservableCreate.java 99.14% <0.00%> (-0.86%) 2.00% <0.00%> (ø%)
...rnal/operators/flowable/FlowableFlatMapSingle.java 97.67% <0.00%> (-0.59%) 2.00% <0.00%> (ø%)
... and 12 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 88697a4...c4743fe. Read the comment docs.

@akarnokd akarnokd merged commit 0668d04 into ReactiveX:3.x Oct 5, 2020
@akarnokd akarnokd deleted the ConcatMapWeakScalarFix branch October 5, 2020 12:07
Copy link
Collaborator

@vanniktech vanniktech left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants