diff --git a/src/operator/buffer.ts b/src/operator/buffer.ts index ca43bec9f2..42e6da93e5 100644 --- a/src/operator/buffer.ts +++ b/src/operator/buffer.ts @@ -1,10 +1,6 @@ -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; -import { Observable } from '../Observable'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { Observable } from '../Observable'; +import { buffer as higherOrder } from '../operators'; /** * Buffers the source Observable values until `closingNotifier` emits. @@ -39,41 +35,5 @@ import { subscribeToResult } from '../util/subscribeToResult'; * @owner Observable */ export function buffer(this: Observable, closingNotifier: Observable): Observable { - return this.lift(new BufferOperator(closingNotifier)); -} - -class BufferOperator implements Operator { - - constructor(private closingNotifier: Observable) { - } - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new BufferSubscriber(subscriber, this.closingNotifier)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class BufferSubscriber extends OuterSubscriber { - private buffer: T[] = []; - - constructor(destination: Subscriber, closingNotifier: Observable) { - super(destination); - this.add(subscribeToResult(this, closingNotifier)); - } - - protected _next(value: T) { - this.buffer.push(value); - } - - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - const buffer = this.buffer; - this.buffer = []; - this.destination.next(buffer); - } + return higherOrder(closingNotifier)(this); } diff --git a/src/operators/buffer.ts b/src/operators/buffer.ts new file mode 100644 index 0000000000..255894d390 --- /dev/null +++ b/src/operators/buffer.ts @@ -0,0 +1,81 @@ +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { Observable } from '../Observable'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; +import { OperatorFunction } from '../interfaces'; + +/** + * Buffers the source Observable values until `closingNotifier` emits. + * + * Collects values from the past as an array, and emits + * that array only when another Observable emits. + * + * + * + * Buffers the incoming Observable values until the given `closingNotifier` + * Observable emits a value, at which point it emits the buffer on the output + * Observable and starts a new buffer internally, awaiting the next time + * `closingNotifier` emits. + * + * @example On every click, emit array of most recent interval events + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var interval = Rx.Observable.interval(1000); + * var buffered = interval.buffer(clicks); + * buffered.subscribe(x => console.log(x)); + * + * @see {@link bufferCount} + * @see {@link bufferTime} + * @see {@link bufferToggle} + * @see {@link bufferWhen} + * @see {@link window} + * + * @param {Observable} closingNotifier An Observable that signals the + * buffer to be emitted on the output Observable. + * @return {Observable} An Observable of buffers, which are arrays of + * values. + * @method buffer + * @owner Observable + */ +export function buffer(closingNotifier: Observable): OperatorFunction { + return function bufferOperatorFunction(source: Observable) { + return source.lift(new BufferOperator(closingNotifier)); + }; +} + +class BufferOperator implements Operator { + + constructor(private closingNotifier: Observable) { + } + + call(subscriber: Subscriber, source: any): any { + return source.subscribe(new BufferSubscriber(subscriber, this.closingNotifier)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class BufferSubscriber extends OuterSubscriber { + private buffer: T[] = []; + + constructor(destination: Subscriber, closingNotifier: Observable) { + super(destination); + this.add(subscribeToResult(this, closingNotifier)); + } + + protected _next(value: T) { + this.buffer.push(value); + } + + notifyNext(outerValue: T, innerValue: any, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + const buffer = this.buffer; + this.buffer = []; + this.destination.next(buffer); + } +} diff --git a/src/operators/index.ts b/src/operators/index.ts index d243518afd..62e79b7b26 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -1,5 +1,6 @@ export { audit } from './audit'; export { auditTime } from './auditTime'; +export { buffer } from './buffer'; export { catchError } from './catchError'; export { concat } from './concat'; export { concatAll } from './concatAll';