Skip to content

Commit

Permalink
feat(multicast): subjectfactory allows selectors
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj authored and benlesh committed May 2, 2016
1 parent cf45540 commit 32fa3a4
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 4 deletions.
18 changes: 18 additions & 0 deletions spec/operators/multicast-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,24 @@ describe('Observable.prototype.multicast', () => {
connectable.connect();
});

it('should accept selectors to factory functions', () => {
const source = hot('-1-2-3----4-|');
const sourceSubs = ['^ !'];
const multicasted = source.multicast(() => new Subject(),
x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString()));
const subscriber1 = hot('a| ').mergeMapTo(multicasted);
const expected1 = '-2-4-6----8-|';
const subscriber2 = hot(' b| ').mergeMapTo(multicasted);
const expected2 = ' -6----8-|';
const subscriber3 = hot(' c| ').mergeMapTo(multicasted);
const expected3 = ' --8-|';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should do nothing if connect is not called, despite subscriptions', () => {
const source = cold('--1-2---3-4--5-|');
const sourceSubs = [];
Expand Down
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ export {Subscriber} from './Subscriber';
export {AsyncSubject} from './AsyncSubject';
export {ReplaySubject} from './ReplaySubject';
export {BehaviorSubject} from './BehaviorSubject';
export {MulticastObservable} from './observable/MulticastObservable';
export {ConnectableObservable} from './observable/ConnectableObservable';
export {Notification} from './Notification';
export {EmptyError} from './util/EmptyError';
Expand Down
2 changes: 1 addition & 1 deletion src/add/operator/multicast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import {Observable} from '../../Observable';
import {multicast, MulticastSignature} from '../../operator/multicast';

Observable.prototype.multicast = multicast;
Observable.prototype.multicast = <any>multicast;

declare module '../../Observable' {
interface Observable<T> {
Expand Down
20 changes: 20 additions & 0 deletions src/observable/MulticastObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {ConnectableObservable} from '../observable/ConnectableObservable';

export class MulticastObservable<T> extends Observable<T> {
constructor(protected source: Observable<T>,
private connectable: ConnectableObservable<T>,
private selector: (source: Observable<T>) => Observable<T>) {
super();
}

protected _subscribe(subscriber: Subscriber<T>): Subscription {
const {selector, connectable} = this;

const subscription = selector(connectable).subscribe(subscriber);
subscription.add(connectable.connect());
return subscription;
}
}
16 changes: 13 additions & 3 deletions src/operator/multicast.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import {Subject} from '../Subject';
import {Observable} from '../Observable';
import {MulticastObservable} from '../observable/MulticastObservable';
import {ConnectableObservable} from '../observable/ConnectableObservable';

/**
Expand All @@ -7,7 +9,10 @@ import {ConnectableObservable} from '../observable/ConnectableObservable';
*
* <img src="./img/multicast.png" width="100%">
*
* @param {Function} selector - a function that can use the multicasted source stream
* @param {Function|Subject} Factory function to create an intermediate subject through
* which the source sequence's elements will be multicast to the selector function
* or Subject to push source elements into.
* @param {Function} Optional selector function that can use the multicasted source stream
* as many times as needed, without causing multiple subscriptions to the source stream.
* Subscribers to the given source will receive all notifications of the source from the
* time of the subscription forward.
Expand All @@ -17,7 +22,8 @@ import {ConnectableObservable} from '../observable/ConnectableObservable';
* @method multicast
* @owner Observable
*/
export function multicast<T>(subjectOrSubjectFactory: Subject<T> | (() => Subject<T>)): ConnectableObservable<T> {
export function multicast<T>(subjectOrSubjectFactory: Subject<T> | (() => Subject<T>),
selector?: (source: Observable<T>) => Observable<T>): Observable<T> | ConnectableObservable<T> {
let subjectFactory: () => Subject<T>;
if (typeof subjectOrSubjectFactory === 'function') {
subjectFactory = <() => Subject<T>>subjectOrSubjectFactory;
Expand All @@ -26,11 +32,15 @@ export function multicast<T>(subjectOrSubjectFactory: Subject<T> | (() => Subjec
return <Subject<T>>subjectOrSubjectFactory;
};
}
return new ConnectableObservable(this, subjectFactory);

const connectable = new ConnectableObservable(this, subjectFactory);
return selector ? new MulticastObservable(this, connectable, selector) : connectable;
}

export type factoryOrValue<T> = T | (() => T);
export type selector<T> = (source: Observable<T>) => Observable<T>;

export interface MulticastSignature<T> {
(subjectOrSubjectFactory: factoryOrValue<Subject<T>>): ConnectableObservable<T>;
(SubjectFactory: () => Subject<T>, selector?: selector<T>): Observable<T>;
}

0 comments on commit 32fa3a4

Please sign in to comment.