Skip to content

Commit

Permalink
feat(publishReplay): add selector function to publishReplay
Browse files Browse the repository at this point in the history
2844
  • Loading branch information
martinsik committed Oct 1, 2017
1 parent e8d8c08 commit 6d1742e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 2 deletions.
11 changes: 11 additions & 0 deletions spec/operators/publishReplay-spec.ts
Expand Up @@ -409,4 +409,15 @@ describe('Observable.prototype.publishReplay', () => {

published.connect();
});

it('should mirror a simple source Observable with selector', () => {
const selector = observable => observable.map(v => String.fromCharCode(96 + parseInt(v)));
const source = cold('--1-2---3-4---|');
const sourceSubs = '^ !';
const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector);
const expected = '--a-b---c-d---|';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});
});
4 changes: 3 additions & 1 deletion src/operator/publishReplay.ts
Expand Up @@ -6,13 +6,15 @@ import { publishReplay as higherOrder } from '../operators';
/**
* @param bufferSize
* @param windowTime
* @param selector
* @param scheduler
* @return {ConnectableObservable<T>}
* @method publishReplay
* @owner Observable
*/
export function publishReplay<T>(this: Observable<T>, bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.POSITIVE_INFINITY,
selector?: (source: Observable<T>) => Observable<T>,
scheduler?: IScheduler): ConnectableObservable<T> {
return higherOrder(bufferSize, windowTime, scheduler)(this);
return higherOrder(bufferSize, windowTime, selector, scheduler)(this);
}
4 changes: 3 additions & 1 deletion src/operators/publishReplay.ts
Expand Up @@ -7,6 +7,8 @@ import { UnaryFunction } from '../interfaces';

export function publishReplay<T>(bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.POSITIVE_INFINITY,
selector: (source: Observable<T>) => Observable<T>,
scheduler?: IScheduler): UnaryFunction<Observable<T>, ConnectableObservable<T>> {
return (source: Observable<T>) => multicast(new ReplaySubject<T>(bufferSize, windowTime, scheduler))(source) as ConnectableObservable<T>;
const subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
return (source: Observable<T>) => multicast(() => subject, selector)(source) as ConnectableObservable<T>;
}

0 comments on commit 6d1742e

Please sign in to comment.