New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shareReplay not cleaning up subscriptions. #3336

Open
benlesh opened this Issue Feb 21, 2018 · 26 comments

Comments

Projects
None yet
@benlesh
Member

benlesh commented Feb 21, 2018

RxJS version: 5 or 6

Code to reproduce:

const source = interval(1000).pipe(
  tap(() => console.log('tick'),
  shareReplay(1),
);

const sub = source.subscribe(x => console.log(x));

sub.unsubscribe();

Expected behavior:

It should never log "tick"

Actual behavior:

It TOTALLY logs "tick".

Additional information:

shareReplay should definitely not recycle the underlying ReplaySubject when refCount drops to 0 due to unsubscription. However, it should end the subscription, which it's currently not doing, because I'm a dolt.

It should be an easy fix.

@benlesh benlesh self-assigned this Feb 21, 2018

@benlesh benlesh added the type: bug label Feb 21, 2018

@liqwid

This comment has been minimized.

liqwid commented Feb 27, 2018

Explored this bug a bit, it seems that Teardown logic for shareReplay is never triggered at all, because current implementation of Observable.prototype.subscribe doesn't do anything about function returned from operator.call:

  subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
            error?: (error: any) => void,
            complete?: () => void): Subscription {

    const { operator } = this;
    const sink = toSubscriber(observerOrNext, error, complete);

    if (operator) {
      operator.call(sink, this.source);
    } else {
      sink.add(this.source ? this._subscribe(sink) : this._trySubscribe(sink));
    }

    return sink;
  }

I guess best bet here is to refactor shareReplay to a class with own _next, _error & _unsubscribe methods

Other fix which is more hacky is to modify unsubscribe method of sink inside shareReplay

@liqwid

This comment has been minimized.

liqwid commented Feb 27, 2018

Also as far as I see behavior described in this bug contradicts with unit test

should not restart if refCount hits 0 due to unsubscriptions

because we can prevent the source observable from restarting only by maintaining the subscription to it.

In other words shareReplay returns a hot observable and currently upon reaching 0 subscriptions it is still hot.
If we don't want it to restart source it should remain hot.
But if we don't want any actions in pipe prior to shareReplay to be triggered we should make it cold upon reaching 0 subscriptions which will cause a full restart upon resubscribe.
BTW according to source code shareReplay should become cold once source is completed but that part is currently broken( described in previous comment).

@liqwid

This comment has been minimized.

liqwid commented Feb 28, 2018

Added a pr fixing not working teardown logic by injecting code into sink's unsubscribe

@brechtbilliet

This comment has been minimized.

brechtbilliet commented Apr 11, 2018

Any updates on this?

@ksaldana1

This comment has been minimized.

ksaldana1 commented May 14, 2018

I ran into this issue today in our React-Native mobile application. We are using shareReplay to drive performant, observable-based Redux selectors.

export const KITCHENS_ALL: Observable<IDMap<Kitchen>> = extractPath(
  Paths.KITCHENS, // path to property on state tree e.g. ['app', 'kitchens']
  emptyObj // fallback value
);

where extractPath is

function extractPath<T>(path: string[], defaultVal: T): Observable<T> {
   // access to state stream in closure
    return state$.pipe(
      pluck<T, any>(...path),
      map(val => (val == null ? defaultVal : val)),
      distinctUntilChanged(),
      shareReplay(1)
    );
  }

@benlesh Do you have a suggested workaround for the time being? Is there something more appropriate than the shareReplay operator that I should be using here?

@d3lm

This comment has been minimized.

d3lm commented May 15, 2018

@ksaldana1 You can use publishReplay(1).refCount() as an alternative for the time being.

@martinsik

This comment has been minimized.

Contributor

martinsik commented May 15, 2018

I remember this was reported and discussed some time ago in this issue #3127

@intellix

This comment has been minimized.

intellix commented May 22, 2018

This workaround was given to me in gitter as I cried that my teardown was never run:

const timer$ = Rx.Observable.timer(0, 500)
  .multicast(x => new Rx.ReplaySubject(1))
  .refCount();

The publishReplay(1).refCount() above with a timer emitted a weird result when subscribed to with 0 refs (it initially gave the last result and then started fresh)

@jbedard

This comment has been minimized.

jbedard commented Jul 12, 2018

I'm having the same issue as @intellix with publishReplay(1) + refCount() which seems the same as this shareReplay issue... publishReplay is reusing the underlying ReplaySubject so when the refCount reaches 0 the replay values are not removed.

Does this issue (recycling the ReplaySubject) also apply to publishReplay?

@cartant

This comment has been minimized.

Collaborator

cartant commented Jul 12, 2018

@jbedard What you are seeing is the intended behaviour. publish - and all of its variants - reuse the subject by design. That's one of the things that distinguishes publish from share.

If you want a new subject to be created and subscribed to the source when the reference count hits zero, you need to pass a factory to multicast - as in the comment above.

@jbedard

This comment has been minimized.

jbedard commented Jul 13, 2018

That means the "weird result" described above is correct? I guess I've been using the wrong operators all this time 🤔

However this bug is essentially making shareReplay the same as publishReplay + refCount because the underlying ReplaySubject is being recycled?

@cartant

This comment has been minimized.

Collaborator

cartant commented Jul 13, 2018

@jbedard Yes, the "weird result" was the expected behaviour.

And, no, they aren't the same.

publishReplay + refCount would unsubscribe from the source, but would resubscribe using the same subject. The current implementation of shareReplay never unsubscribes from the source; it depends upon the source completing (or erroring) to effect unsubscription. That's what this issue is about.

