Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: ConnectableFlowable/ConnetableObservabe redesign #6519

Merged
merged 1 commit into from
Jun 20, 2019

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Jun 19, 2019

This PR changes the connectable API to have a specific reset method to reset a terminated connectable source as part of the official API.

In 2.x, when publish() terminated, it reset itself to a fresh state which could lead to late consumers not receiving events as there might be no one to call connect() again (see #6501). However, replay() did not reset itself, thus late consumers got the cached events, however, a reconnect started the sequence and new consumers may have missed items.

In 3.x, this two corner cases have been fixed by the introduction of reset(). Both publish and replay now remain in their terminated state until reset is called. If the connection is disposed, it will automatically reset their state just like before. The state transitions are as follows:

  1. fresh -> connect() -> running -> onComplete()/onError() -> terminated -> reset() -> fresh
  2. fresh -> connect() -> running -> dispose() -> fresh
  3. fresh -> connect() -> running -> onComplete()/onError() -> terminated -> dispose() -> fresh
  4. fresh -> connect() -> running -> onComplete()/onError() -> terminated -> connect() -> running

This does resolve the race condition with publish().refCount() described in #6501.

In addition, there are some changes to Flowable.publish()'s behavior:

It no longer keeps consuming the upstream if there are no subscribers. This implies if the source terminates while there are unconsumed items in the internal buffer, those will be available for observation.
I have no strong preference on this property and in comparison, Observable.publish drops items because there is no backpressure buffer in its implementation.

Upstream errors are not reported to the RxJavaPlugins.onError handler when if there are no subscribers but have to be observed via a subscriber.
Because terminal events are available until reset now, we can't know really if there is going to be a subscriber or not. However, it might be possible to detect the no-consumer case upon an error and still report it when reset or dispose is called.

Resolves #5628
Resolves #5899

@akarnokd akarnokd added this to the 3.0 milestone Jun 19, 2019
@akarnokd akarnokd changed the title 3.x: ConnectableFlowable/ConnetableFlowable redesign 3.x: ConnectableFlowable/ConnetableObservabe redesign Jun 19, 2019
@codecov
Copy link

codecov bot commented Jun 19, 2019

Codecov Report

Merging #6519 into 3.x will decrease coverage by 0.02%.
The diff coverage is 95.35%.

Impacted file tree graph

@@             Coverage Diff             @@
##               3.x    #6519      +/-   ##
===========================================
- Coverage     98.2%   98.17%   -0.03%     
- Complexity    6245     6247       +2     
===========================================
  Files          675      675              
  Lines        45051    45011      -40     
  Branches      6225     6211      -14     
===========================================
- Hits         44241    44189      -52     
- Misses         275      283       +8     
- Partials       535      539       +4
Impacted Files Coverage Δ Complexity Δ
...o/reactivex/observables/ConnectableObservable.java 100% <ø> (ø) 12 <0> (ø) ⬇️
...va/io/reactivex/flowables/ConnectableFlowable.java 100% <ø> (ø) 12 <0> (ø) ⬇️
src/main/java/io/reactivex/Flowable.java 100% <100%> (ø) 565 <0> (ø) ⬇️
...ternal/operators/observable/ObservablePublish.java 100% <100%> (+5.3%) 18 <18> (+7) ⬆️
src/main/java/io/reactivex/Observable.java 100% <100%> (ø) 540 <1> (ø) ⬇️
...ernal/operators/observable/ObservableRefCount.java 100% <100%> (ø) 24 <0> (-4) ⬇️
.../internal/operators/flowable/FlowableRefCount.java 100% <100%> (ø) 24 <0> (-4) ⬇️
...nternal/operators/observable/ObservableReplay.java 98.37% <40%> (-0.26%) 20 <1> (ø)
...ex/internal/operators/flowable/FlowableReplay.java 94.35% <40%> (-0.59%) 20 <1> (ø)
...x/internal/operators/flowable/FlowablePublish.java 96.55% <96.32%> (+0.34%) 18 <17> (+7) ⬆️
... and 33 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 583c4e7...36a8733. Read the comment docs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant