Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Fix concurrent clear() calls when fused chains are canceled #6676

Merged
merged 1 commit into from
Oct 17, 2019

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Oct 17, 2019

When a fuseable source backed by an SpscLinkedArrayQueue is cancelled and cleared concurrently (i.e., one thread clears while the other cancels the chain), the clear() method could run concurrently and either crash with NPE or end up in an infinite loop due to corrupted queue state.

This PR fixes two kinds of mistakes leading to this scenario:

  • Calling clear() from cancel/dispose when the output is fused.
  • Calling clear() from a fused drain loop when cancellation is detected.

When fused, similar to poll(), calling clear() is the responsibility of the consumer and the producer side is not allowed to call them.

The bug affected the following operators:

  • FlowableOnBackpressureBuffer
  • FlowableGroupBy
  • UnicastProcessor
  • UnicastSubject

Fixes #6673

@akarnokd akarnokd added this to the 3.0 milestone Oct 17, 2019
@codecov
Copy link

codecov bot commented Oct 17, 2019

Codecov Report

Merging #6676 into 3.x will increase coverage by 0.03%.
The diff coverage is 100%.

Impacted file tree graph

@@             Coverage Diff             @@
##                3.x   #6676      +/-   ##
===========================================
+ Coverage     98.06%   98.1%   +0.03%     
  Complexity     6189    6189              
===========================================
  Files           677     677              
  Lines         44682   44681       -1     
  Branches       6169    6170       +1     
===========================================
+ Hits          43819   43834      +15     
+ Misses          314     300      -14     
+ Partials        549     547       -2
Impacted Files Coverage Δ Complexity Δ
...reactivex/rxjava3/processors/UnicastProcessor.java 100% <100%> (ø) 68 <0> (ø) ⬇️
...3/internal/operators/flowable/FlowableGroupBy.java 96.55% <100%> (+1.04%) 3 <0> (ø) ⬇️
...erators/flowable/FlowableOnBackpressureBuffer.java 96.63% <100%> (ø) 2 <0> (ø) ⬇️
.../io/reactivex/rxjava3/subjects/UnicastSubject.java 100% <100%> (ø) 65 <0> (ø) ⬇️
...rxjava3/internal/observers/QueueDrainObserver.java 97.43% <0%> (-2.57%) 21% <0%> (-1%)
...ternal/operators/observable/ObservablePublish.java 97.29% <0%> (-1.81%) 16% <0%> (-1%)
...va3/internal/operators/flowable/FlowableCache.java 98.48% <0%> (-1.52%) 38% <0%> (-1%)
...internal/operators/flowable/FlowableSwitchMap.java 94.39% <0%> (-1.41%) 3% <0%> (ø)
...perators/observable/ObservableMergeWithSingle.java 99.05% <0%> (-0.95%) 2% <0%> (ø)
...nternal/operators/observable/ObservableCreate.java 95.72% <0%> (-0.86%) 2% <0%> (ø)
... and 17 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update e6406b3...f295e0b. Read the comment docs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

NPE in SpscLinkedArrayQueue.clear due to concurrent invocation
2 participants