RxSwift Backpressure #1376

Closed
eliasbagley opened this issue Aug 10, 2017 · 14 comments

Comments

@eliasbagley

Hey -
I'm coming over from RxJava land, and the biggest notable difference is the lack of any backpressure mechanisms. I haven't been able to find any real discussions around this online, so I figured I'd start a dialogue around this here.

Is backpressure support a planned addition to RxSwift? If not, why?

In the meantime, what's the most idiomatic way to deal with a fast producer and a slow consumer? What exactly happens in this case? Do the emitted items get buffered internally (like in RxJava), or dropped?

@sergdort
Contributor

Hi, @eliasbagley.

To be honest, I've never needed this in RxSwift. It's hard for me to imagine where in iOS or macOS development you would need to handle this kind of problem.

But for the moment, I would probably use the throttle operator if I ran into it.

@sergdort
Contributor

I think @kzaher is best placed to answer whether we are going to support it :)

@freak4pc
Member

@eliasbagley Do you have a specific use case where you need that specific handling?

Backpressure is a general name for strategies that deal with a producer emitting faster than its consumer can handle. throttle, debounce, window and buffer are all part of that family of operators: http://reactivex.io/documentation/operators/backpressure.html

If you're referring to onBackpressureBuffer that seems specific to RxJava/Scala/Groovy and not a "ReactiveX" thing.

I'm still interested to learn what the specific situation is where you find yourself needing this. A code example would be awesome 💯
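
To make the "family of operators" point concrete, here is a minimal sketch of the lossy end of that family: a throttle-style filter that keeps one item per time window and drops the rest. It uses plain Java rather than any Rx library, and `ThrottleSketch`, `Timed` and `throttleFirst` are hypothetical names chosen for illustration, not the RxSwift or RxJava implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of RxJava or RxSwift.
final class ThrottleSketch {

    /** One emission: a value plus the time (in milliseconds) at which it was produced. */
    static final class Timed {
        final long millis;
        final String value;

        Timed(long millis, String value) {
            this.millis = millis;
            this.value = value;
        }
    }

    /**
     * Keeps an item only if at least windowMillis have passed since the last
     * kept item; everything in between is dropped. Input must be ordered by time.
     */
    static List<String> throttleFirst(List<Timed> source, long windowMillis) {
        List<String> kept = new ArrayList<>();
        boolean keptAny = false;
        long lastKept = 0;
        for (Timed item : source) {
            if (!keptAny || item.millis - lastKept >= windowMillis) {
                kept.add(item.value);
                lastKept = item.millis;
                keptAny = true;
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // A fast producer: one item every 5 ms, at 0, 5, ..., 45 ms.
        List<Timed> fastProducer = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            fastProducer.add(new Timed(i * 5L, "item" + i));
        }
        // With a 20 ms window only the items at 0, 20 and 40 ms survive.
        System.out.println(throttleFirst(fastProducer, 20)); // [item0, item4, item8]
    }
}
```

The trade-off is that throttling loses data by design. A buffering strategy keeps every item instead, at the cost of memory and latency.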

@eliasbagley
Author

I don't have a specific use case I need it for right now. I'm just curious and trying to wrap my head around the differences between RxSwift and RxJava.

@freak4pc
Member

@eliasbagley Most things are the same, but every implementation of ReactiveX ends up doing some things differently, and the maintainers can choose to diverge, as in RxJava's case.

I think for now we can close this issue, but if you have any specific case where that sort of method could be useful, we could consider the effort needed to add it.

Also feel free to join our Slack channel for more specific questions!

@sidmani

sidmani commented Sep 2, 2017

@freak4pc I have a situation where backpressure handling is necessary. I'm handling inserts/moves/deletes in a table view (AsyncDisplayKit, so I can't use RxCocoa bindings), and there is an animation sequence where I need to stop inserting/moving/deleting and wait until the animation finishes. After that, I want to process whatever inserts/moves/deletes were queued while the animation was happening. I think RxSwiftExt's pausableBuffered would solve the issue, but it'd be great if the main library had support for backpressure.

@freak4pc
Member

freak4pc commented Sep 2, 2017

Thanks for the example, @sidmani!
Can you share an RxJava or similar backpressure example solving what you just described?

@sidmani

sidmani commented Sep 2, 2017

@freak4pc I'm not too familiar with RxJava, but the docs seem to indicate that onBackpressureBuffer (RxJava 1.0) would solve this. RxJS's methods, which RxSwiftExt's pausable is based on (as far as I know), seem better suited though.

@freak4pc
Member

freak4pc commented Sep 2, 2017

@sidmani That doesn't sound like backpressure at all, though. You want to hold onto emissions for some period while something else happens (e.g. an animation, which could be a second observable). That definitely does sound like pausableBuffered, which is actually part of RxSwiftExt.
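
The hold-and-flush behaviour discussed here can be sketched without any Rx library. The class below is a rough illustration in plain Java; `PausableBuffer` and its methods are hypothetical names, and this is not how RxSwiftExt or RxJS implement pausableBuffered. It assumes an unbounded queue and a downstream callback that does not call back into the buffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical class for illustration; not the RxSwiftExt or RxJS implementation.
final class PausableBuffer<T> {
    private final Queue<T> pending = new ArrayDeque<>();
    private final Consumer<T> downstream;
    private boolean paused = false;

    PausableBuffer(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Delivers the item immediately, or queues it while paused. */
    synchronized void onNext(T item) {
        if (paused) {
            pending.add(item);
        } else {
            downstream.accept(item);
        }
    }

    /** Call when the animation starts. */
    synchronized void pause() {
        paused = true;
    }

    /** Call when the animation finishes: flushes queued items in arrival order. */
    synchronized void resume() {
        paused = false;
        T item;
        while ((item = pending.poll()) != null) {
            downstream.accept(item);
        }
    }

    public static void main(String[] args) {
        List<String> applied = new ArrayList<>();
        PausableBuffer<String> updates = new PausableBuffer<>(applied::add);

        updates.onNext("insert row 0");    // delivered immediately
        updates.pause();                   // animation starts
        updates.onNext("move row 0 -> 2"); // queued
        updates.onNext("delete row 1");    // queued
        System.out.println(applied);       // [insert row 0]
        updates.resume();                  // animation ends, queue is flushed
        System.out.println(applied);       // [insert row 0, move row 0 -> 2, delete row 1]
    }
}
```

Nothing is dropped in this sketch, so a long pause combined with a fast producer makes the queue grow without limit. A production version would likely need a capacity bound.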

@sidmani

sidmani commented Sep 2, 2017

Well, pausableBuffered is categorized under backpressure in RxJS, and technically this situation does deal with an observable that emits events faster than they're handled.

@freak4pc
Member

freak4pc commented Sep 2, 2017

I'll repeat this again :)
So are Throttle, Debounce, Window, Buffer and Sample.

pausableBuffered is only an RxJS operator. It's not used across ReactiveX projects.

@omatrot

omatrot commented Mar 6, 2019

Hello, I would like to present my use case. I'm dealing with a Bluetooth IoT device that sends a 20-byte array at 200 Hz. On Android, I have been able to solve the backpressure problem with an explicit ring buffer of 500 items (onBackpressureBuffer(500)), which holds almost 2.5 seconds of data.

This is the code in Java:

                ...
                .flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
                    @Override
                    public Observable<byte[]> call(Observable<byte[]> observable) {
                        return observable;
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        promise.resolve(null);
                        transactions.removeSubscription(transactionId);
                    }
                })
                .onBackpressureBuffer(500) // just before the call to subscribe
                .subscribe(new Observer<byte[]>() {

I'll need to do the same for iOS.
Could someone point me to some guidance on how to achieve the same behavior?

Thanks in advance.
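
For reference, the sizing arithmetic above (500 items at 200 Hz is 2.5 seconds of slack) and the idea of a fixed-capacity buffer between a fast producer and a slow consumer can be sketched in plain Java. `BoundedPacketBuffer` is a hypothetical class, and evicting the oldest packet on overflow is an assumption of this sketch; the RxJava operator may handle overflow differently, so check its documentation before relying on the same behaviour.

```java
import java.util.ArrayDeque;

// Hypothetical class for illustration; the drop-oldest policy is an assumption
// of this sketch, not a description of RxJava's onBackpressureBuffer.
final class BoundedPacketBuffer {
    private final ArrayDeque<byte[]> packets = new ArrayDeque<>();
    private final int capacity;
    private long dropped = 0;

    BoundedPacketBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Producer side: never blocks; evicts the oldest packet when the buffer is full. */
    synchronized void offer(byte[] packet) {
        if (packets.size() == capacity) {
            packets.pollFirst();
            dropped++;
        }
        packets.addLast(packet);
    }

    /** Consumer side: returns the oldest buffered packet, or null if the buffer is empty. */
    synchronized byte[] poll() {
        return packets.pollFirst();
    }

    synchronized int size() {
        return packets.size();
    }

    synchronized long droppedCount() {
        return dropped;
    }

    public static void main(String[] args) {
        int capacity = 500;
        int packetsPerSecond = 200;
        // 500 packets / 200 packets per second = 2.5 seconds of slack.
        System.out.println((double) capacity / packetsPerSecond + " s of headroom"); // 2.5 s of headroom

        BoundedPacketBuffer buffer = new BoundedPacketBuffer(capacity);
        // Simulate 3 seconds of 20-byte notifications while the consumer is stalled.
        for (int i = 0; i < 3 * packetsPerSecond; i++) {
            buffer.offer(new byte[20]);
        }
        // Prints: 500 buffered, 100 dropped
        System.out.println(buffer.size() + " buffered, " + buffer.droppedCount() + " dropped");
    }
}
```

The same shape carries over to other platforms: the Bluetooth callback calls `offer`, and a separate consumer drains the buffer with `poll` at its own pace.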

@shengyang998

shengyang998 commented May 24, 2019

@omatrot I don't quite understand your problem, but it seems you want to hold those 500 items for 2.5 seconds before sending them. If so, you may want to check out the buffer operator.

@mnimants

> @freak4pc I have a situation where backpressure handling is necessary. I'm handling inserts/moves/deletes in a table view (AsyncDisplayKit, so I can't use RxCocoa bindings), and there is an animation sequence where I need to stop inserting/moving/deleting and wait until the animation finishes. After that, I want to process whatever inserts/moves/deletes were queued while the animation was happening. I think RxSwiftExt's pausableBuffered would solve the issue, but it'd be great if the main library had support for backpressure.

Hey! Did you ever find a solution to this? I'm struggling with the same thing using RxASDataSources.
