Add Stream::throttle #342

yoshuawuyts opened this issue Oct 15, 2019 · 4 comments · Fixed by #356



@yoshuawuyts yoshuawuyts commented Oct 15, 2019

We should add a Stream::throttle method, marked as "unstable", similar to RxJS's throttle function.

The goal of this function is to limit the number of items a stream yields per timeslice. This is usually considered a somewhat blunt tool, but it does have its uses and we should give people access to it.


use async_std::{prelude::*, stream};
use std::time::Duration;

// emit a value every 1 second
let s = stream::interval(Duration::from_secs(1)).enumerate();

// throttle for 2 seconds, emit latest value
// in this case it means we skip two iterations, and let through the third
let s = s.throttle(Duration::from_secs(2));

// `enumerate` yields `(index, item)`, so the counter is the first element
s.for_each(|(n, _)| { dbg!(n); }).await;
// => 0 .. 3 .. 6 .. 9
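
To make the proposed dropping behaviour concrete, here is a minimal std-only sketch that models it over pre-recorded arrival times instead of a real async stream. The function name `throttle_lossy` and the `(arrival_time_secs, value)` representation are hypothetical and not part of async-std. It also assumes that an item arriving exactly when the window ends is still dropped, which is what the "skip two, let through the third" description above implies.

```rust
/// Lossy throttle over pre-recorded `(arrival_time_secs, value)` pairs.
/// Hypothetical helper for illustration only; not an async-std API.
fn throttle_lossy<T: Clone>(items: &[(u64, T)], window_secs: u64) -> Vec<T> {
    let mut out = Vec::new();
    // Arrival time of the most recent item that was let through, if any.
    let mut last_emit: Option<u64> = None;

    for (at, value) in items {
        let pass = match last_emit {
            // Nothing emitted yet: the first item always passes.
            None => true,
            // Assumption: an arrival exactly on the window edge is still dropped.
            Some(t) => *at > t + window_secs,
        };
        if pass {
            out.push(value.clone());
            last_emit = Some(*at);
        }
    }
    out
}
```

Feeding it values 0 through 9 arriving one second apart with a 2-second window keeps 0, 3, 6 and 9, matching the output in the example above.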




@Wassasin Wassasin commented Oct 16, 2019

I've made a first version that still needs some testing; I'll finish it later. We might also consider implementing debounce.



@binarybana binarybana commented Oct 17, 2019

In my mind the operation described is more of a "subsampling" operation, whereas throttle applies backpressure but would otherwise be non-lossy. But I'm just bikeshedding.
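
For contrast with subsampling, a non-lossy throttle could be modelled roughly as below. This is only a sketch under stated assumptions: `throttle_backpressure` is a hypothetical name, and arrival times stand in for "the earliest moment the source could produce the item". With real backpressure the source would simply not be polled until the throttle is ready.

```rust
/// Non-lossy throttle: every item is kept, but each emission is delayed
/// until at least `window_secs` after the previous one.
/// Hypothetical helper for illustration only.
fn throttle_backpressure<T: Clone>(items: &[(u64, T)], window_secs: u64) -> Vec<(u64, T)> {
    let mut out = Vec::with_capacity(items.len());
    // Earliest time at which the next item may be emitted.
    let mut next_allowed = 0u64;

    for (at, value) in items {
        // Emit as soon as the item is available AND the window has elapsed.
        let emit_at = (*at).max(next_allowed);
        out.push((emit_at, value.clone()));
        next_allowed = emit_at + window_secs;
    }
    out
}
```

Four items that become available at seconds 0, 1, 2 and 3 would all be emitted under a 2-second window, at seconds 0, 2, 4 and 6. Nothing is dropped; the consumer just sees the items later.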


Member Author

@yoshuawuyts yoshuawuyts commented Oct 17, 2019

@Wassasin hehe, yeah I wanted to add debounce to the next batch of issues. But this is definitely something we would want, so feel free to go for it!

@binarybana hmm, you might be right. I do think that dropping events is the right approach for any throttle function, but it may make sense for the event source to determine how it sheds load (e.g. LIFO vs FIFO queueing, bounded vs unbounded, max queue size, etc.). In that case your assessment is accurate and we should make throttle apply backpressure only.
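
As a rough illustration of what "the event source determines how it sheds load" could look like, here is a std-only sketch of a bounded queue with a configurable shedding policy. All names (`Shed`, `BoundedQueue`) are hypothetical, and the sketch only covers which item is discarded when the queue is full. It always pops in FIFO order and does not model LIFO consumption.

```rust
use std::collections::VecDeque;

/// Which item to discard when the queue is full (hypothetical policy enum).
#[derive(Clone, Copy)]
enum Shed {
    DropOldest,
    DropNewest,
}

/// Bounded FIFO queue that sheds load according to `policy`.
struct BoundedQueue<T> {
    buf: VecDeque<T>,
    cap: usize,
    policy: Shed,
}

impl<T> BoundedQueue<T> {
    fn new(cap: usize, policy: Shed) -> Self {
        assert!(cap > 0, "capacity must be non-zero");
        Self { buf: VecDeque::with_capacity(cap), cap, policy }
    }

    /// Enqueue `item`; returns the item that was shed, if any.
    fn push(&mut self, item: T) -> Option<T> {
        if self.buf.len() < self.cap {
            self.buf.push_back(item);
            return None;
        }
        match self.policy {
            // Queue is full: reject the incoming item.
            Shed::DropNewest => Some(item),
            // Queue is full: evict the oldest item to make room.
            Shed::DropOldest => {
                let old = self.buf.pop_front();
                self.buf.push_back(item);
                old
            }
        }
    }

    /// Dequeue the oldest remaining item.
    fn pop(&mut self) -> Option<T> {
        self.buf.pop_front()
    }
}
```

With a source-side queue like this, a downstream throttle would only need to apply backpressure, and the choice of which events to lose stays with the producer.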



@mikecaines mikecaines commented Oct 18, 2019

I implemented the stream-throttle crate, which uses a backpressure approach.

It uses an intermediate ThrottlePool type, so that you can throttle multiple streams together. Similar (but not identical) functionality can be achieved with Stream::select().

Perhaps some part of it could form the base for a solution. Just a thought!
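
To illustrate the general idea of throttling several streams through one shared pool, here is a small std-only sketch. It is not the stream-throttle crate's API: the `Pool` type and its `reserve` method are hypothetical, and time is modelled as plain seconds rather than real timers.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical shared pool: clones share one "next allowed" timestamp,
/// so emissions from all holders are spaced by `window_secs` overall.
#[derive(Clone)]
struct Pool {
    next_allowed: Rc<Cell<u64>>,
    window_secs: u64,
}

impl Pool {
    fn new(window_secs: u64) -> Self {
        Self { next_allowed: Rc::new(Cell::new(0)), window_secs }
    }

    /// Reserve a slot for an item that became ready at `ready_at`.
    /// Returns the time at which the caller may emit it.
    fn reserve(&self, ready_at: u64) -> u64 {
        let emit_at = ready_at.max(self.next_allowed.get());
        // Push the shared deadline forward for every holder of the pool.
        self.next_allowed.set(emit_at + self.window_secs);
        emit_at
    }
}
```

Two streams holding clones of the same pool with a 2-second window would interleave their emissions: if both have an item ready at second 0, one emits at second 0 and the other at second 2.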
