Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operation: buffer #281

Merged
merged 8 commits into from
Jul 22, 2013
Merged

Operation: buffer #281

merged 8 commits into from
Jul 22, 2013

Conversation

michaeldejong
Copy link
Contributor

The buffer operation is an operation which allows you to buffer incoming events into one or more buffers. Depending on their purpose, these buffers can be emitted to an Observer when needed. In essence this means that events are collected and propagated to the Observer in batches. The original issue, which this pull request addresses, mentions a total of 10 variations on this operator. This pull request contains code which supports all of them.

buffer

The 10 variations which exist in Rx .Net, can be subdivided into two categories: single and multi buffers. With single buffers, only one buffer at any time is actively collecting incoming events, whereas with multi buffers, multiple buffers are actively collecting incoming events. In the first case the Observer will receive every original event only once in one buffer, whereas in the latter case, the Observer will receive every original event zero or more times.

Single buffers

buffer(bufferClosingSelector)

The bufferClosingSelector parameter is a Func0<Observable<BufferClosing>>. It uses to Func0 object to construct an Observable which produces a BufferClosing object. Once this object has been produced by the Observable the currently active buffer will be closed and emitted to the Observer. At the same time a new buffer will be created which will start recording incoming events.

buffer(count)

This operator closes and emits the current buffer after counting a certain amount of received events. At the same time it will create a new buffer which will start recording incoming events. One example would be buffer(2), which with the following input: [0, 1, 2, 3, 4, 5] will output the following buffers: [0, 1], [2, 3], [4, 5].

buffer(timespan)

This operator closes and emits the current buffer after a certain amount of time has elapsed. At the same time it will create a new buffer which will start recording incoming events.

buffer(timespan, scheduler)

Same as previous operator, but now with a custom scheduler.

buffer(timespan, count)

This operator closes and emits the current buffer after counting a certain amount of received events or after a certain amount of time has elapsed. At the same time it will create a new buffer which will start recording incoming events.

buffer(timespan, count, scheduler)

Same as previous operator, but now with a custom scheduler.

Multiple buffers

buffer(count, skip)

This operator will create a new buffer after it has received 'skip' amount of events. Each buffer will be closed once it has reached a capacity of '[count'. One example would be buffer(3, 1), which with the following input: 0, 1, 2, 3, 4, 5], will output the following buffers: [0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5].

buffer(bufferOpenings, bufferClosingSelector)

This operator takes two parameters: an Observable<BufferOpening> and a Func1<BufferOpening, Observable<BufferClosing>>. The first parameter determines when buffers are created. The second parameter determines when the buffers are closed. Every time a BufferOpening object is received from the Observable a new buffer is created. The received BufferOpening object is fed into the second parameter which yields an Observable<BufferClosing> object. When this Observable produces a BufferClosing object, the associated buffer is closed and emitted.

buffer(timespan, timeshift)

This operator is very similar to buffer(count, skip), but in stead of counting events, it's based on time. The timeshift period defines how often a new buffer will be created. The timespan period defines the period between buffer construction and buffer emission.

buffer(timespan, timeshift, scheduler)

Same as previous operator, but now with a custom scheduler.

As always, feedback is welcome!

@cloudbees-pull-request-builder

RxJava-pull-requests #153 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

Thank you @michaeldejong I will review and either merge or provide feedback.

@benjchristensen
Copy link
Member

This is a great piece of code @michaeldejong ... I don't fully understand all the nuances yet but I'm going to start posting some questions inline on the code. The code looks good and is very well documented, and the unit tests are awesome, very key to me understanding the code.

creator.stop();
buffers.emitAllBuffers();
observer.onError(e);
e.printStackTrace();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it was accidentally left behind after doing some debugging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right. I'll remove this...

@cloudbees-pull-request-builder

RxJava-pull-requests #166 FAILURE
Looks like there's a problem with this pull request

@benjchristensen
Copy link
Member

@michaeldejong Sorry for not responding to this yet ... I will but have become very busy on some other things for a bit.

@benjchristensen
Copy link
Member

Anyone else interested and able to provide a code review?

@codefromthecrypt
Copy link

@michaeldejong at nearly 1500 concise lines of code and tests, this PR could be daunting for someone to start reviewing. I'd estimate review at somewhere around an hour minimum.

Why don't you draw in folks in by adding usage examples into your PR description as a first step. Ask someone who uses javarx if they don't mind taking a pass at review? I can make time next week sometime, too.

@cloudbees-pull-request-builder

RxJava-pull-requests #169 FAILURE
Looks like there's a problem with this pull request

@michaeldejong
Copy link
Contributor Author

I've updated the PR description with an explanation of what each operator variation does, and how it does it. I hope this helps in understanding the concepts behind this operator.

@codefromthecrypt
Copy link

FYI: might want to bump this commit as I think the above failure might be transient

@b
Copy link

b commented Jul 10, 2013

@michaeldejong Can you respond to Ben's question about the test returning 3 empty events?

@cloudbees-pull-request-builder

RxJava-pull-requests #179 FAILURE
Looks like there's a problem with this pull request

@cloudbees-pull-request-builder

RxJava-pull-requests #180 SUCCESS
This pull request looks good

benjchristensen added a commit that referenced this pull request Jul 22, 2013
@benjchristensen benjchristensen merged commit cdf8dcd into ReactiveX:master Jul 22, 2013
@@ -635,6 +638,222 @@ public Subscription call(Observer<T> observer) {
}

/**
<<<<<<< HEAD

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a leftover from a failed merge.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed here: f651b7a

rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants