Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish, Replay and Cache Operators #263

Merged

Conversation

benjchristensen
Copy link
Member

(Redo of #260 after merging)

Added basic Publish (#15) and Replay (#71) operators to Observable. I have not done any of the overloads (particularly Replay which has 10+ in .Net.

I also added a new Cache operator as discussed by @johngmyers and I at #209.

Playing with Replay and ConnectableObservable it does not cater well to the typical use case of needing to just de-dupe calls (cache the responses) so this Cache operator can be thought of as an automatic version of Replay. It comes with the same risk as toList if used with infinite or very large sequences as you can not unsubscribe from it.

Cache operator as discussed in ReactiveX#209

Similar to `replay()` except that this auto-subscribes to the source sequence. This comes with the same cautions as `toList` when dealing with infinite or very large sequences.
benjchristensen added a commit that referenced this pull request May 7, 2013
@benjchristensen benjchristensen merged commit 4e21ee7 into ReactiveX:master May 7, 2013
@Override
public void run() {
counter.incrementAndGet();
System.out.println("published observable being executed");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still here (and twice below). :)

@cloudbees-pull-request-builder

RxJava-pull-requests #134 FAILURE
Looks like there's a problem with this pull request

@mttkay
Copy link
Contributor

mttkay commented May 22, 2013

Hi,

I noticed that calling Observable.replay() doesn't merely wrap the given observable into a ReplaySubject, but also combines this with OperationMulticast.

I have several questions around this.

  1. Why do this? Aren't multicasting and caching/replaying both independent concepts in their own right?

  2. When combined, it raises questions about how to deal with the (different) Subscription instances returned from replay().subscribe and replay().connect. Unfortunately there are no samples or unit/integration tests (that I'm aware of) which actually combine both operators, so looking at the tests didn't help. More precisely:

ConnectableObservable obs = sourceObservable.replay()
Subscription s1 = obs.subscribe(observer);
Subscription s2 = obs.connect();

s1 and s2 are entirely different subscriptions. Looking at the sources, s1 is a RepeatSubjectSubscription, which is put in an internal map and removed upon unsubscription. s1 however is an anonymous inner class instance in OperationMulticast which delegates to the subscription the sourceObservable would return normally.

Now, to which one do I hold on and unsubscribe from when I want to go through a disconnect/reconnect cycle? I actually tried a few combinations and none worked for me. I either ended up triggering the sourceObservable twice upon reconnection (which I don't want) or I end up not getting previous emissions replayed on my observer.

Any insight around this would be great.

@benjchristensen
Copy link
Member Author

Have you read this? => http://northhorizon.net/2011/sharing-in-rx/ It helped me understand it quite a bit.

Here is a snippet from it:

Fortunately, we have Replay() for this. It literally has 15 overloads, which cover just about every permutation of windowing by count and/or time, and supplying an optional scheduler and/or selector. The parameterless call, however, just remembers everything. Replay is just like Publish in the sense that it also forwards a call to Multicast, but this time with a ReplaySubject.

The reason multicast is involved is because it gives a ConnectedObservable associated with a Subject. It supports multiple subscriptions to a single Observable which is exactly what replay and publish both want.

Anything with ConnectedObservable has the underlying source managed by the ConnectedObservable itself. Thus in your example it would be s2 which you then unsubscribe from when you want to shut down the source.

If you just want simple caching behavior then the cache operator may help since it does not require the ConnectedObservable interaction.

The ConnectedObservable behavior is important for use cases when the start/stop of the underlying sequence needs to be controlled independently of when child subscriptions subscribe/unsubscribe. The refcount operator is another example (we haven't yet implemented this one) of controlling this situation. I have found cache to be exactly what I need for most use cases (though we need to add overloads for time, count limits, etc) as I just want everything replayed as long as the reference is not garbage collected - but that is not how ConnectedObservable behaves.

Hope this helps.

@mttkay
Copy link
Contributor

mttkay commented May 23, 2013

Thanks @benjchristensen, as always this was very helpful. I think what got me confused was the terminology. The fact that both replay() and cache() are implemented in terms of ReplaySubject is a little confusing at first (the name replay() clearly implies that items will be re-emitted, while cache() doesn't make that clear). The former wraps the ReplaySubject in a ConnectableObservable using OperationMulticast, while the latter simply forwards the subscription to ReplaySubject.

(Re)reading the article you posted (I did read it before, but that was before ReplaySubject had been released so it had slipped off my mind) and looking at the implementation, would you say this summary is correct:

  • replay: Creates a connectable event stream where upon reconnection, events that had been in flight during the period of disconnection will be replayed onto the observer
  • cache: Creates an event stream where upon subscription, events will be replayed onto the observer
  • publish: Creates a connectable event stream where upon reconnection, new events will be received by the observer, but events that were in flight during the period of disconnection will not be replayed

@benjchristensen
Copy link
Member Author

Sorry I lost this question in my backlog ...

Yes those summaries make sense to me. The cache one could be expanded further though:

Creates an event stream where upon subscription, all previously emitted events will be replayed onto the observer and future events if not completed.

@mttkay
Copy link
Contributor

mttkay commented Jul 16, 2013

👍

rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
…cache

Publish, Replay and Cache Operators
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants