Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Improve the Observable/Flowable cache() operators #6275

Merged
merged 3 commits into from Nov 1, 2018

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Oct 31, 2018

This PR rewrites the Observable.cache and Flowable.cache operators to allocate less and be more up-to-date algorithmically.

I've also added comments to help understand its inner workings in case someone is interested.

Resolves: #6270

@codecov
Copy link

codecov bot commented Oct 31, 2018

Codecov Report

Merging #6275 into 2.x will decrease coverage by <.01%.
The diff coverage is 98.25%.

Impacted file tree graph

@@             Coverage Diff              @@
##                2.x    #6275      +/-   ##
============================================
- Coverage     98.22%   98.22%   -0.01%     
- Complexity     6213     6257      +44     
============================================
  Files           667      667              
  Lines         44905    44887      -18     
  Branches       6228     6213      -15     
============================================
- Hits          44110    44089      -21     
- Misses          251      255       +4     
+ Partials        544      543       -1
Impacted Files Coverage Δ Complexity Δ
src/main/java/io/reactivex/Observable.java 100% <100%> (ø) 541 <2> (ø) ⬇️
...internal/operators/observable/ObservableCache.java 95.9% <98.18%> (+2.2%) 34 <33> (+25) ⬆️
...vex/internal/operators/flowable/FlowableCache.java 98.48% <98.27%> (-0.15%) 38 <37> (+27)
.../operators/flowable/FlowableBlockingSubscribe.java 93.02% <0%> (-4.66%) 10% <0%> (-1%)
...a/io/reactivex/internal/util/QueueDrainHelper.java 95.83% <0%> (-4.17%) 55% <0%> (-3%)
...va/io/reactivex/internal/util/LinkedArrayList.java 97.22% <0%> (-2.78%) 8% <0%> (-1%)
...nternal/operators/observable/ObservableCreate.java 95.72% <0%> (-2.57%) 2% <0%> (ø)
...activex/internal/observers/QueueDrainObserver.java 97.43% <0%> (-2.57%) 21% <0%> (-1%)
...ernal/operators/flowable/FlowableFromIterable.java 95.18% <0%> (-2.14%) 5% <0%> (ø)
.../io/reactivex/disposables/CompositeDisposable.java 98.14% <0%> (-1.86%) 39% <0%> (-1%)
... and 19 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update fba8b61...0dc6be2. Read the comment docs.

@@ -35,7 +35,7 @@
public class ObservableCacheTest {
@Test
public void testColdReplayNoBackpressure() {
ObservableCache<Integer> source = (ObservableCache<Integer>)ObservableCache.from(Observable.range(0, 1000));
ObservableCache<Integer> source = (ObservableCache<Integer>)new ObservableCache<Integer>(Observable.range(0, 1000), 16);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the cast really needed now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the test can access the the package-private methods verifying the internal state.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either I'm missing something or

ObservableCache<Integer> source = (ObservableCache<Integer>)new ObservableCache<Integer>(Observable.range(0, 1000), 16);

can just be

ObservableCache<Integer> source = new ObservableCache<Integer>(Observable.range(0, 1000), 16);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. Updated.

@@ -351,4 +351,23 @@ public void run() {
.assertSubscribed().assertValueCount(500).assertComplete().assertNoErrors();
}
}

@Test
public void cancelledUpFront() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flowable has this test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it has.

void remove(CacheSubscription<T> consumer) {
for (;;) {
CacheSubscription<T>[] current = subscribers.get();
int n = current.length;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: final

long index = consumer.index;
int offset = consumer.offset;
Node<T> node = consumer.node;
AtomicLong requested = consumer.requested;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe remove this reference? It looks like the value is captured here and it's a bit misleading

I doubt you win much by capturing it here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is accessed every turn as it doubles as the cancelled indicator.

@SuppressWarnings("unchecked")
void remove(CacheSubscription<T> consumer) {
for (;;) {
CacheSubscription<T>[] current = subscribers.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: final

int offset = consumer.offset;
Node<T> node = consumer.node;
AtomicLong requested = consumer.requested;
Subscriber<? super T> downstream = consumer.downstream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be great to mark what's possible as final here to improve readability

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not our style.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I think I was overexcited to push code readability a bit further after recent rewrites you've done with multi-character variable names and comments :D

@@ -951,11 +951,11 @@ public void accept(String v) {
@Test
public void testUnsubscribeSource() throws Exception {
Action unsubscribe = mock(Action.class);
Flowable<Integer> f = Flowable.just(1).doOnCancel(unsubscribe).cache();
Flowable<Integer> f = Flowable.just(1).doOnCancel(unsubscribe).replay().autoConnect();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ouch

@akarnokd akarnokd merged commit c3cfb5a into ReactiveX:2.x Nov 1, 2018
@akarnokd akarnokd deleted the CacheImprovements branch November 1, 2018 06:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants