shareReplay with refCount doesn't restart due to unsubscriptions if the source completes first #5455

Closed
josepot opened this issue May 26, 2020 · 7 comments

Comments

@josepot
Contributor

josepot commented May 26, 2020

Bug Report

Current Behavior
shareReplay doesn't restart due to unsubscriptions when refCount is true if the source completes before the subscribers.

Reproduction

import {interval} from 'rxjs';
import {finalize, shareReplay, take} from 'rxjs/operators';

const sharedSource = interval(100).pipe(
  take(5), // the source completes after its fifth value, at roughly 500 ms
  finalize(() => {
    console.log('The source has completed!');
  }),
  shareReplay({refCount: true, bufferSize: 1}),
);

const s1 = sharedSource.subscribe(x => console.log('subscription 1', x));
const s2 = sharedSource.subscribe(x => console.log('subscription 2', x));
console.log('Two active subscriptions...');

// Unsubscribe both subscribers shortly after the source has completed.
setTimeout(() => {
  s1.unsubscribe();
  s2.unsubscribe();
  console.log('Both subscriptions have been unsubscribed (refCount === 0)');
}, 550);

// A new subscriber arrives once refCount is back at zero.
setTimeout(() => {
  console.log('Creating a new subscription...');
  sharedSource.subscribe(x => console.log('subscription 3', x));
}, 600);

Expected behavior
I would expect to restart the subscription to the source after the refCount has gone down to zero.

Environment

  • Runtime: Node v12.4.0,
  • RxJS version: 6.5.5
@cartant
Collaborator

cartant commented May 26, 2020

What you are seeing is the expected behaviour. Once the source completes, that's it. The completion is replayed. If you don't want this behaviour, use share instead.
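
A minimal sketch of the behaviour described above, using a synchronous source so that no timers are needed. The names `sourceSubscriptions`, `replayed$`, `first` and `late` are hypothetical and only serve the illustration; the sketch assumes an RxJS version where `shareReplay` accepts a config object, as in the reproduction.

```typescript
import { defer, of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Hypothetical counter: how many times the underlying source is subscribed to.
let sourceSubscriptions = 0;

const replayed$ = defer(() => {
  sourceSubscriptions++;
  return of(1, 2, 3); // emits synchronously, then completes
}).pipe(shareReplay({ refCount: true, bufferSize: 1 }));

const first: Array<number | string> = [];
const late: Array<number | string> = [];

// The first subscriber connects to the source and sees it complete.
replayed$.subscribe({
  next: x => first.push(x),
  complete: () => first.push('complete'),
});

// A later subscriber arrives after completion (refCount is back at 0).
// It receives the buffered value and the completion; the source is not re-run.
replayed$.subscribe({
  next: x => late.push(x),
  complete: () => late.push('complete'),
});

console.log(sourceSubscriptions, first, late);
```

In this sketch the late subscriber only sees the last buffered value followed by the replayed completion, and the source is subscribed to once.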

@josepot
Contributor Author

josepot commented May 26, 2020

Hi @cartant and thanks for your quick reply.

> What you are seeing is the expected behaviour. Once the source completes, that's it. The completion is replayed. If you don't want this behaviour, use share instead.

I really thought that this was a bug. Please notice that in the PR that I've opened all the existing tests are passing and I also added a new one to cover this case.

One of the reasons why I thought that this was a bug is that currently there is a test for shareReplay that states:

should restart due to unsubscriptions if refCount is true

I don't understand why that shouldn't be the case if the source has completed and all the subscribers have unsubscribed. I thought that was the point of using refCount. If that's not the expected behavior, perhaps it would be worth adding a test that makes sure that once the source has completed, the refCount is ignored?

Also, I know how to create my own operator with this behavior, but is there a way that I can get this behavior using share or publishReplay... because I can't think of a way to get that.

Thanks a lot for looking into this.

@cartant
Collaborator

cartant commented May 26, 2020

> I don't understand why that shouldn't be the case ...

Look at it this way: if it resubscribes to the source it is not replaying the notifications.

@josepot
Contributor Author

josepot commented May 26, 2020

I do understand what you are saying, but another way to look at it could be: when using refCount: true, then if the refCount goes down to zero then it's like nothing ever happened 😄.

I'm not saying that you are wrong. I'm just saying that's what I expected/wished, and when I found a test that seemed to state that, it led me to think that this was a bug. I also thought that if I wanted the behavior that you are describing as the expected one, then I could use:

source.pipe(publishReplay(1), refCount())

I thought that by having the refCount option in the shareReplay operator, then we would be able to get a behavior that's closer to the share operator...

Anyways, well, if it isn't a bug then 2 questions:

  • Would it be worth it to add a test that removes the current "ambiguity"?
  • Is there a way to get the behavior that I was expecting by combining other lower-level operators? or should I write my own operator if I wanted that behavior?

Thanks!

@cartant
Collaborator

cartant commented May 26, 2020

> Would it be worth it to add a test that removes the current "ambiguity"?

Yes, definitely!

> Is there a way to get the behavior that I was expecting by combining other lower-level operators?

Yes: multicast(() => new ReplaySubject(1))
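
A minimal sketch of how this suggestion might be used, assuming the factory is meant to return `new ReplaySubject(1)` and that the result is followed by `refCount()` (the reply does not spell out either detail). The names `connections`, `restartable$`, `firstRun` and `secondRun` are hypothetical. Note that `multicast` is deprecated in later RxJS versions, so this is tied to the 6.x era of the thread.

```typescript
import { ReplaySubject, defer, of } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

// Hypothetical counter: how many times the underlying source is subscribed to.
let connections = 0;

const restartable$ = defer(() => {
  connections++;
  return of(1, 2, 3); // emits synchronously, then completes
}).pipe(
  // A subject *factory*: a fresh ReplaySubject is created once the old one has stopped.
  multicast(() => new ReplaySubject<number>(1)),
  refCount(),
);

const firstRun: number[] = [];
const secondRun: number[] = [];

restartable$.subscribe(x => firstRun.push(x));
// The source has completed by now, so this subscription gets a new subject
// and the source is subscribed to again.
restartable$.subscribe(x => secondRun.push(x));

console.log(connections, firstRun, secondRun);
```

With a factory, a subscriber that arrives after completion restarts the source instead of receiving the replayed completion. The same applies to any subscriber arriving after completion, whether or not others had subscribed earlier.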

@josepot
Contributor Author

josepot commented May 26, 2020

Thanks a lot! I'm going to get some rest now and I will send a PR with that test tomorrow. Once again: thanks a lot for being so responsive and supportive!

@dietergeerts

dietergeerts commented Jan 4, 2023

> > Would it be worth it to add a test that removes the current "ambiguity"?
>
> Yes, definitely!
>
> > Is there a way to get the behavior that I was expecting by combining other lower-level operators?
>
> Yes: multicast(() => new ReplaySubject(1))

Hi, this doesn't work for me. At first, I was very confused about how this should work, but I think what I functionally want isn't really possible, or at least not in a way that I can find or think of. So could you provide some help with the following:

"the source observable must be subscribed once during multiple subscriptions, and re-subscribed when it drops to 0 and a new subscriber subscribes, and for new subscribers, use the last emitted value."

I thought that this is what the new shareReplay did: share the source among the subscribers and emit the last value to late subscribers. But that's not the case because, if I understand correctly, the reset on error and the reset on complete take effect when a new subscriber is added after one of these events has happened, and the reset on refCount doesn't happen when the source has completed or errored. I could make sure that the source never errors or completes, and then shareReplay would behave as expected, as it would re-subscribe to the source only when all subscribers were gone and a new one subscribes. But that doesn't feel right.
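
The workaround mentioned in the last sentences (a source that never completes) can be sketched as follows. This is only an illustration under assumptions: `fetchPatient$`, `patient$` and `fetches` are hypothetical names, `of(...)` stands in for the real HTTP call, and appending `NEVER` with `concat` is one possible way to keep the stream open. The trade-off is that subscribers never receive a completion.

```typescript
import { NEVER, concat, defer, of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Hypothetical stand-in for the HTTP call behind a resource.
let fetches = 0;
const fetchPatient$ = defer(() => {
  fetches++;
  return of(`patient data #${fetches}`);
});

// Appending NEVER keeps the shared stream open after the inner source completes,
// so shareReplay never records a completion and still resets at refCount 0.
const patient$ = concat(fetchPatient$, NEVER).pipe(
  shareReplay({ refCount: true, bufferSize: 1 }),
);

const seenA: string[] = [];
const seenB: string[] = [];
const seenC: string[] = [];

const a = patient$.subscribe(v => seenA.push(v)); // first subscriber: triggers the fetch
const b = patient$.subscribe(v => seenB.push(v)); // second subscriber: replayed value, no new fetch
a.unsubscribe();
b.unsubscribe(); // refCount reaches 0, the shared state is released

const c = patient$.subscribe(v => seenC.push(v)); // new subscriber: fresh fetch
c.unsubscribe();

console.log(fetches, seenA, seenB, seenC);
```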

So what operators can be used to get this behavior? Or is this not possible, and I have to make sure the observable never errors nor completes?

I tried the following, but it doesn't reset when refCount hits 0 and the source observable has already completed.

share({
  connector: () => new ReplaySubject(1),
  resetOnError: false,
  resetOnComplete: false,
  resetOnRefCountZero: true,
}),
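
The observation above can be checked with a small synchronous sketch. It assumes RxJS 7, where `share` accepts this configuration object; `sourceRuns`, `resource$`, `early` and `late` are hypothetical names used only for the illustration.

```typescript
import { ReplaySubject, defer, of } from 'rxjs';
import { share } from 'rxjs/operators';

// Hypothetical counter: how many times the underlying source is executed.
let sourceRuns = 0;

const resource$ = defer(() => {
  sourceRuns++;
  return of('value'); // emits synchronously, then completes
}).pipe(
  share({
    connector: () => new ReplaySubject<string>(1),
    resetOnError: false,
    resetOnComplete: false,
    resetOnRefCountZero: true,
  }),
);

const early: string[] = [];
const late: string[] = [];

// The source completes during this subscription; refCount returns to 0 afterwards.
resource$.subscribe(v => early.push(v));
// No reset happened, so the late subscriber gets the replayed value and no new run starts.
resource$.subscribe(v => late.push(v));

console.log(sourceRuns, early, late);
```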

When turning on the reset on complete, new subscribers will reset the source observable, even if there are still others subscribed, and that is unwanted behavior.

Maybe I should also sketch the actual use case behind this wanted behavior to make it clearer. For our web application, we have built a custom store with RxJS, in which we have resources. A resource can be seen as a single data instance, like patient 123, and to get the data of this patient, we subscribe to its observable. We start those resource observables with defer, so that the HTTP call to our service only happens when the data is needed in the UI.

Of course, this data might be needed in multiple places in the UI, and to keep development simple and maintenance low, we just subscribe to the same observable. If we do nothing, each subscription triggers a new HTTP call, which is technically and functionally unnecessary. This is the default behavior of observables. Therefore, we added the shareReplay operator to make sure that, while there are subscriptions, the resource observable only executes once.

Internally, such a resource observable can listen to events and emit new values based on them. These observables should never error, but they can complete if they don't listen to any events to update themselves. So when all subscribers unsubscribe because the data isn't needed anymore in the UI, refCount drops to 0 and the data should be released. While refCount > 0, we never want a reset due to subscriptions changing. When refCount drops to 0, we want a reset, even if the source has completed, so that we make a new HTTP call the next time the data is needed in the UI, as the data could have changed on the server in the meantime.

I hope this is clear and makes sense. Maybe there are other ways of creating these resource observables to get this behavior, or I could use other operators instead? @cartant

3 participants