|
| 1 | +import { Operator } from '../Operator'; |
| 2 | +import { Observable } from '../Observable'; |
| 3 | +import { Subscriber } from '../Subscriber'; |
| 4 | +import { OperatorFunction } from '../interfaces'; |
| 5 | + |
| 6 | +/* tslint:disable:max-line-length */ |
| 7 | +export function scan<T>(accumulator: (acc: T, value: T, index: number) => T, seed?: T): OperatorFunction<T, T>; |
| 8 | +export function scan<T>(accumulator: (acc: T[], value: T, index: number) => T[], seed?: T[]): OperatorFunction<T, T[]>; |
| 9 | +export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed?: R): OperatorFunction<T, R>; |
| 10 | +/* tslint:enable:max-line-length */ |
| 11 | + |
| 12 | +/** |
| 13 | + * Applies an accumulator function over the source Observable, and returns each |
| 14 | + * intermediate result, with an optional seed value. |
| 15 | + * |
| 16 | + * <span class="informal">It's like {@link reduce}, but emits the current |
| 17 | + * accumulation whenever the source emits a value.</span> |
| 18 | + * |
| 19 | + * <img src="./img/scan.png" width="100%"> |
| 20 | + * |
| 21 | + * Combines together all values emitted on the source, using an accumulator |
| 22 | + * function that knows how to join a new source value into the accumulation from |
| 23 | + * the past. Is similar to {@link reduce}, but emits the intermediate |
| 24 | + * accumulations. |
| 25 | + * |
| 26 | + * Returns an Observable that applies a specified `accumulator` function to each |
| 27 | + * item emitted by the source Observable. If a `seed` value is specified, then |
| 28 | + * that value will be used as the initial value for the accumulator. If no seed |
| 29 | + * value is specified, the first item of the source is used as the seed. |
| 30 | + * |
| 31 | + * @example <caption>Count the number of click events</caption> |
| 32 | + * var clicks = Rx.Observable.fromEvent(document, 'click'); |
| 33 | + * var ones = clicks.mapTo(1); |
| 34 | + * var seed = 0; |
| 35 | + * var count = ones.scan((acc, one) => acc + one, seed); |
| 36 | + * count.subscribe(x => console.log(x)); |
| 37 | + * |
| 38 | + * @see {@link expand} |
| 39 | + * @see {@link mergeScan} |
| 40 | + * @see {@link reduce} |
| 41 | + * |
| 42 | + * @param {function(acc: R, value: T, index: number): R} accumulator |
| 43 | + * The accumulator function called on each source value. |
| 44 | + * @param {T|R} [seed] The initial accumulation value. |
| 45 | + * @return {Observable<R>} An observable of the accumulated values. |
| 46 | + * @method scan |
| 47 | + * @owner Observable |
| 48 | + */ |
| 49 | +export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed?: T | R): OperatorFunction<T, R> { |
| 50 | + let hasSeed = false; |
| 51 | + // providing a seed of `undefined` *should* be valid and trigger |
| 52 | + // hasSeed! so don't use `seed !== undefined` checks! |
| 53 | + // For this reason, we have to check it here at the original call site |
| 54 | + // otherwise inside Operator/Subscriber we won't know if `undefined` |
| 55 | + // means they didn't provide anything or if they literally provided `undefined` |
| 56 | + if (arguments.length >= 2) { |
| 57 | + hasSeed = true; |
| 58 | + } |
| 59 | + |
| 60 | + return function scanOperatorFunction(source: Observable<T>): Observable<R> { |
| 61 | + return source.lift(new ScanOperator(accumulator, seed, hasSeed)); |
| 62 | + }; |
| 63 | +} |
| 64 | + |
| 65 | +class ScanOperator<T, R> implements Operator<T, R> { |
| 66 | + constructor(private accumulator: (acc: R, value: T, index: number) => R, private seed?: T | R, private hasSeed: boolean = false) {} |
| 67 | + |
| 68 | + call(subscriber: Subscriber<R>, source: any): any { |
| 69 | + return source.subscribe(new ScanSubscriber(subscriber, this.accumulator, this.seed, this.hasSeed)); |
| 70 | + } |
| 71 | +} |
| 72 | + |
| 73 | +/** |
| 74 | + * We need this JSDoc comment for affecting ESDoc. |
| 75 | + * @ignore |
| 76 | + * @extends {Ignored} |
| 77 | + */ |
| 78 | +class ScanSubscriber<T, R> extends Subscriber<T> { |
| 79 | + private index: number = 0; |
| 80 | + |
| 81 | + get seed(): T | R { |
| 82 | + return this._seed; |
| 83 | + } |
| 84 | + |
| 85 | + set seed(value: T | R) { |
| 86 | + this.hasSeed = true; |
| 87 | + this._seed = value; |
| 88 | + } |
| 89 | + |
| 90 | + constructor(destination: Subscriber<R>, private accumulator: (acc: R, value: T, index: number) => R, private _seed: T | R, |
| 91 | + private hasSeed: boolean) { |
| 92 | + super(destination); |
| 93 | + } |
| 94 | + |
| 95 | + protected _next(value: T): void { |
| 96 | + if (!this.hasSeed) { |
| 97 | + this.seed = value; |
| 98 | + this.destination.next(value); |
| 99 | + } else { |
| 100 | + return this._tryNext(value); |
| 101 | + } |
| 102 | + } |
| 103 | + |
| 104 | + private _tryNext(value: T): void { |
| 105 | + const index = this.index++; |
| 106 | + let result: any; |
| 107 | + try { |
| 108 | + result = this.accumulator(<R>this.seed, value, index); |
| 109 | + } catch (err) { |
| 110 | + this.destination.error(err); |
| 111 | + } |
| 112 | + this.seed = result; |
| 113 | + this.destination.next(result); |
| 114 | + } |
| 115 | +} |
0 commit comments