From b0f7266addde8267a099a5f862feee093cd42fcf Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Sat, 4 Mar 2017 12:42:47 -0800 Subject: [PATCH] feat(shareReplay): adds `shareReplay` variant of `publishReplay` `shareReplay` returns an observable that is the source multicasted over a `ReplaySubject`. That replay subject is recycled on error from the `source`, but not on completion of the source. This makes `shareReplay` ideal for handling things like caching AJAX results, as it's retryable. It's repeat behavior, however, differs from `share` in that it will not repeat the `source` observable, rather it will repeat the `source` observable's values. related #2013, #453, #2043 --- spec/helpers/marble-testing.ts | 4 +- spec/operators/shareReplay-spec.ts | 167 ++++++++++++++++++++++++ src/Rx.ts | 1 + src/add/operator/shareReplay.ts | 11 ++ src/observable/ConnectableObservable.ts | 24 ++-- src/operator/shareReplay.ts | 26 ++++ 6 files changed, 223 insertions(+), 10 deletions(-) create mode 100644 spec/operators/shareReplay-spec.ts create mode 100644 src/add/operator/shareReplay.ts create mode 100644 src/operator/shareReplay.ts diff --git a/spec/helpers/marble-testing.ts b/spec/helpers/marble-testing.ts index 8b3c6a9701..2fb9bcdf72 100644 --- a/spec/helpers/marble-testing.ts +++ b/spec/helpers/marble-testing.ts @@ -3,10 +3,12 @@ import {Observable} from '../../dist/cjs/Observable'; import {SubscriptionLog} from '../../dist/cjs/testing/SubscriptionLog'; import {ColdObservable} from '../../dist/cjs/testing/ColdObservable'; import {HotObservable} from '../../dist/cjs/testing/HotObservable'; -import {observableToBeFn, subscriptionLogsToBeFn} from '../../dist/cjs/testing/TestScheduler'; +import {TestScheduler, observableToBeFn, subscriptionLogsToBeFn} from '../../dist/cjs/testing/TestScheduler'; declare const global: any; +export const rxTestScheduler: TestScheduler = global.rxTestScheduler; + export function hot(marbles: string, values?: any, error?: any): HotObservable { if (!global.rxTestScheduler) { throw 'tried to use hot() in async test'; diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts new file mode 100644 index 0000000000..4a61e00ca6 --- /dev/null +++ b/spec/operators/shareReplay-spec.ts @@ -0,0 +1,167 @@ +import {expect} from 'chai'; +import * as Rx from '../../dist/cjs/Rx'; +import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports + +declare const { asDiagram }; +declare const hot: typeof marbleTestingSignature.hot; +declare const cold: typeof marbleTestingSignature.cold; +declare const expectObservable: typeof marbleTestingSignature.expectObservable; +declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; + +const Observable = Rx.Observable; + +/** @test {shareReplay} */ +describe('Observable.prototype.shareReplay', () => { + it('should mirror a simple source Observable', () => { + const source = cold('--1-2---3-4--5-|'); + const sourceSubs = '^ !'; + const published = source.shareReplay(); + const expected = '--1-2---3-4--5-|'; + + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should do nothing if result is not subscribed', () => { + let subscribed = false; + const source = new Observable(() => { + subscribed = true; + }); + source.shareReplay(); + expect(subscribed).to.be.false; + }); + + it('should multicast the same values to multiple observers, bufferSize=1', () => { + const source = cold('-1-2-3----4-|'); const shared = source.shareReplay(1); + const sourceSubs = '^ !'; + const subscriber1 = hot('a| ').mergeMapTo(shared); + const expected1 = '-1-2-3----4-|'; + const subscriber2 = hot(' b| ').mergeMapTo(shared); + const expected2 = ' 23----4-|'; + const subscriber3 = hot(' c| ').mergeMapTo(shared); + const expected3 = ' 3-4-|'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should multicast the same values to multiple observers, bufferSize=2', () => { + const source = cold('-1-2-----3------4-|'); const shared = source.shareReplay(2); + const sourceSubs = '^ !'; + const subscriber1 = hot('a| ').mergeMapTo(shared); + const expected1 = '-1-2-----3------4-|'; + const subscriber2 = hot(' b| ').mergeMapTo(shared); + const expected2 = ' (12)-3------4-|'; + const subscriber3 = hot(' c| ').mergeMapTo(shared); + const expected3 = ' (23)-4-|'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should multicast an error from the source to multiple observers', () => { + const source = cold('-1-2-3----4-#'); const shared = source.shareReplay(1); + const sourceSubs = '^ !'; + const subscriber1 = hot('a| ').mergeMapTo(shared); + const expected1 = '-1-2-3----4-#'; + const subscriber2 = hot(' b| ').mergeMapTo(shared); + const expected2 = ' 23----4-#'; + const subscriber3 = hot(' c| ').mergeMapTo(shared); + const expected3 = ' 3-4-#'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should multicast an empty source', () => { + const source = cold('|'); + const sourceSubs = '(^!)'; + const shared = source.shareReplay(1); + const expected = '|'; + + expectObservable(shared).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should multicast a never source', () => { + const source = cold('-'); + const sourceSubs = '^'; + + const shared = source.shareReplay(1); + const expected = '-'; + + expectObservable(shared).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should multicast a throw source', () => { + const source = cold('#'); + const sourceSubs = '(^!)'; + const shared = source.shareReplay(1); + const expected = '#'; + + expectObservable(shared).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should replay results to subsequent subscriptions if source completes, bufferSize=2', () => { + const source = cold('-1-2-----3-| '); + const shared = source.shareReplay(2); + const sourceSubs = '^ ! '; + const subscriber1 = hot('a| ').mergeMapTo(shared); + const expected1 = '-1-2-----3-| '; + const subscriber2 = hot(' b| ').mergeMapTo(shared); + const expected2 = ' (12)-3-| '; + const subscriber3 = hot(' (c|) ').mergeMapTo(shared); + const expected3 = ' (23|)'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should completely restart for subsequent subscriptions if source errors, bufferSize=2', () => { + const source = cold('-1-2-----3-# '); + const shared = source.shareReplay(2); + const sourceSubs1 = '^ ! '; + const subscriber1 = hot('a| ').mergeMapTo(shared); + const expected1 = '-1-2-----3-# '; + const subscriber2 = hot(' b| ').mergeMapTo(shared); + const expected2 = ' (12)-3-# '; + const subscriber3 = hot(' (c|) ').mergeMapTo(shared); + const expected3 = ' -1-2-----3-#'; + const sourceSubs2 = ' ^ !'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe([sourceSubs1, sourceSubs2]); + }); + + it('should be retryable, bufferSize=2', () => { + const subs = []; + const source = cold('-1-2-----3-# '); + const shared = source.shareReplay(2).retry(1); + subs.push( '^ ! '); + subs.push( ' ^ ! '); + subs.push( ' ^ !'); + const subscriber1 = hot('a| ').mergeMapTo(shared); + const expected1 = '-1-2-----3--1-2-----3-# '; + const subscriber2 = hot(' b| ').mergeMapTo(shared); + const expected2 = ' (12)-3--1-2-----3-# '; + const subscriber3 = hot(' (c|) ').mergeMapTo(shared); + const expected3 = ' (12)-3--1-2-----3-#'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(subs); + }); +}); diff --git a/src/Rx.ts b/src/Rx.ts index b04c99c505..42854a0143 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -110,6 +110,7 @@ import './add/operator/sampleTime'; import './add/operator/scan'; import './add/operator/sequenceEqual'; import './add/operator/share'; +import './add/operator/shareReplay'; import './add/operator/single'; import './add/operator/skip'; import './add/operator/skipUntil'; diff --git a/src/add/operator/shareReplay.ts b/src/add/operator/shareReplay.ts new file mode 100644 index 0000000000..89a310df5f --- /dev/null +++ b/src/add/operator/shareReplay.ts @@ -0,0 +1,11 @@ + +import { Observable } from '../../Observable'; +import { shareReplay } from '../../operator/shareReplay'; + +Observable.prototype.shareReplay = shareReplay; + +declare module '../../Observable' { + interface Observable { + shareReplay: typeof shareReplay; + } +} \ No newline at end of file diff --git a/src/observable/ConnectableObservable.ts b/src/observable/ConnectableObservable.ts index 896d7cdcb3..a53eb1e319 100644 --- a/src/observable/ConnectableObservable.ts +++ b/src/observable/ConnectableObservable.ts @@ -12,6 +12,7 @@ export class ConnectableObservable extends Observable { protected _subject: Subject; protected _refCount: number = 0; protected _connection: Subscription; + _isComplete = false; constructor(protected source: Observable, protected subjectFactory: () => Subject) { @@ -33,6 +34,7 @@ export class ConnectableObservable extends Observable { connect(): Subscription { let connection = this._connection; if (!connection) { + this._isComplete = false; connection = this._connection = new Subscription(); connection.add(this.source .subscribe(new ConnectableSubscriber(this.getSubject(), this))); @@ -51,15 +53,18 @@ export class ConnectableObservable extends Observable { } } +const connectableProto = ConnectableObservable.prototype; + export const connectableObservableDescriptor: PropertyDescriptorMap = { operator: { value: null }, _refCount: { value: 0, writable: true }, _subject: { value: null, writable: true }, _connection: { value: null, writable: true }, - _subscribe: { value: ( ConnectableObservable.prototype)._subscribe }, - getSubject: { value: ( ConnectableObservable.prototype).getSubject }, - connect: { value: ( ConnectableObservable.prototype).connect }, - refCount: { value: ( ConnectableObservable.prototype).refCount } + _subscribe: { value: connectableProto._subscribe }, + _isComplete: { value: connectableProto._isComplete, writable: true }, + getSubject: { value: connectableProto.getSubject }, + connect: { value: connectableProto.connect }, + refCount: { value: connectableProto.refCount } }; class ConnectableSubscriber extends SubjectSubscriber { @@ -72,17 +77,18 @@ class ConnectableSubscriber extends SubjectSubscriber { super._error(err); } protected _complete(): void { + this.connectable._isComplete = true; this._unsubscribe(); super._complete(); } protected _unsubscribe() { - const { connectable } = this; + const connectable = this.connectable; if (connectable) { this.connectable = null; - const connection = ( connectable)._connection; - ( connectable)._refCount = 0; - ( connectable)._subject = null; - ( connectable)._connection = null; + const connection = connectable._connection; + connectable._refCount = 0; + connectable._subject = null; + connectable._connection = null; if (connection) { connection.unsubscribe(); } diff --git a/src/operator/shareReplay.ts b/src/operator/shareReplay.ts new file mode 100644 index 0000000000..eabc910eb0 --- /dev/null +++ b/src/operator/shareReplay.ts @@ -0,0 +1,26 @@ +import { Observable } from '../Observable'; +import { multicast } from './multicast'; +import { ReplaySubject } from '../ReplaySubject'; +import { ConnectableObservable } from '../observable/ConnectableObservable'; +import { IScheduler } from '../Scheduler'; + +/** + * @method shareReplay + * @owner Observable + */ +export function shareReplay( + this: Observable, + bufferSize?: number, + windowTime?: number, + scheduler?: IScheduler +): Observable { + let subject: ReplaySubject; + const connectable = multicast.call(this, function shareReplaySubjectFactory(this: ConnectableObservable) { + if (this._isComplete) { + return subject; + } else { + return (subject = new ReplaySubject(bufferSize, windowTime, scheduler)); + } + }); + return connectable.refCount(); +}; \ No newline at end of file