Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement cache operator #209

Closed
wants to merge 1 commit into from
Closed

Implement cache operator #209

wants to merge 1 commit into from

Conversation

johngmyers
Copy link
Contributor

Returns an Observable that repeats the original Observable sequence to all subscribers.
The source Observable is subscribed to at most once.

When looking into integrating Hystrix and RxJava, the need for this operator became apparent. It appears to be of general use, so should be considered for RxJava core.

@cloudbees-pull-request-builder

RxJava-pull-requests #50 FAILURE
Looks like there's a problem with this pull request

@benjchristensen
Copy link
Member

How does this relate to Publish (#15) and Multicast (#65)?

Returns an Observable that repeats the original Observable sequence to all subscribers.
The source Observable is subscribed to at most once.
@cloudbees-pull-request-builder

RxJava-pull-requests #51 SUCCESS
This pull request looks good

@johngmyers
Copy link
Contributor Author

My understanding is that Publish and Multicast don't remember or replay events that the source Observable generated before a given subscriber subscribed.

@benjchristensen
Copy link
Member

I think you're right. How about Replay (#71)?

Reading through your code submission now ...

@benjchristensen
Copy link
Member

I really like how many unit tests there are ... very nice.

I need to think through the implications of this compared with Replay/RefCount/Publish/Multicast etc. In other words, if Rx.Net doesn't have this operator for such a common case it seems that it should already be covered by one of the ones we have not yet implemented.

@johngmyers
Copy link
Contributor Author

Replay could be what this is, modulo Replay's connect() semantics (which I can't quite decipher from the documentation).

@benjchristensen
Copy link
Member

That part is still somewhat confusing to me as well as I haven't yet spent the time to fully grok these operators and what it will take to implement.

Perhaps though a cache operator would be a good cover method for the "typical usecase" that underneath is using replay and takes care of connections etc ... perhaps similar to how RefCount does the connection/disconnect by counting subscriptions?

What I'm trying to accomplish is first achieve the functionality provided by Rx and only then add new functionality if it's not already accounted for.

Rx.Net has years of thought into the API design and functionality and there is value in common knowledge across implementations if we can maintain the same behavior.

@johngmyers
Copy link
Contributor Author

I can see why one would need a connect() method for Publish: one would want to be able to subscribe multiple observers to a Publish before having the Publish subscribe to its source. If the Publish subscribed to its source upon the first subscription it got, its second subscriber wouldn't get the initial set of notifications.

The connect() method makes a lot less sense for Replay, as the second and later subscribers would get the initial set of notifications either way. The only things I can think of are:

  • A desire to have the initial set of notification calls interleaved across the initial set of observers. But I see nothing in the contract preventing an implementation from sending a batch of notifications to one observer before sending any of them to another.
  • A roundabout way of allowing the Replay to be forcibly unsubscribed from its source. But this is not nearly as useful as a way to inform the Replay that it will have no new subscribers, allowing it to unsubscribe from its source when its last current subscriber unsubscribes.
  • Some useful semantic around multiple connections. The Microsoft documentation does not specify what happens if connect() is called a second time, either with or without the first connection being disposed.

@johngmyers
Copy link
Contributor Author

I believe cache is a wrapper around Replay that calls the Replay's connect() upon the first subscription.

@benjchristensen
Copy link
Member

Are you interested in exploring what an implementation of Replay would look like (not necessarily all of the overloads that we eventually want but whatever is necessary for this use case) and perhaps submitting that implementation to accomplish cache functionality while retaining our goal of complying to the Rx.Net API?

@johngmyers
Copy link
Contributor Author

I think so. I would need to know what the behavior of a second call to connect() is supposed to be, both with and without the first call being unsubscribed.

@benjchristensen
Copy link
Member

@johngmyers
Copy link
Contributor Author

  • Connecting without previous disconnect is idempotent.
  • Disconnecting is idempotent and generates no events.
  • Connecting after a disconnect re-subscribes to the source.

@benjchristensen
Copy link
Member

Is this going away because of #218 or does this still make sense?

@johngmyers
Copy link
Contributor Author

A variant of Replay that connects on first subscription could make sense, but might not meet the threshold for adding a new operator. In any case, it wouldn't use the commit in this request.

@johngmyers johngmyers closed this Apr 19, 2013
@benjchristensen
Copy link
Member

I think I figured out when the ConnectableObservable would come into play with Replay in a cache scenario.

  • source emits 10 values
  • subscriberA subscribes and receives the 10 values
  • source emits 5 values (and subscriberA receives them)
  • subscribeB subscribes and receives the 15 values
  • subscriberA unsubscribes

Now what do we do? We can't unsubscribe from source because subscriberB still expects to receive values.

  • source emits 5 new values and subscriberB receives them
  • subscriberB unsubscribes

Do we unsubscribe from source now? That's how the refcount operator works.

  • subscriberC subscribes and receives 20 values
  • subscriberC unsubscribes
  • source keeps emitting values that keep getting cached and it never goes away

