Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(publishLast): add publishLast operator #885

Merged
merged 1 commit into from
Dec 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 222 additions & 0 deletions spec/operators/publishLast-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/* globals describe, it, expect, expectObservable, expectSubscriptions, hot, cold */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var Subject = Rx.Subject;

describe('Observable.prototype.publishLast()', function () {
it('should return a ConnectableObservable', function () {
var source = Observable.of(1).publishLast();

expect(source instanceof Rx.ConnectableObservable).toBe(true);
});

it('should emit last notification of a simple source Observable', function () {
var source = cold('--1-2---3-4--5-|');
var sourceSubs = '^ !';
var published = source.publishLast();
var expected = '---------------(5|)';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

published.connect();
});

it('should do nothing if connect is not called, despite subscriptions', function () {
var source = cold('--1-2---3-4--5-|');
var sourceSubs = [];
var published = source.publishLast();
var expected = '-';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should multicast the same values to multiple observers', function () {
var source = cold('-1-2-3----4-|');
var sourceSubs = '^ !';
var published = source.publishLast();
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '------------(4|)';
var subscriber2 = hot(' b| ').mergeMapTo(published);
var expected2 = ' --------(4|)';
var subscriber3 = hot(' c| ').mergeMapTo(published);
var expected3 = ' ----(4|)';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

published.connect();
});

it('should multicast an error from the source to multiple observers', function () {
var source = cold('-1-2-3----4-#');
var sourceSubs = '^ !';
var published = source.publishLast();
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '------------#';
var subscriber2 = hot(' b| ').mergeMapTo(published);
var expected2 = ' --------#';
var subscriber3 = hot(' c| ').mergeMapTo(published);
var expected3 = ' ----#';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

published.connect();
});

it('should not cast any values to multiple observers, ' +
'when source is unsubscribed explicitly and early', function () {
var source = cold('-1-2-3----4-|');
var sourceSubs = '^ ! ';
var published = source.publishLast();
var unsub = ' u ';
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '---------- ';
var subscriber2 = hot(' b| ').mergeMapTo(published);
var expected2 = ' ------ ';
var subscriber3 = hot(' c| ').mergeMapTo(published);
var expected3 = ' -- ';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

// Set up unsubscription action
var connection;
expectObservable(hot(unsub).do(function () {
connection.unsubscribe();
})).toBe(unsub);

connection = published.connect();
});

describe('with refCount()', function () {
it('should connect when first subscriber subscribes', function () {
var source = cold( '-1-2-3----4-|');
var sourceSubs = ' ^ !';
var replayed = source.publishLast().refCount();
var subscriber1 = hot(' a| ').mergeMapTo(replayed);
var expected1 = ' ------------(4|)';
var subscriber2 = hot(' b| ').mergeMapTo(replayed);
var expected2 = ' --------(4|)';
var subscriber3 = hot(' c| ').mergeMapTo(replayed);
var expected3 = ' ----(4|)';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should disconnect when last subscriber unsubscribes', function () {
var source = cold( '-1-2-3----4-|');
var sourceSubs = ' ^ ! ';
var replayed = source.publishLast().refCount();
var subscriber1 = hot(' a| ').mergeMapTo(replayed);
var unsub1 = ' ! ';
var expected1 = ' -------- ';
var subscriber2 = hot(' b| ').mergeMapTo(replayed);
var unsub2 = ' ! ';
var expected2 = ' ------ ';

expectObservable(subscriber1, unsub1).toBe(expected1);
expectObservable(subscriber2, unsub2).toBe(expected2);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should NOT be retryable', function () {
var source = cold('-1-2-3----4-#');
var sourceSubs = '^ !';
var published = source.publishLast().refCount().retry(3);
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '------------#';
var subscriber2 = hot(' b| ').mergeMapTo(published);
var expected2 = ' --------#';
var subscriber3 = hot(' c| ').mergeMapTo(published);
var expected3 = ' ----#';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});
});

it('should multicast an empty source', function () {
var source = cold('|');
var sourceSubs = '(^!)';
var published = source.publishLast();
var expected = '|';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

published.connect();
});

it('should multicast a never source', function () {
var source = cold('-');
var sourceSubs = '^';
var published = source.publishLast();
var expected = '-';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

published.connect();
});

it('should multicast a throw source', function () {
var source = cold('#');
var sourceSubs = '(^!)';
var published = source.publishLast();
var expected = '#';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

published.connect();
});

it('should multicast one observable to multiple observers', function (done) {
var results1 = [];
var results2 = [];
var subscriptions = 0;

var source = new Observable(function (observer) {
subscriptions++;
observer.next(1);
observer.next(2);
observer.next(3);
observer.next(4);
observer.complete();
});

var connectable = source.publishLast();

connectable.subscribe(function (x) {
results1.push(x);
});

connectable.subscribe(function (x) {
results2.push(x);
});

expect(results1).toEqual([]);
expect(results2).toEqual([]);

connectable.connect();

expect(results1).toEqual([4]);
expect(results2).toEqual([4]);
expect(subscriptions).toBe(1);
done();
});
});
1 change: 1 addition & 0 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export interface CoreOperators<T> {
publish?: () => ConnectableObservable<T>;
publishBehavior?: (value: any) => ConnectableObservable<T>;
publishReplay?: (bufferSize?: number, windowTime?: number, scheduler?: Scheduler) => ConnectableObservable<T>;
publishLast?: () => ConnectableObservable<T>;
reduce?: <R>(project: (acc: R, x: T) => R, seed?: R) => Observable<R>;
repeat?: (count?: number) => Observable<T>;
retry?: (count?: number) => Observable<T>;
Expand Down
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ export class Observable<T> implements CoreOperators<T> {
publish: () => ConnectableObservable<T>;
publishBehavior: (value: any) => ConnectableObservable<T>;
publishReplay: (bufferSize?: number, windowTime?: number, scheduler?: Scheduler) => ConnectableObservable<T>;
publishLast: () => ConnectableObservable<T>;
reduce: <R>(project: (acc: R, x: T) => R, seed?: R) => Observable<R>;
repeat: (count?: number) => Observable<T>;
retry: (count?: number) => Observable<T>;
Expand Down
1 change: 1 addition & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ import './add/operator/partition';
import './add/operator/publish';
import './add/operator/publishBehavior';
import './add/operator/publishReplay';
import './add/operator/publishLast';
import './add/operator/reduce';
import './add/operator/repeat';
import './add/operator/retry';
Expand Down
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import './add/operator/partition';
import './add/operator/publish';
import './add/operator/publishBehavior';
import './add/operator/publishReplay';
import './add/operator/publishLast';
import './add/operator/reduce';
import './add/operator/repeat';
import './add/operator/retry';
Expand Down
4 changes: 4 additions & 0 deletions src/add/operator/publishLast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import {Observable} from '../../Observable';
import {publishLast} from '../../operator/publishLast';

Observable.prototype.publishLast = publishLast;
7 changes: 7 additions & 0 deletions src/operator/publishLast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import {AsyncSubject} from '../subject/AsyncSubject';
import {multicast} from './multicast';
import {ConnectableObservable} from '../observable/ConnectableObservable';

export function publishLast<T>(): ConnectableObservable<T> {
return multicast.call(this, new AsyncSubject());
}