@jbedard

This comment has been minimized.

jbedard commented Jul 13, 2018

I see. Thanks for the info 👍

@Xample

This comment has been minimized.

Xample commented Jul 30, 2018

A good explanation on shareReplay(1) vs publishReplay(1).refCount()
https://blog.strongbrew.io/share-replay-issue/

Note that on the official RxJS v 4 page:
This operator is a specialization of replay that connects to the connectable observable sequence when the number of observers goes from zero to one, and disconnects when there are no more observers.

Which contracticts with the unit test of the V5 and V6

@DomiR

This comment has been minimized.

Contributor

DomiR commented Aug 15, 2018

@benlesh "this should be an easy fix" 😄
Is it just removing the isComplete from https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/shareReplay.ts#L87? Or is this behaviour still beeing discussed?

@OliverJAsh

This comment has been minimized.

Contributor

OliverJAsh commented Aug 17, 2018

There are two workarounds posted here, but I'm not familiar enough with the multicast operators to know the difference. Is there a workaround that provides exactly the desired behaviour of shareReplay?

@cartant

This comment has been minimized.

Collaborator

cartant commented Aug 27, 2018

@benlesh It's clear from this issue that when the ref count drops to zero, it should unsubscribe from the source. However, there are other aspects that are unclear. Precisely how is it supposed to behave? In particular:

  • If the ref count drops to zero and then another subscription is made, is it supposed to resubscribe to the source?
  • If it is supposed to resubscribe, should a new ReplaySubject be created?

I'd like to fix this, as I'm seeing people have to resort to unusual solutions - like placing takeUntil before shareReplay - to avoid the subscription leak effected by this bug.

@benlesh

This comment has been minimized.

Member

benlesh commented Aug 27, 2018

Okay... so here's the problem, overall. I've seen a split on what the behavior should be.

Given what this operator is used for, which is "hey, this thing is underway, and I want to share it's results with everyone, including those that are late to the game"... The problem is, the semantics are the question here.

If the whole point of shareReplay is to cache success, then I'd think that we'd want it to just finish and see if it's successful, which is, I believe, the current behavior.

But if you're using it for other scenarios (such as endless streaming data), I can see why you'd want to clean up on unsubscription to ref count 0.

It might be best to put this behavior behind a flag which can be passed as an argument to shareReplay, that way both behaviors are possible. That sucks, but I can't think of anything else right now, barring implementing yet another operator.

@benlesh

This comment has been minimized.

Member

benlesh commented Aug 27, 2018

Either way, I think it should be consistent with regards to the behavior when ref count hits zero. That is to say:

  • Completions and Errors always tear down
  • Errors always recycle
  • Completions never recycle
  • Unsubscription to ref count zero should be behind a flag to either:
    • keep the current connection and continue
    • or teardown and recycle
@PSanetra

This comment has been minimized.

PSanetra commented Aug 27, 2018

I think you should deprecate the current shareReplay operator and replace it by a new operator: shareReplayUntil(Observable | REF_COUNT, number)

shareReplayUntil(Observable, number): caches until provided observable emits. shareReplayUntil(NEVER, n) would behave like the current implementation of shareReplay. The new api makes it more obvious that it may leak subscriptions if not properly used.
shareReplayUntil(REF_COUNT, number): caches until refCount drops to zero. It should never recycle its ReplaySubject after an error or after completion. (I don't know why to handle both cases different. As a new subscriber I wouldn't care if it stopped emitting because of an error or because of completion, would I?). This api provides a different behavior than publishReplay + refCount.

@d3lm

This comment has been minimized.

d3lm commented Aug 28, 2018

I think it would be better not to deprecate shareReplay and introduce another operator shareReplayUntil. It is totally legit to introduce a config parameter for shareReplay where you can specify which behavior you want, either completion when refCount hits 0 or not. As introduced in @cartant's PR, it's a simple boolean flag and it doesn't require yet another operator, even if the original one would be deprecated.

@PSanetra

This comment has been minimized.

PSanetra commented Aug 28, 2018

In my opinion is shareReplay too easy to misuse. A new api like shareReplayUntil would make its behavior more obvious. This issue exists because it is unexpected that shareReplay does never unsubscribe.

@fjozsef

This comment has been minimized.

fjozsef commented Aug 28, 2018

This issue exists because it is unexpected that shareReplay does never unsubscribe.

I totally agree, therefore I think the default behavior of shareReplay should be teardown and recycle and with an option the current (keep the current connection and continue) behavior could be enabled. Unfortunatelly this would be a breaking change.

@pvinis

This comment has been minimized.

pvinis commented Sep 7, 2018

maybe something like this?
https://github.com/ReactiveX/RxSwift/blob/53cd723d40d05177e790c8c34c36cec7092a6106/RxSwift/Observables/ShareReplayScope.swift#L143-L154
also, my thought is to just reuse the shareReplay operator. no need for a new one. just add an argument.

@nilfalse

This comment has been minimized.

nilfalse commented Jan 9, 2019

Just a personal opinion here, but at glance (judging by the operator name) I was expecting both teardown and recycle be part of shareReplay() lifecycle. It was quite surprising (at least to me) to find this in the docs:

A successfully completed source will stay cached in the shareReplayed observable forever, but an errored source can be retried.

Would it be a good argument to say that a stream produced by an operator should not introduce any discrepancies in its consumption for cases where the source errors or successfully completes.

Either cache in both cases or not cache in both cases?

@cartant

This comment has been minimized.

Collaborator

cartant commented Jan 9, 2019

@nilfalse See #4059

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment