Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Fix parallel() on grouped flowable not replenishing properly #6719

Merged
merged 6 commits into from
Nov 20, 2019

Conversation

akarnokd
Copy link
Member

Fix a case when the GroupedFlowable is consumed by a parallel() in fusion mode causing the source to stop replenishing items from the upstream, hanging the whole sequence.

parallel() was slightly different from the usual queue consumers because it checks for isEmpty before trying to pull for an item. This was necessary because the rails may not be ready for more and an eager pull to check for emptyness would lose that item. The replenishing was done in GroupedFlowable.pull but a call to GroupedFlowable.isEmpty would not replenish.

The fix is to have isEmpty replenish similar to when poll detects emptyness and replenishes.

Reported in reactor/reactor-core#1959

@akarnokd akarnokd added this to the 3.0 milestone Nov 20, 2019
@akarnokd akarnokd changed the title Group by parallel fix 3.x: Fix parallel() on grouped flowable not replenishing properly Nov 20, 2019
@codecov
Copy link

codecov bot commented Nov 20, 2019

Codecov Report

Merging #6719 into 3.x will decrease coverage by 0.05%.
The diff coverage is 100%.

Impacted file tree graph

@@             Coverage Diff              @@
##                3.x    #6719      +/-   ##
============================================
- Coverage     98.17%   98.11%   -0.06%     
+ Complexity     6189     6187       -2     
============================================
  Files           677      677              
  Lines         44658    44663       +5     
  Branches       6170     6171       +1     
============================================
- Hits          43843    43823      -20     
- Misses          291      302      +11     
- Partials        524      538      +14
Impacted Files Coverage Δ Complexity Δ
...3/internal/operators/flowable/FlowableGroupBy.java 95.54% <100%> (-0.21%) 3 <0> (ø)
...l/operators/observable/ObservableFlatMapMaybe.java 90.14% <0%> (-7.05%) 2% <0%> (ø)
.../operators/flowable/FlowableBlockingSubscribe.java 93.02% <0%> (-4.66%) 10% <0%> (-1%)
...eactivex/rxjava3/processors/BehaviorProcessor.java 96.58% <0%> (-2.44%) 51% <0%> (ø)
...rnal/operators/observable/ObservableSwitchMap.java 91.57% <0%> (-2.11%) 3% <0%> (ø)
...tivex/rxjava3/disposables/CompositeDisposable.java 98.14% <0%> (-1.86%) 39% <0%> (-1%)
...java3/internal/operators/flowable/FlowableZip.java 97.39% <0%> (-1.05%) 6% <0%> (ø)
...ernal/operators/flowable/FlowableFlatMapMaybe.java 95.31% <0%> (-1.05%) 2% <0%> (ø)
...rxjava3/internal/schedulers/ExecutorScheduler.java 96% <0%> (-1%) 10% <0%> (ø)
...perators/single/SingleFlatMapIterableFlowable.java 95.83% <0%> (-0.84%) 2% <0%> (ø)
... and 12 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 5026999...7c7b028. Read the comment docs.

@akarnokd akarnokd merged commit 7c0793d into ReactiveX:3.x Nov 20, 2019
@akarnokd akarnokd deleted the GroupByParallelFix branch November 20, 2019 13:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants