Operation: throttle #258

Merged
merged 5 commits into from
Sep 11, 2013

michaeldejong
Contributor

The throttle operation is a filtering operator meant to combat bursts of events arriving in short periods of time. The original Rx .Net implementation of this operator works as follows:

Upon receiving an event A, it waits a specified amount of time Z before propagating it to the Observer. If another event B is received within this period, the propagation of A is cancelled, and B is propagated instead once Z has passed since B was received.
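
The behaviour described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the code in this pull request: the `Debouncer` class and its `onNext`/`shutdown` methods are hypothetical names, and it uses the JDK's `ScheduledExecutorService` where the operator itself would use an Rx `Scheduler`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper (not part of RxJava): a value is emitted only after
// `timeout` has passed without a newer value arriving.
final class Debouncer<T> {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final Consumer<T> downstream;
    private final long timeout;
    private final TimeUnit unit;
    private ScheduledFuture<?> pending; // guarded by `this`

    Debouncer(Consumer<T> downstream, long timeout, TimeUnit unit) {
        this.downstream = downstream;
        this.timeout = timeout;
        this.unit = unit;
    }

    synchronized void onNext(T value) {
        if (pending != null) {
            // Event A is dropped if event B arrives before A's timer fires.
            pending.cancel(false);
        }
        // Every event restarts the full waiting period Z.
        pending = timer.schedule(() -> downstream.accept(value), timeout, unit);
    }

    void shutdown() {
        timer.shutdown();
    }
}
```

Note that every call to `onNext` cancels and reschedules a task, which is the per-event scheduling cost raised later in this thread.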

As @benjchristensen already mentioned, the use of threads in this way is quite inefficient. To this end I proposed two additional variations on this scheme of throttling here. The question now is which schemes do we want (to support)?

Any thoughts on this are highly appreciated!

@jmhofer
Contributor

jmhofer commented May 5, 2013

Awesome.

(Btw I'm not entirely convinced about implementation and usage of Notification<T> - what if T is Exception? - but that's a completely different story)

@michaeldejong
Contributor Author

Yes, I had some doubts about that too. I think the easiest way to fix this is to make three distinct ThrottleAction classes for onNext, onError and onCompleted. That way we could avoid using Notification altogether.

@cloudbees-pull-request-builder

RxJava-pull-requests #123 SUCCESS
This pull request looks good

@cloudbees-pull-request-builder

RxJava-pull-requests #124 FAILURE
Looks like there's a problem with this pull request

@michaeldejong
Contributor Author

Hmm, that's strange. Locally I don't have any build failures, but the console output of the failed CloudBees build shows that rx.operators.OperatorTester$UnitTest.testConcurrentNextNext failed. As far as I can tell, this isn't related to the throttle operator. Could this test case be faulty?

@cloudbees-pull-request-builder

RxJava-pull-requests #125 FAILURE
Looks like there's a problem with this pull request

@jmhofer
Contributor

jmhofer commented May 6, 2013

Afaik, a few of the tests fail sporadically (quite often, though). This is one of them.

@benjchristensen
Member

Yes, CloudBees (the CI server) seems to have odd thread scheduling (very slow machines?). We have unit tests that rely on Thread.sleep timing instead of using latches and barriers to deterministically control when threads are released, waited upon, etc.
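
As an illustration of the latch-based style mentioned above, here is a generic sketch. It is not one of the RxJava tests, and `LatchExample` and `runWorker` are made-up names. A `CountDownLatch` lets the test thread wait until the worker has actually finished instead of sleeping for a guessed duration:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchExample {
    // Starts a worker thread and blocks until it signals completion.
    // Returns the worker's result, or -1 if it did not finish in time.
    static int runWorker() throws InterruptedException {
        AtomicInteger result = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            result.set(42);   // stands in for the work under test
            done.countDown(); // explicit "finished" signal
        });
        worker.start();

        // The upper bound only guards against a hung test;
        // correctness does not depend on how fast the machine is.
        if (!done.await(5, TimeUnit.SECONDS)) {
            return -1;
        }
        return result.get();
    }
}
```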

if (!timerHasExpired()) {
    subscription.get().unsubscribe();
}
subscription.set(scheduler.schedule(action, timeout, unit));
Member

This always schedules in the future with timeout. Shouldn't it be the time until the next timeout?

Let's say timeout is 1000ms and I get an onNext call every 50ms. This code seems to schedule each action to execute 1000ms in the future, even if it arrives 950ms after the last onNext was let through.
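
One possible reading of "time until next timeout" is the arithmetic below. This is a sketch only: `RemainingDelay.until` is a hypothetical helper, not something in this pull request, and it assumes all three arguments are in the same unit (milliseconds here).

```java
final class RemainingDelay {
    // Hypothetical helper: time left until the current window closes,
    // clamped so it is never negative.
    static long until(long lastEmittedAt, long timeout, long now) {
        return Math.max(0L, lastEmittedAt + timeout - now);
    }
}
```

With a timeout of 1000ms and the last value let through at 0ms, an event arriving at 950ms would be scheduled 50ms ahead (`RemainingDelay.until(0, 1000, 950)` is 50) rather than a full 1000ms.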

Contributor Author

Ah yes, I hadn't thought of it that way. Excellent catch! I'll try to fix this.

@benjchristensen
Member

In general I also question whether we even need a Scheduler for this (other than getting now()). There is extreme overhead in scheduling every single event especially when most will be immediately cancelled.

Instead, we just remember the last value and the time we last sent one on. Each time we pass lastSent+timeout, we emit the next event we receive, then skip events until timeout has passed again. onCompleted and onError are emitted immediately regardless of time.

That way we skip all the scheduling overhead.

@michaeldejong
Contributor Author

Yes, but what happens when events are few and far between? Without a Scheduler (or some sort of timer), wouldn't it take extremely long for events to propagate?

@benjchristensen
Member

I envision a scenario like this:

Timeout = 1000ms
onNext at 50ms (delivered immediately)
onNext at 750ms (skipped)
onNext at 3000ms (delivered immediately)
onNext at 3999ms (skipped)
onNext at 10000ms (delivered immediately)
onNext at 45600ms (delivered immediately)
onCompleted at 45601ms (delivered immediately)

The only time something is skipped is when it's within 1000ms of the previously sent value.
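
The first scenario above can be reproduced with a timestamp check and no scheduler at all. This is a sketch under assumptions: `FirstInWindowGate`, `tryPass` and `delivered` are hypothetical names, and timestamps are passed in explicitly instead of being read from a clock so the example stays deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical gate: lets an event through only if at least `timeoutMs`
// has passed since the last event that was let through.
final class FirstInWindowGate {
    private final long timeoutMs;
    private boolean sentAny = false;
    private long lastSentAt;

    FirstInWindowGate(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Returns true if an event arriving at `nowMs` should be delivered.
    synchronized boolean tryPass(long nowMs) {
        if (sentAny && nowMs - lastSentAt < timeoutMs) {
            return false; // within timeout of the previously sent value
        }
        sentAny = true;
        lastSentAt = nowMs;
        return true;
    }

    // Convenience for the example: arrival times that would be delivered.
    static List<Long> delivered(long timeoutMs, long... arrivals) {
        FirstInWindowGate gate = new FirstInWindowGate(timeoutMs);
        List<Long> out = new ArrayList<>();
        for (long t : arrivals) {
            if (gate.tryPass(t)) {
                out.add(t);
            }
        }
        return out;
    }
}
```

Calling `FirstInWindowGate.delivered(1000, 50, 750, 3000, 3999, 10000, 45600)` returns `[50, 3000, 10000, 45600]`, matching the delivered/skipped pattern listed above.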

So the question is ... are we trying to deliver the latest value within each time window, or just deliver a single value per time window?

If we choose latest then the above would now look like this:

Timeout = 1000ms
onNext at 50ms (skipped)
onNext at 750ms (delivered after 250ms delay)
onNext at 3000ms (skipped)
onNext at 3999ms (delivered after 1ms delay)
onNext at 10000ms (delivered after 1000ms delay)
onNext at 45600ms (delivered after 1000ms delay)
onCompleted at 45601ms (delivered immediately)

I don't know that this behavior is necessary for functionality (what we're doing is by definition a race condition anyways) and it means that each value waits for timeout to pass before delivery which I think is worse. Using a timer for every onNext also means we get all the non-determinism of the thread-scheduler (which on a busy system can be all over the map .... 10s or 100s of milliseconds of drift easily ... timers are not very accurate).

Even if "latest in window" is the required behavior I'd like to look at making it less costly and just changing the value to be delivered in each scheduled onNext via a compareAndSet flip on the "currently scheduled ThrottledOnNext" rather than cancelling and re-scheduling each time. We can also do it without the use of synchronized and instead just a CAS operation.
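
A rough sketch of the "swap the value instead of rescheduling" idea follows. Several things here are assumptions and not taken from this pull request: the `LatestInWindow` class is hypothetical, a window is assumed to open when its first value arrives, `null` values are not supported, and `AtomicReference.getAndSet` (an atomic swap) is used in place of an explicit `compareAndSet` loop.

```java
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical helper: one scheduled task per window, no cancel/reschedule.
final class LatestInWindow<T> {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    // Holds the value to deliver; null means no window is currently open.
    private final AtomicReference<T> latest = new AtomicReference<>();
    private final Consumer<T> downstream;
    private final long timeout;
    private final TimeUnit unit;

    LatestInWindow(Consumer<T> downstream, long timeout, TimeUnit unit) {
        this.downstream = downstream;
        this.timeout = timeout;
        this.unit = unit;
    }

    void onNext(T value) {
        Objects.requireNonNull(value, "null is reserved for 'no window open'");
        T previous = latest.getAndSet(value);
        if (previous == null) {
            // First value of a window: schedule exactly one delivery.
            timer.schedule(this::flush, timeout, unit);
        }
        // Otherwise a delivery is already scheduled and will see the new value.
    }

    private void flush() {
        T value = latest.getAndSet(null); // closes the window
        if (value != null) {
            downstream.accept(value);
        }
    }

    void shutdown() {
        timer.shutdown();
    }
}
```

Unlike the cancel-and-reschedule approach, a burst of events costs one scheduled task plus one atomic swap per event, and no `synchronized` block is needed.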

Before those implementation details though, is there strong documentation somewhere dictating whether throttle gives the first or last value of a time window?

@michaeldejong
Contributor Author

The links to MSDN in the issue state the following: "Ignores the values from an observable sequence which are followed by another value before due time with the specified source and dueTime.".

What I'm reading here is that any value which is followed by another value within a specified period of time will not be published. If the next value arrives only after this period of time, the value is published.

Although both schemes throttle events, each does it in a different way. I'm all for better performance, but I think this strays a bit too far from the original specification. I believe I'm having a hackathon with Erik Meijer next week, so I can try to pick his brain on this subject too. But I'd like to hear more thoughts on this.

@michaeldejong
Contributor Author

I'll hold off on making changes to this pull request until we have a better view on what we want here.

@benjchristensen
Member

Feedback from Erik would be great, particularly with the use cases in this discussion shared with him and the performance impact of the decision.

@michaeldejong
Contributor Author

So I've been talking with Erik about throttling. To make it easier to discuss this operator he suggested making some marble diagrams. I've come up with the following:

How Rx (.Net) throttles events
image

But I've also been looking for other throttling schemes:

Throttling alternative 1
image

Throttling alternative 2
image

To me alternative 1 makes the least sense. This publishes an old event in favor of a new event when an event is received. And this alternative suffers when the source Observable doesn't publish any events for a while. However I do believe there could be a valid use case for it.

Alternative 2 makes more sense to me. It's very similar to the sample operator. This publishes the received event as long as it has not published anything within the timeout period. However this operator will not propagate the last event (which might be important) if the timeout condition was not met.

I think it's easy to come up with different use cases for each of these three throttling operators (which make sense). However, that said, I don't think it would be wise to break consistency with the .Net implementation of Rx. At the very least I think we should offer the .Net version of throttle. Alternatively we could offer multiple implementations of throttle using an additional enum parameter. This somewhat breaks with the .Net implementation (different method signature), but allows users to pick their own throttling scheme...

source.throttle(Scheme.ACTIVELY_DELAYED, 100, TimeUnit.MILLISECONDS);
source.throttle(Scheme.PASSIVELY_DELAYED, 100, TimeUnit.MILLISECONDS);
source.throttle(Scheme.MOST_RECENT, 100, TimeUnit.MILLISECONDS);

I'd love to hear some thoughts on this!

@benjchristensen
Member

I like your thoughts on this. I need to spend more time on this than I have right now so will delay a little more.

We definitely need different implementations; the trick is the naming. throttle as named in Rx.Net is not at all what anyone I've asked who does server-side Java expects it to do, but we also want to match Rx.Net as much as possible. Your proposed solution is an interesting one.

@benjchristensen
Member

I'd appreciate some other input on the design and naming choices for this before proceeding. Anyone else able to get involved?

@codefromthecrypt

since this thread is a bit long, it would be easier for folks to jump in and help review if the current design and naming choices were summarized in the PR description.

@michaeldejong
Contributor Author

I've updated the PR description with the current state. We're basically trying to figure out if we need to support additional/different schemes of throttling Observable objects.

@benjchristensen
Member

We could use some help nailing down the naming convention for different throttle schemes.

@mairbek @jmhofer @mttkay would you mind weighing in please?

The naming convention being proposed is:

source.throttle(Scheme.ACTIVELY_DELAYED, 100, TimeUnit.MILLISECONDS);
source.throttle(Scheme.PASSIVELY_DELAYED, 100, TimeUnit.MILLISECONDS);
source.throttle(Scheme.MOST_RECENT, 100, TimeUnit.MILLISECONDS);

This addresses the point that 'throttle' is ambiguous and there are different approaches that can be applied with very different behavior.

These could be given names instead like throttleFirst, throttleLast, etc but I have yet to determine proper names to handle these.

I also think a debounce alias makes sense for the throttle solution that UIs use (the ACTIVELY_DELAYED scheme, I believe). If I understand it correctly, debounce has become the normal term for this in the JavaScript community.

@jmhofer
Contributor

jmhofer commented Jul 6, 2013

I find these operations and names difficult. That 2nd alternative doesn't look useful enough to me to include it at all. But maybe that's just me. Any already established names for these schemes from people already using them would be great to hear. Personally, I'd call the 1st alternative throttleWithDelay (or delayedThrottle) and throttle for Rx compatibility. The 2nd I'd probably call throttleToMostRecent. I don't know about JS, but I agree that it's a good idea to take their naming schemes into account, too.

@mttkay
Contributor

mttkay commented Jul 6, 2013

First, I'd just like to add that the .NET version as illustrated in the first marble diagram (if I understand the marble diagram correctly, I haven't actually worked with Rx.NET before) is extremely useful when dealing with user input events. It's a pattern I've used plenty of times before whenever an asynchronous operation needs to be triggered based on user input that changes rapidly. A good example is a suggest-as-you-type input field: every key stroke triggers an event scheduled for execution, but may become obsolete with the next keystroke. By cancelling out events that are superseded by subsequent keystrokes, and at the same time rescheduling a new one with the same delay t, as soon as there are no more keystrokes arriving in t time, the next event emitted will reflect the most recent state from the perspective of the user.

Which brings me to the naming. In a way, I see variant 0 (the .NET implementation) as carrying "most recent" semantics, which sort of conflicts with the naming of alternative 2. If I understand correctly, variant 2 works on an absolute time scale rather than a relative one, so perhaps that should be reflected in the name? I think throttle (or rather its possible variants) has some similarities to Java's Timer[0] class and the variants of its schedule method, since it involves similar notions of time and delays. In Timer, the schedule method schedules on a relative time scale: if the task is scheduled with a delay t, but is further delayed for some reason (e.g. garbage collection), the next will still be executed with the same delay t. With scheduleAtFixedRate, time is absolute, so if an execution is delayed further than it should be, the next scheduled execution (based on t) will account for that by bringing it as close as possible to when it should have happened.

Replacing the potential source of unexpected delays in TimerTasks with a source Observable that may emit events at random points in time, the problems bear a lot of similarities.
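
For reference, the two `java.util.Timer` calls being compared look like this. It is a small sketch with a hypothetical wrapper (`TimerModes.runs`) that only shows how each mode is selected; it does not try to measure the drift described above.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class TimerModes {
    // Schedules a repeating task in the chosen mode and waits until it has
    // run `count` times. Returns false if that did not happen within 5 seconds.
    static boolean runs(boolean fixedRate, long periodMs, int count)
            throws InterruptedException {
        Timer timer = new Timer(true); // daemon thread
        CountDownLatch done = new CountDownLatch(count);
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                done.countDown();
            }
        };
        if (fixedRate) {
            // Absolute: each run is timed relative to the first scheduled run.
            timer.scheduleAtFixedRate(task, 0, periodMs);
        } else {
            // Relative: each run is timed relative to the previous actual run.
            timer.schedule(task, 0, periodMs);
        }
        try {
            return done.await(5, TimeUnit.SECONDS);
        } finally {
            timer.cancel();
        }
    }
}
```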

I have to say I don't really understand alternative 1 or what it accomplishes. So I guess my vote would be:

source.throttle(Scheme.RELATIVE_DELAY, 100, TimeUnit.MILLISECONDS);
source.throttle(Scheme.?, 100, TimeUnit.MILLISECONDS); // how useful is this anyway?
source.throttle(Scheme.ABSOLUTE_DELAY, 100, TimeUnit.MILLISECONDS);

Hope I'm not adding further to the confusion since this seems to go in a different direction in terms of naming and might not align well with alternative 1 either. Take it for what it's worth :-)

[0] http://docs.oracle.com/javase/6/docs/api/java/util/Timer.html

@benjchristensen
Member

I think using enums to specify the scheme is awkward considering how different that is from the patterns everywhere else in the library, though I understand how it could be useful.

I prefer the throttleLatest, throttleRecent style naming convention.

I agree with @jmhofer and think alternative 1 is likely not worth pursuing at this time unless someone can give a real reason for its existence.

I suggest the following:

  • Rx.Net 'debounce' style behavior as throttleLast or throttleWithDelay and have an alias for debounce which as far as I can tell is the same thing from the JS community.
  • rate limiting (alternative 2) behavior as throttleFirst or throttleToMostRecent.

Since this is very subjective and arbitrary, can folks weigh in one last time with voting on the following options?

  1. Rx.Net Behavior

This allows the last value through for a defined time window, delaying the delivery until the window is completed to ensure it is the last and provide a regular rhythm of delivery. This uses a scheduler.

a) throttleLast
b) throttleWithDelay
c) throttleLastWithDelay

  2. Rate Limiting without Delay

This will allow the first value through for the defined time window without any delay. This does not require a scheduler.

a) throttleFirst
b) throttleWithoutDelay
c) throttleToMostRecent

@jmhofer
Contributor

jmhofer commented Jul 22, 2013

I'd vote for 1b and 2c.

@benjchristensen benjchristensen merged commit 2519ef8 into ReactiveX:master Sep 11, 2013