API to clear MultiSourceProducer's buffer/cache when using CombineLatest #3064

Closed
tongc opened this issue Jul 4, 2015 · 5 comments

@tongc

tongc commented Jul 4, 2015

This question is based on the nice diagram for combineLatest in the official documentation: http://reactivex.io/documentation/operators/combinelatest.html

I wonder if there is a way to clear the state held for both the first and second Observables at a chosen point, say when I receive "C" from the second Observable. Then, when "D" arrives from the second Observable, it would no longer be combined with "2" from the first Observable but would wait until "3" arrives.

This is useful when there is no data after the business day finishes (say the day runs 9 to 5, leaving a 16-hour quiet period). It would be nice to clear the state without re-subscribing and re-initializing, so that data arriving on the next business day is not combined with old data.
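To make the requested behaviour concrete, here is a minimal plain-Java sketch of a two-source "latest value" cache with an explicit reset. It is only a model of the semantics being asked for, not RxJava's implementation and not an existing RxJava API; the class name `ResettableLatest` and its methods are hypothetical.

```java
import java.util.Optional;
import java.util.function.BiFunction;

/** Hypothetical model of a two-source combineLatest cache with a reset. */
final class ResettableLatest<A, B, R> {
    private final BiFunction<A, B, R> combiner;
    private A latestA; // null means "nothing received since the last reset"
    private B latestB;

    ResettableLatest(BiFunction<A, B, R> combiner) {
        this.combiner = combiner;
    }

    /** Records a value from the first source and combines if both sides are set. */
    Optional<R> onFirst(A a) {
        latestA = a;
        return combine();
    }

    /** Records a value from the second source and combines if both sides are set. */
    Optional<R> onSecond(B b) {
        latestB = b;
        return combine();
    }

    /** Forgets both cached values, so the next output needs fresh input from each side. */
    void reset() {
        latestA = null;
        latestB = null;
    }

    private Optional<R> combine() {
        if (latestA == null || latestB == null) {
            return Optional.empty();
        }
        return Optional.of(combiner.apply(latestA, latestB));
    }
}
```

With this model, calling `reset()` after "C" means that "D" alone produces nothing, and the next output appears only once "3" arrives from the first source.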

@davidmoten
Collaborator

If you retain timestamps in both streams then you could filter out results from combineLatest that have different dates.

@tongc
Author

tongc commented Jul 5, 2015

Thanks Dave,
Yes, I could do that. But a new field will be introduced, and this makes the once-a-day operation a burden incurred when every message is received.
This also introduces logic that may omit some messages that would otherwise have been emitted (in my case the only occasion on which a message might be omitted is if the first Observable never emitted any message by the time the day finishes, but not if the timestamp is somehow wrong during the day).
Wouldn't it make sense to have a cache reset? That way, at any point, we could get rid of the stale state of the system and start fresh.

@davidmoten
Collaborator

But a new field will be introduced

It doesn't necessarily introduce a new field in your class, because you can just wrap your object in a Timestamped by calling observable.timestamp().
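As a rough illustration of the timestamp approach, the check applied to each combined pair could look like the plain-Java sketch below. The class `SameDayFilter` and its methods are hypothetical helpers, and the choice of time zone is an assumption; in RxJava the two millisecond values would presumably come from the Timestamped wrappers, with the predicate applied in a filter after combineLatest.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;

/** Hypothetical helper: decides whether two timestamps fall on the same calendar day. */
final class SameDayFilter {
    /** Converts epoch milliseconds to a calendar date in the given zone. */
    static LocalDate dayOf(long epochMillis, ZoneId zone) {
        return Instant.ofEpochMilli(epochMillis).atZone(zone).toLocalDate();
    }

    /** True when both timestamps belong to the same calendar day in the given zone. */
    static boolean sameDay(long firstMillis, long secondMillis, ZoneId zone) {
        return dayOf(firstMillis, zone).equals(dayOf(secondMillis, zone));
    }
}
```

A combined pair would be kept only when `sameDay` returns true, which drops pairs that mix yesterday's value with today's. The cost is one date conversion per side per emission, which is the per-message overhead raised above.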

a burden of when every message is received

Sure, it's your call whether that's significant. Benchmarks might be handy here.

in my case the only occasion that the message might be omitted is that if the first Observable never emitted any message when the day finishes but not if the timestamp is somehow wrong during the day).

Sorry I don't follow this bit.

You could also get a reset by resubscribing. Get the stream to terminate at the end of the day and chain it with .repeat() if it completes normally, or with .retry() if it terminates with an error.

@abersnaze
Contributor

Maybe apply one of the window operators before this logic to break up the Observable into day-length chunks.
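The effect of chunking by day can be sketched in plain Java as follows. This is only a model of the idea, not RxJava code: the `DailyCombine` class and its `Event` type are hypothetical, and the events are assumed to arrive already ordered by time with a business day attached.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: combine latest values, starting from empty state each day. */
final class DailyCombine {
    /** One input event: its source, its business day, and its payload. */
    static final class Event {
        final boolean fromFirst;
        final LocalDate day;
        final String value;

        Event(boolean fromFirst, LocalDate day, String value) {
            this.fromFirst = fromFirst;
            this.day = day;
            this.value = value;
        }
    }

    /** Emits first+second for each event once both sides have a value for the current day. */
    static List<String> combinePerDay(List<Event> events) {
        List<String> out = new ArrayList<>();
        LocalDate currentDay = null;
        String latestFirst = null;
        String latestSecond = null;
        for (Event e : events) {
            if (!e.day.equals(currentDay)) {
                // A new day-length chunk begins: drop everything cached from the old one.
                currentDay = e.day;
                latestFirst = null;
                latestSecond = null;
            }
            if (e.fromFirst) {
                latestFirst = e.value;
            } else {
                latestSecond = e.value;
            }
            if (latestFirst != null && latestSecond != null) {
                out.add(latestFirst + latestSecond);
            }
        }
        return out;
    }
}
```

Because each chunk starts with empty state, no mutable cache has to be cleared from outside; the stale values simply go out of scope when the day changes.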

@headinthebox
Contributor

If you think you need (mutable) state, typically you are doing it wrong.
