From 9bef2285d4510d69bfb59d81d4d72251cbc10c0a Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Fri, 4 Dec 2015 00:14:42 -0800 Subject: [PATCH] feat(publishLast): add publishLast operator closes #883 --- spec/operators/publishLast-spec.js | 222 +++++++++++++++++++++++++++++ src/CoreOperators.ts | 1 + src/Observable.ts | 1 + src/Rx.KitchenSink.ts | 1 + src/Rx.ts | 1 + src/add/operator/publishLast.ts | 4 + src/operator/publishLast.ts | 7 + 7 files changed, 237 insertions(+) create mode 100644 spec/operators/publishLast-spec.js create mode 100644 src/add/operator/publishLast.ts create mode 100644 src/operator/publishLast.ts diff --git a/spec/operators/publishLast-spec.js b/spec/operators/publishLast-spec.js new file mode 100644 index 0000000000..2e75307523 --- /dev/null +++ b/spec/operators/publishLast-spec.js @@ -0,0 +1,222 @@ +/* globals describe, it, expect, expectObservable, expectSubscriptions, hot, cold */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; +var Subject = Rx.Subject; + +describe('Observable.prototype.publishLast()', function () { + it('should return a ConnectableObservable', function () { + var source = Observable.of(1).publishLast(); + + expect(source instanceof Rx.ConnectableObservable).toBe(true); + }); + + it('should emit last notification of a simple source Observable', function () { + var source = cold('--1-2---3-4--5-|'); + var sourceSubs = '^ !'; + var published = source.publishLast(); + var expected = '---------------(5|)'; + + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + + published.connect(); + }); + + it('should do nothing if connect is not called, despite subscriptions', function () { + var source = cold('--1-2---3-4--5-|'); + var sourceSubs = []; + var published = source.publishLast(); + var expected = '-'; + + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should multicast the same values to multiple observers', function () { + var source = cold('-1-2-3----4-|'); + var sourceSubs = '^ !'; + var published = source.publishLast(); + var subscriber1 = hot('a| ').mergeMapTo(published); + var expected1 = '------------(4|)'; + var subscriber2 = hot(' b| ').mergeMapTo(published); + var expected2 = ' --------(4|)'; + var subscriber3 = hot(' c| ').mergeMapTo(published); + var expected3 = ' ----(4|)'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + + published.connect(); + }); + + it('should multicast an error from the source to multiple observers', function () { + var source = cold('-1-2-3----4-#'); + var sourceSubs = '^ !'; + var published = source.publishLast(); + var subscriber1 = hot('a| ').mergeMapTo(published); + var expected1 = '------------#'; + var subscriber2 = hot(' b| ').mergeMapTo(published); + var expected2 = ' --------#'; + var subscriber3 = hot(' c| ').mergeMapTo(published); + var expected3 = ' ----#'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + + published.connect(); + }); + + it('should not cast any values to multiple observers, ' + + 'when source is unsubscribed explicitly and early', function () { + var source = cold('-1-2-3----4-|'); + var sourceSubs = '^ ! '; + var published = source.publishLast(); + var unsub = ' u '; + var subscriber1 = hot('a| ').mergeMapTo(published); + var expected1 = '---------- '; + var subscriber2 = hot(' b| ').mergeMapTo(published); + var expected2 = ' ------ '; + var subscriber3 = hot(' c| ').mergeMapTo(published); + var expected3 = ' -- '; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + + // Set up unsubscription action + var connection; + expectObservable(hot(unsub).do(function () { + connection.unsubscribe(); + })).toBe(unsub); + + connection = published.connect(); + }); + + describe('with refCount()', function () { + it('should connect when first subscriber subscribes', function () { + var source = cold( '-1-2-3----4-|'); + var sourceSubs = ' ^ !'; + var replayed = source.publishLast().refCount(); + var subscriber1 = hot(' a| ').mergeMapTo(replayed); + var expected1 = ' ------------(4|)'; + var subscriber2 = hot(' b| ').mergeMapTo(replayed); + var expected2 = ' --------(4|)'; + var subscriber3 = hot(' c| ').mergeMapTo(replayed); + var expected3 = ' ----(4|)'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should disconnect when last subscriber unsubscribes', function () { + var source = cold( '-1-2-3----4-|'); + var sourceSubs = ' ^ ! '; + var replayed = source.publishLast().refCount(); + var subscriber1 = hot(' a| ').mergeMapTo(replayed); + var unsub1 = ' ! '; + var expected1 = ' -------- '; + var subscriber2 = hot(' b| ').mergeMapTo(replayed); + var unsub2 = ' ! '; + var expected2 = ' ------ '; + + expectObservable(subscriber1, unsub1).toBe(expected1); + expectObservable(subscriber2, unsub2).toBe(expected2); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should NOT be retryable', function () { + var source = cold('-1-2-3----4-#'); + var sourceSubs = '^ !'; + var published = source.publishLast().refCount().retry(3); + var subscriber1 = hot('a| ').mergeMapTo(published); + var expected1 = '------------#'; + var subscriber2 = hot(' b| ').mergeMapTo(published); + var expected2 = ' --------#'; + var subscriber3 = hot(' c| ').mergeMapTo(published); + var expected3 = ' ----#'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); + + it('should multicast an empty source', function () { + var source = cold('|'); + var sourceSubs = '(^!)'; + var published = source.publishLast(); + var expected = '|'; + + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + + published.connect(); + }); + + it('should multicast a never source', function () { + var source = cold('-'); + var sourceSubs = '^'; + var published = source.publishLast(); + var expected = '-'; + + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + + published.connect(); + }); + + it('should multicast a throw source', function () { + var source = cold('#'); + var sourceSubs = '(^!)'; + var published = source.publishLast(); + var expected = '#'; + + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + + published.connect(); + }); + + it('should multicast one observable to multiple observers', function (done) { + var results1 = []; + var results2 = []; + var subscriptions = 0; + + var source = new Observable(function (observer) { + subscriptions++; + observer.next(1); + observer.next(2); + observer.next(3); + observer.next(4); + observer.complete(); + }); + + var connectable = source.publishLast(); + + connectable.subscribe(function (x) { + results1.push(x); + }); + + connectable.subscribe(function (x) { + results2.push(x); + }); + + expect(results1).toEqual([]); + expect(results2).toEqual([]); + + connectable.connect(); + + expect(results1).toEqual([4]); + expect(results2).toEqual([4]); + expect(subscriptions).toBe(1); + done(); + }); +}); \ No newline at end of file diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index 278bb99a23..986c42c187 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -57,6 +57,7 @@ export interface CoreOperators { publish?: () => ConnectableObservable; publishBehavior?: (value: any) => ConnectableObservable; publishReplay?: (bufferSize?: number, windowTime?: number, scheduler?: Scheduler) => ConnectableObservable; + publishLast?: () => ConnectableObservable; reduce?: (project: (acc: R, x: T) => R, seed?: R) => Observable; repeat?: (count?: number) => Observable; retry?: (count?: number) => Observable; diff --git a/src/Observable.ts b/src/Observable.ts index 0d4742fcf2..7aa132d93f 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -217,6 +217,7 @@ export class Observable implements CoreOperators { publish: () => ConnectableObservable; publishBehavior: (value: any) => ConnectableObservable; publishReplay: (bufferSize?: number, windowTime?: number, scheduler?: Scheduler) => ConnectableObservable; + publishLast: () => ConnectableObservable; reduce: (project: (acc: R, x: T) => R, seed?: R) => Observable; repeat: (count?: number) => Observable; retry: (count?: number) => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 2b4ab20e92..518bb1233f 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -91,6 +91,7 @@ import './add/operator/partition'; import './add/operator/publish'; import './add/operator/publishBehavior'; import './add/operator/publishReplay'; +import './add/operator/publishLast'; import './add/operator/reduce'; import './add/operator/repeat'; import './add/operator/retry'; diff --git a/src/Rx.ts b/src/Rx.ts index 1dc9afb700..f398663da8 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -69,6 +69,7 @@ import './add/operator/partition'; import './add/operator/publish'; import './add/operator/publishBehavior'; import './add/operator/publishReplay'; +import './add/operator/publishLast'; import './add/operator/reduce'; import './add/operator/repeat'; import './add/operator/retry'; diff --git a/src/add/operator/publishLast.ts b/src/add/operator/publishLast.ts new file mode 100644 index 0000000000..bedab84fcd --- /dev/null +++ b/src/add/operator/publishLast.ts @@ -0,0 +1,4 @@ +import {Observable} from '../../Observable'; +import {publishLast} from '../../operator/publishLast'; + +Observable.prototype.publishLast = publishLast; \ No newline at end of file diff --git a/src/operator/publishLast.ts b/src/operator/publishLast.ts new file mode 100644 index 0000000000..a2d6bee746 --- /dev/null +++ b/src/operator/publishLast.ts @@ -0,0 +1,7 @@ +import {AsyncSubject} from '../subject/AsyncSubject'; +import {multicast} from './multicast'; +import {ConnectableObservable} from '../observable/ConnectableObservable'; + +export function publishLast(): ConnectableObservable { + return multicast.call(this, new AsyncSubject()); +} \ No newline at end of file