In other words, the cache operator we're discussing here would ONLY work if the source sequence correctly completed at some point. If it was an infinite (or very long running) observable it would keep filling up the ReplaySubject in the background and likely become a memory leak.

Thus the ConnectableObservable is a way to unsubscribe.

I still think a cache operator is useful, but it can be misused easily ... but so can toList which has the exact same problem on an infinite sequence.

@johngmyers
Copy link
Contributor Author

I was thinking about adding a method to the Cache operator to notify it when it gets evicted and can rely on having no additional subscribers. That way, it could unsubscribe from its source when all of the existing subscribers unsubscribed.

As gravy, it could even drop references to events that have been sent to all existing subscribers, but that's less important.

@benjchristensen
Copy link
Member

Unsubscribing once all subscribers are unsubscribed is what the refCount operator does and I can't see how to make that work for the use cases where cache is intended.

Here is an example timeline using cache:

  • origin starts
  • onNext(1)
  • A subscribes - receives (1)
  • onNext(2)
  • A receives (2)
  • B subscribes - receives (1, 2)
  • origin onCompleted - A & B receives (onCompleted)
  • C subscribes - receives (1, 2, onCompleted)

Using a refCount system that unsubscribes from the origin would result in re-executing the origin which is exactly what this operator is there to prevent.

Same problem with dropping references to events that have been sent to all existing subscribers, C wouldn't receive anything in that case.

Here is the same timeline with refCount or something else that unsubscribes when all subscribers leave (or the origin completes which is the same):

  • origin starts
  • onNext(1)
  • A subscribes - receives (1)
  • onNext(2)
  • A receives (2)
  • B subscribes - receives (1, 2)
  • origin onCompleted - A & B receives (onCompleted)
  • C subscribes - origin is subscribed to again
  • onNext(1)
  • C receives (1)
  • onNext(2)
  • C receives (2)
  • origin onCompleted - C receives (onCompleted)

Both of the use cases you suggest seem like they are exactly what refCount is intended for and its many overloads for max number of events to store, time windows, etc. refCount works great for hot-observables (infinite sequences) or very long running ones from what I can tell, but it's not very good for a sequence that emits 1 or a handful of responses and completes.

Am I misunderstanding your meaning?

@johngmyers
Copy link
Contributor Author

A Cache would differ from a refCount in that it would stay subscribed until after it is explicitly notified by this new method that it has been evicted and there will be no more subscribers. So in your example timeline C would get (1, 2, Completed) as there was no eviction notification call.

The use case is that the creator of the Cache (e.g. Hystrix) would place the Cache Observable into some sort of data structure (e.g. HystrixRequestCache) from which the Cache Observable can gain new subscribers. When the data structure expires or otherwise evicts the Cache Observable, it would use this new method to notify the Cache Observable that it will receive no more subscribers. At that point, the Cache Observable can unsubscribe from its source when its reference count reaches zero.

This is only worth doing if subscribers are likely to unsubscribe before the sequence completes or if it is worth discarding early history while existing subscribers remain post-eviction.

@benjchristensen
Copy link
Member

I think the key line you said is this:

This is only worth doing if subscribers are likely to unsubscribe before the sequence completes or if it is worth discarding early history while existing subscribers remain post-eviction.

Since Hystrix will always complete (onError or onCompleted) it doesn't really matter (nor is there a reliable hook to know when to cause eviction since even HystrixRequestContext is not a required thing).

Thus, it seems that an object being dereferenced and then garbage collected is sufficient for the Hystrix case, since the Observable would have terminated and have no resources to clean up.

Of note, it was my working on Hystrix to support RxJava that drove me to add cache.

How would your proposed changes modify the method signature of cache in case we want to add the behavior you're describing? Would it need to return a CachedObservable with the eviction hook?

@johngmyers
Copy link
Contributor Author

How would your proposed changes modify the method signature of cache in case we want to add the behavior you're describing? Would it need to return a CachedObservable with the eviction hook?

Pretty much. Also, the code maintaining the data structure would have to control subscriptions to the Cache so that it would be able to know when there are no more subscribers.

I had decided this embellishment probably wasn't worth doing, but it was relevant to your question about sequences that never complete.

@benjchristensen
Copy link
Member

I'm going to stick with cache as a simple thing without the eviction support and if needed we can evolve over time (since returning a CachedObservable instead of Observable would be a non-breaking change ... I think).

I also want to better understand how combinations of things like replay().refCount() may provide similar functionality to what we were discussing before inventing a new type of Observable.

Thanks for the feedback.

rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
Cache operator as discussed in ReactiveX#209

Similar to `replay()` except that this auto-subscribes to the source sequence. This comes with the same cautions as `toList` when dealing with infinite or very large sequences.
jarroddrury pushed a commit to jarroddrury/RxJava that referenced this pull request Apr 26, 2024
Cache operator as discussed in ReactiveX/RxJava#209

Similar to `replay()` except that this auto-subscribes to the source sequence. This comes with the same cautions as `toList` when dealing with infinite or very large sequences.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants