Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator for buffer until stream #590

Closed
joeljeske opened this issue Mar 31, 2016 · 12 comments
Closed

Operator for buffer until stream #590

joeljeske opened this issue Mar 31, 2016 · 12 comments

Comments

@joeljeske
Copy link

Is there a buffer-like operator that buffers until another stream emits an item? I know other implementations support a buffer-until-stream operator but was unable to find such an operator in RxSwift. If there is not an operator like this, can it be added?

@kzaher
Copy link
Member

kzaher commented Apr 2, 2016

Hi @joeljeske ,

I think that we'll need to figure out how to upscale those operators. I don't think it would be a good idea to increase number of operators in base implementation because Swift generates a lot bigger frameworks then other implementations do.

Can you explain me some context why do you need it so I can get a feeling how common is that use case?

@joeljeske
Copy link
Author

Sure that makes sense.

I am wanting to buffer a stream of items until a user clicks a button. Essentially, it is not a time-based buffer or a item-count-based buffer but an event-based-buffer. The operator would emit a collection of the items containing 0 or more previously emitted items.

I am not partial to having a separate buffer implementation or an overloaded buffer implementation as long as I could ignore the time based boundary and the count based boundary.

Here are the docs for the RxJava buffer(rx.Observable) that I am referring to.

@kzaher
Copy link
Member

kzaher commented Apr 2, 2016

Hi @joeljeske,

I usually try to fit all of these problems to same model:

  • There is some state -> scan operator
  • There is some set of commands that control that state -> merge operator

It's usually handy to define something like a convenience scanAndMaybeEmit operator:

// if emit != nil, then emits an item
public func scanAndMaybeEmit<State, Emit>(state: State, accumulator: (State, E) throws -> (State, Emit?)) -> Observable<Emit> {
        return self.scan((state, nil)) { stateEmitPair, element in
                return try accumulator(stateEmitPair.0, element)
            }
            .flatMap { stateEmitPair in
                return stateEmitPair.1.map(Observable.just) ?? Observable.empty()
            }
    }

Then you have two commands that you merge that controls that scanAndMaybeEmit operator.

enum Commands {
     case .NewElementAdded(element) // map from what ever stream you are using
     case .FlushThem // map from button click
}

That should give you some ideas :)

I always mentally go to this "redux" like model because it's applicable in enormous number of cases, and there is absolutely no way you will run into performance issues if you do this on UI events, etc :)

@kzaher
Copy link
Member

kzaher commented Apr 5, 2016

We are probably going to start a new project in RxSwiftCommunity with more additional operators, so we might push this there.

@kzaher
Copy link
Member

kzaher commented Apr 26, 2016

Yeah, I think we currently don't plan to implement that operator in the core library, but maybe we'll be able to include it with time in some other library inside RxSwiftCommunity.

We can probably close this for now.

@kzaher kzaher closed this as completed Apr 26, 2016
@neven-s
Copy link

neven-s commented May 11, 2016

you could use this implementation for your needs
https://gist.github.com/neven-s/f865bd3c1fb0212862f4c079511eb6a1

@colinmorelli
Copy link

colinmorelli commented Aug 28, 2016

I have a use case for a similar operator and was hoping for a pointer on a solution @kzaher . I have posted a more detailed question here: http://stackoverflow.com/questions/39184148/buffer-observable-until-another-observable-completes

The gist of it is basically: I have an Observable<Event>. Upon receiving a certain type of event, I want to buffer the original Observable<Event> while another Observable<SynchronizationStatus> runs. When that sync completes, I want to flush the buffer of Observable<Event> through, and let it continue emitting items as they come in. Hope that makes sense.

Edit:
After looking through other Rx implementations, I believe what I'm looking for is closest to buffer(Observable<? extends TOpening> bufferOpenings, Func1<? super TOpening, ? extends Observable<? extends TClosing>> bufferClosingSelector) in Java. The bufferOpenings observable is based on whenever a certain event is received. The bufferClosed observable is the synchronization observable. I'm trying to figure out how to get this behavior in Swift.

@mikumi
Copy link

mikumi commented Oct 8, 2016

Hi guys, I am also looking for a buffer with a flexible closing condition and posted a simple implementation on stackoverflow (a little different from what @colinmorelli was asking):
http://stackoverflow.com/questions/39928928/implementing-a-debounced-buffer-with-rxswift-is-this-correct

If anybody is interested, I would love to hear some feedback, especially if there are any leaks in my implementation or if I am violating any Rx contracts. Thanks :)

@diegoperini
Copy link

@mikumi 's answer on SO seams to solve the case but I am still curious about the potential leaks it may cause on complex chains.

There are also other buffer() overloads in RxJava which allows boundary selectors, opening selectors, closing selectors. Currently one has to port the necessary implementation by herself if required. I'm struggling with understanding buffer(openingSelector, closingSelectorProvider) implementation in RxJava in order to port it for my use case.

@diegoperini
Copy link

Here is the one I mentioned earlier. Requires review: http://stackoverflow.com/a/43410595/1853938

@danielt1263
Copy link
Collaborator

Here's one based on RxJS's buffer operator. https://gist.github.com/dtartaglia/15416d95313255e12f009e3b1a9f79b8

@jondwillis
Copy link
Contributor

jondwillis commented Sep 4, 2017

From RxSwiftCommunity/RxSwiftExt:

https://github.com/RxSwiftCommunity/RxSwiftExt/blob/master/Source/RxSwift/pausableBuffered.swift

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

8 participants