Skip to content

Commit

Permalink
feat(delayWhen): add index to the selector function
Browse files Browse the repository at this point in the history
  • Loading branch information
martinsik committed Feb 23, 2017
1 parent 023d436 commit 5d6291e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 5 deletions.
19 changes: 19 additions & 0 deletions spec/operators/delayWhen-spec.ts
@@ -1,3 +1,4 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports

Expand Down Expand Up @@ -215,4 +216,22 @@ describe('Observable.prototype.delayWhen', () => {
expectSubscriptions(selector.subscriptions).toBe([]);
expectSubscriptions(subDelay.subscriptions).toBe(subDelaySub);
});

it('should call predicate with indices starting at 0', () => {
const e1 = hot('--a--b--c--|');
const expected = '--a--b--c--|';
const selector = cold('(x|)');

let indices = [];
const predicate = (value, index) => {
indices.push(index);
return selector;
};

const result = e1.delayWhen(predicate);

expectObservable(result.do(null, null, () => {
expect(indices).to.deep.equal([0, 1, 2]);
})).toBe(expected);
});
});
12 changes: 7 additions & 5 deletions src/operator/delayWhen.ts
Expand Up @@ -40,7 +40,7 @@ import { subscribeToResult } from '../util/subscribeToResult';
* @see {@link debounce}
* @see {@link delay}
*
* @param {function(value: T): Observable} delayDurationSelector A function that
* @param {function(value: T, index: number): Observable} delayDurationSelector A function that
* returns an Observable for each value emitted by the source Observable, which
* is then used to delay the emission of that item on the output Observable
* until the Observable returned from this function emits a value.
Expand All @@ -52,7 +52,7 @@ import { subscribeToResult } from '../util/subscribeToResult';
* @method delayWhen
* @owner Observable
*/
export function delayWhen<T>(this: Observable<T>, delayDurationSelector: (value: T) => Observable<any>,
export function delayWhen<T>(this: Observable<T>, delayDurationSelector: (value: T, index: number) => Observable<any>,
subscriptionDelay?: Observable<any>): Observable<T> {
if (subscriptionDelay) {
return new SubscriptionDelayObservable(this, subscriptionDelay)
Expand All @@ -62,7 +62,7 @@ export function delayWhen<T>(this: Observable<T>, delayDurationSelector: (value:
}

class DelayWhenOperator<T> implements Operator<T, T> {
constructor(private delayDurationSelector: (value: T) => Observable<any>) {
constructor(private delayDurationSelector: (value: T, index: number) => Observable<any>) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
Expand All @@ -79,9 +79,10 @@ class DelayWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
private completed: boolean = false;
private delayNotifierSubscriptions: Array<Subscription> = [];
private values: Array<T> = [];
private index: number = 0;

constructor(destination: Subscriber<T>,
private delayDurationSelector: (value: T) => Observable<any>) {
private delayDurationSelector: (value: T, index: number) => Observable<any>) {
super(destination);
}

Expand All @@ -106,8 +107,9 @@ class DelayWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
}

protected _next(value: T): void {
const index = this.index++;
try {
const delayNotifier = this.delayDurationSelector(value);
const delayNotifier = this.delayDurationSelector(value, index);
if (delayNotifier) {
this.tryDelay(delayNotifier, value);
}
Expand Down

0 comments on commit 5d6291e

Please sign in to comment.