
Better cache implementation #1541

Closed
christianacca opened this issue Mar 28, 2016 · 13 comments

@christianacca

This issue is reopening issue #839.

As it stands the implementation for caching does not satisfy the common use case.

Quoting @Blesh:

The desired behaviour for a cache (in most cases) would be:

  1. run the source Observable once and cache the values
  2. if completed, all subsequent subscribers would get the cached values
  3. if errored, retry would clear the cache, and go to 1.

I have tried coming up with a basic implementation myself that I think satisfies the above:

```ts
function safeCache<T>(
  bufferSize: number = Number.POSITIVE_INFINITY,
  windowTime: number = Number.POSITIVE_INFINITY,
  scheduler?: Scheduler) {

  // `let`, not `const`: the subject is replaced after an error
  let subject: Rx.ReplaySubject<T> | null = null;

  // `this` is the source Observable (safeCache is meant to be patched
  // onto Observable.prototype); capture it for use inside the arrow below
  const source = this;

  return Rx.Observable.create((observer: Rx.Observer<T>) => {
    const isStarted = subject != null;
    if (!isStarted) {
      subject = new Rx.ReplaySubject<T>(bufferSize, windowTime, scheduler);
    }

    const subscription = subject.subscribe(observer);

    if (!isStarted) {
      source.subscribe(
        val => subject.next(val),
        err => {
          subject.error(err);
          subject = null; // drop the errored subject so a retry restarts the source
        },
        () => subject.complete()
      );
    }

    return subscription;
  });
}
```

Note on above:

  • For a working sample see this codepen
  • I named the operator safeCache more to avoid overwriting the existing cache operator rather than to denote behaviour
  • I know this isn't idiomatic rxjs 5 operator code, and so it would need to be refactored
  • There is no mechanism for cache invalidation
    • I had a go at adding an invalidateWhen parameter typed to be an observable but got quickly out of my depth!

Now I don't suppose the above implementation is sufficiently robust, even after refactoring, it's more of a way to stimulate more conversation on getting a better implementation.
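The three desired behaviours don't depend on RxJS specifics. As a rough, hypothetical sketch of the same semantics using plain Promises (the name `cachedOnce` is mine, not an RxJS API): run the source once, replay the settled result to every later consumer, and clear the cache on error so a retry restarts the source.

```typescript
// Hypothetical sketch of the desired cache semantics with Promises.
function cachedOnce<T>(source: () => Promise<T>): () => Promise<T> {
  let cached: Promise<T> | null = null;
  return () => {
    if (cached === null) {
      cached = source().catch(err => {
        cached = null; // error: drop the cache so the next call re-runs the source
        throw err;
      });
    }
    return cached; // success: every later caller gets the cached result
  };
}
```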

@benlesh
Member

benlesh commented Mar 28, 2016

Hey, @christianacca! That's some nice looking code. So you know, you can accomplish roughly the same thing with:

```ts
source.multicast(() => new ReplaySubject()).refCount()
```

When multicast gets a factory function, it'll use that factory function to refresh the underlying Subject on subscription. refCount() will cause the underlying Subject to go to an isUnsubscribed state, thus requiring a refresh on the next subscription.

I like that you're thinking about new ways to solve this problem, though. A more complicated problem around cache would be a solid API around complex cache evictions. That's something I'd love to see, but honestly don't have the time to investigate further at the moment.

@christianacca
Author

Hi Ben,

Your suggestion - isn't that pretty much describing the implementation of the existing cache operator?

Unless I'm missing something cache doesn't solve the problem of ensuring subsequent subscriptions receive cached values from the source.

That's because as soon as the refCount drops to zero, the next subscription triggers a fresh subscription to the source observable, losing the values stored in the previous subject.

@benlesh
Member

benlesh commented Mar 28, 2016

Oh, nevermind, I see what you're saying. Basically it's that on an error it's going to recycle the Subject, but on complete it will not. Sorry, I misread the code the first time because I was in a hurry to go through all of my Github messages.

@benlesh
Member

benlesh commented Mar 28, 2016

It might be worth discussing having the regular cache operator function this way:

Paging @staltz, @robwormald, @trxcllnt, @mattpodwysocki

@christianacca
Author

OK cool. Note that the implementation I posted might not be robust. For example, what would be the implications if the next or error handler registered with the subject throws?

@benlesh
Member

benlesh commented Mar 28, 2016

> OK cool. Note that the implementation I posted might not be robust. For example, what would be the implications if the next or error handler registered with the subject throws?

Right. It probably wouldn't meet up to our performance requirements either. But it works as a solid reference for the concept, and I appreciate it.

@staltz
Member

staltz commented Mar 29, 2016

Yep, I agree the proposal makes more sense than the current implementation.

@jadbox
Contributor

jadbox commented Mar 29, 2016

What are first steps here?

E.g.

  • Use @christianacca's implementation as an initial replacement for cache, or use a temporary alternative cache method
  • Start a PR to fully build out the replacement for cache

This is particularly important for us to evaluate Rx 5 as a replacement for 4.

@matthewwithanm

If cache is changed in this way, could we also get a version that's implemented like @Blesh's code above? It seems to me that those are the two most common use cases: "cache this thing" and "cache this thing until the subscriptions have ended."

@Spittal

Spittal commented Sep 13, 2016

+1

I used this strategy back in Angular 1's $http days with its cache option, and it was very useful.

@Dorus

Dorus commented Nov 24, 2016

Looks like this is the discussion on cache. Over the past month, I've been gathering the different behaviours of cache that people seem to expect and/or need. This resulted in about 7 flavours of cache, as well as some alternatives like a cacheSubject or cacheMap function.

  1. Cache operator: Cache all output from an observable. Run eager. Never cancel. Replay all results on subsequent subscriptions.
    Optionally take a max buffer size or time.
    1a. As 1, but lazy.
    1b. As 1a, but unsubscribe the source if all subscriptions unsubscribe before complete.
    i. Also flush the cache. New subscriptions will cause resubscription to the source.
    ii. Do not flush the cache. New subscriptions will receive the (partial) data from the source and complete.
    1c. Accept a 'reset' or 'flush' event that resets the cache.
    i. (If the source is not already completed/errored) current subscriptions will 'complete' and the source is unsubscribed.
    New subscriptions will cause subscription to the source.
    ii. Current subscriptions will not 'complete' and the source stays subscribed.
    New subscriptions will receive everything emitted after the flush event as normal.
  2. cacheMap: Take a project function like mergeMap. If an item has been requested before, replay the previous projection.
    If an item has not been requested before, run the project function and cache the result.
    Optionally take a max buffer size or time.
  3. cacheObject: Is created with a project function and has a get method. The get method takes a key and returns an observable that is the result of the project function run with that key (and also caches it). If the same key has been requested before, return the cached result.
  4. cacheSubject: Cache all data while there are no subscriptions. Once subscribed, emit all cached data and forget the cached data. Start caching again if the number of subscribers reaches zero.

@ivan7237d

It seems that it's possible to reuse multicast() but still have the feature where the subject is thrown away when there's an error:

```ts
const multicastRetry = function<T>(
  this: Rx.Observable<T>,
  subjectFactory: () => Rx.Subject<T>,
): Rx.ConnectableObservable<T> {
  let subject = subjectFactory();
  return this.multicast(
    // reuse the current subject unless it has errored
    () => (subject.hasError ? (subject = subjectFactory()) : subject),
  );
};
```

@benlesh
Member

benlesh commented Aug 18, 2020

shareReplay was the answer to this.

@benlesh benlesh closed this as completed Aug 18, 2020