shareReplay with refCount stops working #6760

Open
ghostlytalamaur opened this issue Jan 12, 2022 · 9 comments

Comments

@ghostlytalamaur

ghostlytalamaur commented Jan 12, 2022

Describe the bug

When subscribing to a shared observable multiple times, the internal connection to the source observable gets broken. It stops emitting values on subscribe.

Expected behavior

Expected output:

first: 7.5.1
second: 7.5.1

Actual output

first: 7.5.1

Reproduction code

import { BehaviorSubject, merge, of, Subject } from 'rxjs';
import {
  shareReplay,
  take,
  takeUntil,
  materialize,
  filter,
} from 'rxjs/operators';

const shared$ = new BehaviorSubject('7.5.1').pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const isCompleted$ = shared$.pipe(
  materialize(),
  filter((n) => n.kind === 'C'),
  take(1)
);

const work = new Subject().pipe(takeUntil(isCompleted$));

merge(work, shared$)
  .pipe(take(1))
  .subscribe((value) => console.log('first: ', value));

shared$.subscribe((value) => console.log('second:', value));

Reproduction URL

7.5.1
6.4.0

Version

7.5.1

Environment

No response

Additional context

No response

@EarthyOrange

EarthyOrange commented Jan 21, 2022

Removing .pipe(take(1)) makes the 7.5.1 snippet work. So it looks like take is somehow affecting the shareReplay operator.

I ran into something similar in my own project, so I ended up writing a simple operator that works like take but doesn't complete.

import { filter } from 'rxjs/operators';

/**
 * Similar to the take operator, as in, it lets the first few values through.
 * However, unlike 'take' this operator doesn't complete.
 *
 * @param {number} few The number of emissions to let through
 * @returns {function(Observable): Observable}
 */
export default function first(few = 1) {
  return (source) => source.pipe(
    // filter passes a zero-based emission index to its predicate, so this lets
    // the first `few` values through (falsy ones such as 0 or '' included) and
    // drops the rest without completing or unsubscribing from the source.
    filter((_value, index) => index < few),
  );
}

Replace take with first.

@lincolnthree

lincolnthree commented Apr 15, 2022

Confirmed I am also seeing behavior where shareReplay({ bufferSize: 1, refCount: true }) stops emitting. My theory is also that it is causing the source observable to somehow complete when it should not, and it is not reconnecting.

Using first() did not solve my issue. Only setting refCount: false gets things working again (which of course introduces risk for a memory leak)
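As a rough illustration of that workaround, here is a minimal sketch that subscribes twice, unsubscribes both at once via take(1), and then subscribes late. The variable names (`shared$`, `received`) are made up for the example; it only shows the `refCount: false` setting and is not a fix for the underlying problem.

```typescript
import { BehaviorSubject, combineLatest } from 'rxjs';
import { shareReplay, take } from 'rxjs/operators';

const shared$ = new BehaviorSubject('6.4.0').pipe(
  // refCount: false keeps the single source subscription alive even when the
  // subscriber count drops to zero, so no reset happens at refCount 0.
  shareReplay({ refCount: false, bufferSize: 1 })
);

// Two simultaneous subscriptions that are torn down together by take(1).
combineLatest([shared$, shared$]).pipe(take(1)).subscribe();

// A late subscriber still gets the buffered value.
const received: string[] = [];
shared$.subscribe((value) => received.push(value));
console.log(received); // [ '6.4.0' ]
```

The trade-off is the one mentioned above: the subscription to the source stays open until the source completes or errors, regardless of how many subscribers remain.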

I feel like I am missing something basic here, but something definitely changed in the functionality between 6.x and 7.current.

For me there seems to be a commonality of using combineLatest([x$, y$]) in combination with shareReplay() that isn't working. Still tracking it down.

@lincolnthree

I think this might be related: https://github.com/ReactiveX/rxjs/pull/5634/files#diff-44fa2a928f593fdd8b981dac7737a65f6b0ddc844300af18a539c59b683b5467R130

Looks like shareReplay now calls complete() on the subscribed subject/connector when the source completes. I don't think this was in the original shareReplay behavior?

Am I missing something?

@henry-alakazhang
Contributor

henry-alakazhang commented Jun 16, 2022

I'm experiencing this as well. It happens when you subscribe multiple times, then unsubscribe and resubscribe again (is this called a reentrant subscription?). The code in the original post is a good example since multiple things subscribe to shared$.

Specifically, it happens if the source emits and the subscribers unsubscribe immediately. If it emits after the first subscription, things seem to work as expected. It looks like a race between the setup code and the logic that resets at refCount 0.

Excuse my atrocious console logging, but this is what I see when a subscription is made and the source emits after a delay:

image

Whereas this is what happens if the source emits immediately:

image

As you can see, if a value emits immediately and both subscriptions unsubscribe (as you might get with take(1) or firstValueFrom()), the reset happens before the logic for the second subscription finishes. The shareReplay then gets into a bad state - it maintains a subscription to the source, and doesn't resubscribe when new results come in.

This started happening in 7 because of this new code, which prevents excess subscriptions to the source observable. There was also some rearranging of the code which may have prevented it in 6, I'm not sure.

if (!connection) {
  // We need to create a subscriber here - rather than pass an observer and
  // assign the returned subscription to connection - because it's possible
  // for reentrant subscriptions to the shared observable to occur and in
  // those situations we want connection to be already-assigned so that we
  // don't create another connection to the source.
  connection = new SafeSubscriber({

Flipping the order of the code so the source subscription check runs before the refCount decrement subscription seems to fix the issue for me, but there are some comments explicitly wanting to do it in the other order, so I don't know if that solution will work.
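To make the ordering problem concrete, here is a toy model in plain TypeScript. It is not RxJS's implementation: `ToyReplay`, `toyShare`, `connectFirst` and `lateValues` are hypothetical names, and the model only mimics the steps described above (increment the ref count, subscribe to the replay subject, connect to the source, reset at refCount 0). The `connectFirst` flag switches between the two orderings being discussed.

```typescript
type Listener<T> = (value: T) => void;
type Teardown = () => void;
// A source is modelled as a function that starts emitting and returns a teardown.
type Source<T> = (emit: Listener<T>) => Teardown;

// Toy replay(1) subject: remembers the latest value and replays it on subscribe.
class ToyReplay<T> {
  private listeners = new Set<Listener<T>>();
  private latest: { value: T } | undefined;

  next(value: T): void {
    this.latest = { value };
    Array.from(this.listeners).forEach((listener) => listener(value));
  }

  subscribe(listener: Listener<T>): Teardown {
    this.listeners.add(listener);
    if (this.latest) listener(this.latest.value);
    return () => { this.listeners.delete(listener); };
  }
}

// Toy ref-counted share. Returns a subscribe function.
function toyShare<T>(source: Source<T>, connectFirst: boolean) {
  let refCount = 0;
  let subject: ToyReplay<T> | undefined;
  let connection: Teardown | undefined;

  const connect = (dest: ToyReplay<T>): void => {
    if (connection) return;
    let stop: Teardown = () => {};
    connection = () => stop(); // assigned before the source can emit
    stop = source((value) => dest.next(value));
  };

  return (listener: (value: T, unsubscribe: Teardown) => void): Teardown => {
    refCount += 1;
    const dest = subject ?? (subject = new ToyReplay<T>());
    let closed = false;
    let detach: Teardown = () => {};
    const unsubscribe = (): void => {
      if (closed) return;
      closed = true;
      detach();
      refCount -= 1;
      if (refCount === 0) {
        // Reset: drop the source connection and the replay subject.
        if (connection) connection();
        connection = undefined;
        subject = undefined;
      }
    };
    if (connectFirst) connect(dest);
    detach = dest.subscribe((value) => { if (!closed) listener(value, unsubscribe); });
    if (closed) detach(); // unsubscribed synchronously during the replay
    // In this order, a reset that ran during the replay has already cleared
    // `connection`, so we reconnect the old subject while refCount is 0.
    if (!connectFirst) connect(dest);
    return unsubscribe;
  };
}

// Two subscribers that unsubscribe together as soon as the replayed value
// arrives, followed by one late subscriber whose values are returned.
function lateValues(connectFirst: boolean): string[] {
  const behaviorLike: Source<string> = (emit) => {
    emit('7.5.1'); // emits synchronously on subscribe, like a BehaviorSubject
    return () => {};
  };
  const subscribe = toyShare(behaviorLike, connectFirst);
  const unsubscribeFirst = subscribe(() => {});
  subscribe((_value, unsubscribeSelf) => {
    unsubscribeFirst();
    unsubscribeSelf();
  });
  const seen: string[] = [];
  subscribe((value) => seen.push(value));
  return seen;
}

console.log(lateValues(false)); // []
console.log(lateValues(true)); // [ '7.5.1' ]
```

With `connectFirst` set to false, the model ends up holding a connection that feeds a discarded subject, so the late subscriber gets a fresh subject that never receives anything. Connecting first avoids that here, but only because the toy subject replays; with a non-replaying subject, a synchronous source value would be emitted before the subscriber is attached, which may be why the library comments prefer the other order.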

@lincolnthree

lincolnthree commented Jun 17, 2022

@henry-alakazhang Do you have a code reproduction you can share that demonstrates the issue? It would probably help the devs make sure it gets tested and fixed! (I haven't been able to reproduce this outside of my app - sporadically.)

@henry-alakazhang
Contributor

henry-alakazhang commented Jun 17, 2022

I haven't tried to reproduce it outside my app either, but the reproduction in the issue itself should demonstrate the issue fine.

  • shareReplay on an observable that emits immediately (eg. a BehaviorSubject)
  • Multiple simultaneous subscription/unsubscriptions
  • Another later subscription

I can simplify the reproduction a little more:

import { BehaviorSubject, combineLatest } from 'rxjs';
import { shareReplay, take } from 'rxjs/operators';

// Apply `shareReplay` to an observable which emits once, immediately on subscribe
const shared$ = new BehaviorSubject('6.4.0').pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

// Subscribe to it multiple times simultaneously, then unsubscribe simultaneously
combineLatest([shared$, shared$]).pipe(
  take(1)
).toPromise();

// On 7.5.1 this late subscription never logs; on 6.4.0 it does.
const lateSub = shared$.subscribe((value) => console.log('late:', value));

This example specifically is minimal to the point of silliness (combineLatest on the same observable) but if you imagine doing a .pipe() and some transformations and merging it back in, it can be a real use case.

Note that also:

// If we reset it (via refCount = 0), the state gets fixed.
lateSub.unsubscribe();

shared$.subscribe((value) => console.log('later:', value)); // emits on both 7.5.1 and 6.4.0

Stackblitz 6.4.0: https://stackblitz.com/edit/q7u9jg-xxev1z?file=index.ts
Logs "late:" and "later:"
Stackblitz 7.5.1: https://stackblitz.com/edit/q7u9jg-p16qzd?file=index.ts
Logs only "later:"

@magyargergo

Hey guys,

Do you have any update on this?

@jkossis

jkossis commented Jul 11, 2022

@benlesh it seems like part of the issue (or a tangential issue) is, with the refactor of share, we lost some of the granularity previously provided.

For example, publishReplay + refCount used to unsubscribe from source when the ref count dropped to 0, while still preserving the underlying ReplaySubject. With the newly configurable share, those two things are coupled with resetOnRefCountZero. So you can have the behavior of unsubscribe from source + generate new ReplaySubject, or neither. But not the aforementioned combination.

tl;dr resetOnRefCountZero has become a proxy for shareReplay's behavior with { refCount: true }. We have lost the functionality previously provided by publishReplay + refCount.
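The difference can be sketched as follows, assuming RxJS 7 (where `share` accepts a config object and the deprecated `publishReplay` and `refCount` operators are still exported). The helper `secondSubscriberSees` and the counting source are hypothetical, written only for this comparison: each new subscription to the source emits how many times the source has been subscribed so far.

```typescript
import { Observable, ReplaySubject } from 'rxjs';
import { publishReplay, refCount, share } from 'rxjs/operators';

type Sharing = (source: Observable<number>) => Observable<number>;

// Subscribes once, unsubscribes (ref count drops to 0), then subscribes again
// and reports what that second subscriber receives synchronously.
function secondSubscriberSees(sharing: Sharing): number[] {
  let sourceSubscriptions = 0;
  const source$ = new Observable<number>((subscriber) => {
    sourceSubscriptions += 1;
    subscriber.next(sourceSubscriptions); // never completes
  });
  const shared$ = source$.pipe(sharing);

  shared$.subscribe().unsubscribe();

  const seen: number[] = [];
  shared$.subscribe((value) => seen.push(value)).unsubscribe();
  return seen;
}

// Old combination: unsubscribe from the source at 0, keep the ReplaySubject.
const viaPublishReplay = secondSubscriberSees((source) =>
  source.pipe(publishReplay(1), refCount())
);

// share with reset: unsubscribe from the source AND discard the ReplaySubject.
const viaShareReset = secondSubscriberSees(
  share({ connector: () => new ReplaySubject<number>(1), resetOnRefCountZero: true })
);

// share without reset: keep both the source subscription and the ReplaySubject.
const viaShareNoReset = secondSubscriberSees(
  share({ connector: () => new ReplaySubject<number>(1), resetOnRefCountZero: false })
);

console.log(viaPublishReplay); // [ 1, 2 ]
console.log(viaShareReset); // [ 2 ]
console.log(viaShareNoReset); // [ 1 ]
```

Only the first variant both replays the old value and resubscribes to the source, which is the combination described above as no longer expressible through `resetOnRefCountZero` alone.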

@jkossis

jkossis commented Aug 5, 2022

@benlesh bumping this ^
