From 0ed2d94c848f706c2048e532b31ca3354b4a41db Mon Sep 17 00:00:00 2001 From: Thomas Mair Date: Fri, 18 Feb 2022 17:42:48 +0100 Subject: [PATCH] chore(publishReplay): convert publishReplay specs to run mode --- spec/operators/publishReplay-spec.ts | 619 ++++++++++++++------------- 1 file changed, 332 insertions(+), 287 deletions(-) diff --git a/spec/operators/publishReplay-spec.ts b/spec/operators/publishReplay-spec.ts index 38e17640fa..1db0b39d10 100644 --- a/spec/operators/publishReplay-spec.ts +++ b/spec/operators/publishReplay-spec.ts @@ -1,233 +1,256 @@ +/** @prettier */ import { expect } from 'chai'; -import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription, pipe } from 'rxjs'; import { publishReplay, mergeMapTo, tap, mergeMap, refCount, retry, repeat, map } from 'rxjs/operators'; +import { TestScheduler } from 'rxjs/testing'; +import { observableMatcher } from '../helpers/observableMatcher'; /** @test {publishReplay} */ describe('publishReplay operator', () => { + let testScheduler: TestScheduler; + + beforeEach(() => { + testScheduler = new TestScheduler(observableMatcher); + }); + it('should mirror a simple source Observable', () => { - const source = cold('--1-2---3-4--5-|'); - const sourceSubs = '^ !'; - const published = source.pipe(publishReplay(1)) as ConnectableObservable; - const expected = '--1-2---3-4--5-|'; + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold('--1-2---3-4--5-|'); + const sourceSubs = ' ^--------------!'; + const published = source.pipe(publishReplay(1)) as ConnectableObservable; + const expected = ' --1-2---3-4--5-|'; - expectObservable(published).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); - published.connect(); + published.connect(); + }); }); it('should return a ConnectableObservable-ish', () => { const source = of(1).pipe(publishReplay()) as ConnectableObservable; - expect(typeof ( source)._subscribe === 'function').to.be.true; - expect(typeof ( source).getSubject === 'function').to.be.true; + expect(typeof (source)._subscribe === 'function').to.be.true; + expect(typeof (source).getSubject === 'function').to.be.true; expect(typeof source.connect === 'function').to.be.true; expect(typeof source.refCount === 'function').to.be.true; }); it('should do nothing if connect is not called, despite subscriptions', () => { - const source = cold('--1-2---3-4--5-|'); - const sourceSubs: string[] = []; - const published = source.pipe(publishReplay(1)); - const expected = '-'; + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold('--1-2---3-4--5-|'); + const sourceSubs: string[] = []; + const published = source.pipe(publishReplay(1)); + const expected = ' -'; - expectObservable(published).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); it('should multicast the same values to multiple observers, bufferSize=1', () => { - const source = cold('-1-2-3----4-|'); - const sourceSubs = '^ !'; - const published = source.pipe(publishReplay(1)) as ConnectableObservable; - const subscriber1 = hot('a| ').pipe(mergeMapTo(published)); - const expected1 = '-1-2-3----4-|'; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(published)); - const expected2 = ' 23----4-|'; - const subscriber3 = hot(' c| ').pipe(mergeMapTo(published)); - const expected3 = ' 3-4-|'; - - expectObservable(subscriber1).toBe(expected1); - expectObservable(subscriber2).toBe(expected2); - expectObservable(subscriber3).toBe(expected3); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - - published.connect(); - }); + testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-3----4-|'); + const sourceSubs = ' ^-----------!'; + const published = source.pipe(publishReplay(1)) as ConnectableObservable; + const subscriber1 = hot('a| ').pipe(mergeMapTo(published)); + const expected1 = ' -1-2-3----4-|'; + const subscriber2 = hot('----b| ').pipe(mergeMapTo(published)); + const expected2 = ' ----23----4-|'; + const subscriber3 = hot('--------c| ').pipe(mergeMapTo(published)); + const expected3 = ' --------3-4-|'; - it('should multicast the same values to multiple observers, bufferSize=2', () => { - const source = cold('-1-2-----3------4-|'); - const sourceSubs = '^ !'; - const published = source.pipe(publishReplay(2)) as ConnectableObservable; - const subscriber1 = hot('a| ').pipe(mergeMapTo(published)); - const expected1 = '-1-2-----3------4-|'; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(published)); - const expected2 = ' (12)-3------4-|'; - const subscriber3 = hot(' c| ').pipe(mergeMapTo(published)); - const expected3 = ' (23)-4-|'; - - expectObservable(subscriber1).toBe(expected1); - expectObservable(subscriber2).toBe(expected2); - expectObservable(subscriber3).toBe(expected3); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - - published.connect(); - }); + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); - it('should multicast an error from the source to multiple observers', () => { - const source = cold('-1-2-3----4-#'); - const sourceSubs = '^ !'; - const published = source.pipe(publishReplay(1)) as ConnectableObservable; - const subscriber1 = hot('a| ').pipe(mergeMapTo(published)); - const expected1 = '-1-2-3----4-#'; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(published)); - const expected2 = ' 23----4-#'; - const subscriber3 = hot(' c| ').pipe(mergeMapTo(published)); - const expected3 = ' 3-4-#'; - - expectObservable(subscriber1).toBe(expected1); - expectObservable(subscriber2).toBe(expected2); - expectObservable(subscriber3).toBe(expected3); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - - published.connect(); + published.connect(); + }); }); - it('should multicast the same values to multiple observers, ' + - 'but is unsubscribed explicitly and early', () => { - const source = cold('-1-2-3----4-|'); - const sourceSubs = '^ ! '; - const published = source.pipe(publishReplay(1)) as ConnectableObservable; - const unsub = ' u '; - const subscriber1 = hot('a| ').pipe(mergeMapTo(published)); - const expected1 = '-1-2-3---- '; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(published)); - const expected2 = ' 23---- '; - const subscriber3 = hot(' c| ').pipe(mergeMapTo(published)); - const expected3 = ' 3- '; - - expectObservable(subscriber1).toBe(expected1); - expectObservable(subscriber2).toBe(expected2); - expectObservable(subscriber3).toBe(expected3); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - - // Set up unsubscription action - let connection: Subscription; - expectObservable(hot(unsub).pipe(tap(() => { - connection.unsubscribe(); - }))).toBe(unsub); - - connection = published.connect(); - }); + it('should multicast the same values to multiple observers, bufferSize=2', () => { + testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-----3------4-|'); + const sourceSubs = ' ^-----------------!'; + const published = source.pipe(publishReplay(2)) as ConnectableObservable; + const subscriber1 = hot('a| ').pipe(mergeMapTo(published)); + const expected1 = ' -1-2-----3------4-|'; + const subscriber2 = hot('----b| ').pipe(mergeMapTo(published)); + const expected2 = ' ----(12)-3------4-|'; + const subscriber3 = hot('-----------c| ').pipe(mergeMapTo(published)); + const expected3 = ' -----------(23)-4-|'; - it('should not break unsubscription chains when result is unsubscribed explicitly', () => { - const source = cold('-1-2-3----4-|'); - const sourceSubs = '^ ! '; - const published = source.pipe( - mergeMap((x) => of(x)), - publishReplay(1) - ) as ConnectableObservable; - const subscriber1 = hot('a| ').pipe(mergeMapTo(published)); - const expected1 = '-1-2-3---- '; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(published)); - const expected2 = ' 23---- '; - const subscriber3 = hot(' c| ').pipe(mergeMapTo(published)); - const expected3 = ' 3- '; - const unsub = ' u '; - - expectObservable(subscriber1).toBe(expected1); - expectObservable(subscriber2).toBe(expected2); - expectObservable(subscriber3).toBe(expected3); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - - // Set up unsubscription action - let connection: Subscription; - expectObservable(hot(unsub).pipe(tap(() => { - connection.unsubscribe(); - }))).toBe(unsub); - - connection = published.connect(); + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + + published.connect(); + }); }); - describe('with refCount()', () => { - it('should connect when first subscriber subscribes', () => { - const source = cold( '-1-2-3----4-|'); - const sourceSubs = ' ^ !'; - const replayed = source.pipe( - publishReplay(1), - refCount() - ); - const subscriber1 = hot(' a| ').pipe(mergeMapTo(replayed)); - const expected1 = ' -1-2-3----4-|'; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(replayed)); - const expected2 = ' 23----4-|'; - const subscriber3 = hot(' c| ').pipe(mergeMapTo(replayed)); - const expected3 = ' 3-4-|'; + it('should multicast an error from the source to multiple observers', () => { + testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-3----4-#'); + const sourceSubs = ' ^-----------!'; + const published = source.pipe(publishReplay(1)) as ConnectableObservable; + const subscriber1 = hot('a| ').pipe(mergeMapTo(published)); + const expected1 = ' -1-2-3----4-#'; + const subscriber2 = hot('----b| ').pipe(mergeMapTo(published)); + const expected2 = ' ----23----4-#'; + const subscriber3 = hot('--------c| ').pipe(mergeMapTo(published)); + const expected3 = ' --------3-4-#'; expectObservable(subscriber1).toBe(expected1); expectObservable(subscriber2).toBe(expected2); expectObservable(subscriber3).toBe(expected3); expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); - it('should disconnect when last subscriber unsubscribes', () => { - const source = cold( '-1-2-3----4-|'); - const sourceSubs = ' ^ ! '; - const replayed = source.pipe( - publishReplay(1), - refCount() - ); - const subscriber1 = hot(' a| ').pipe(mergeMapTo(replayed)); - const unsub1 = ' ! '; - const expected1 = ' -1-2-3-- '; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(replayed)); - const unsub2 = ' ! '; - const expected2 = ' 23---- '; - - expectObservable(subscriber1, unsub1).toBe(expected1); - expectObservable(subscriber2, unsub2).toBe(expected2); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + published.connect(); }); + }); - it('should NOT be retryable', () => { - const source = cold('-1-2-3----4-#'); - // const sourceSubs = '^ !'; - const published = source.pipe( - publishReplay(1), - refCount(), - retry(3) - ); + it('should multicast the same values to multiple observers, but is unsubscribed explicitly and early', () => { + testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-3----4-|'); + const sourceSubs = ' ^--------! '; + const published = source.pipe(publishReplay(1)) as ConnectableObservable; + const unsub = ' ---------u '; const subscriber1 = hot('a| ').pipe(mergeMapTo(published)); - const expected1 = '-1-2-3----4-(444#)'; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(published)); - const expected2 = ' 23----4-(444#)'; - const subscriber3 = hot(' c| ').pipe(mergeMapTo(published)); - const expected3 = ' 3-4-(444#)'; + const expected1 = ' -1-2-3---- '; + const subscriber2 = hot('----b| ').pipe(mergeMapTo(published)); + const expected2 = ' ----23---- '; + const subscriber3 = hot('--------c| ').pipe(mergeMapTo(published)); + const expected3 = ' --------3- '; expectObservable(subscriber1).toBe(expected1); expectObservable(subscriber2).toBe(expected2); expectObservable(subscriber3).toBe(expected3); - // expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + + // Set up unsubscription action + let connection: Subscription; + expectObservable( + hot(unsub).pipe( + tap(() => { + connection.unsubscribe(); + }) + ) + ).toBe(unsub); + + connection = published.connect(); }); + }); - it('should NOT be repeatable', () => { - const source = cold('-1-2-3----4-|'); - // const sourceSubs = '^ !'; + it('should not break unsubscription chains when result is unsubscribed explicitly', () => { + testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-3----4-|'); + const sourceSubs = ' ^--------! '; const published = source.pipe( - publishReplay(1), - refCount(), - repeat(3) - ); + mergeMap((x) => of(x)), + publishReplay(1) + ) as ConnectableObservable; const subscriber1 = hot('a| ').pipe(mergeMapTo(published)); - const expected1 = '-1-2-3----4-(44|)'; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(published)); - const expected2 = ' 23----4-(44|)'; - const subscriber3 = hot(' c| ').pipe(mergeMapTo(published)); - const expected3 = ' 3-4-(44|)'; + const expected1 = ' -1-2-3---- '; + const subscriber2 = hot('----b| ').pipe(mergeMapTo(published)); + const expected2 = ' ----23---- '; + const subscriber3 = hot('--------c| ').pipe(mergeMapTo(published)); + const expected3 = ' --------3- '; + const unsub = ' ---------u '; expectObservable(subscriber1).toBe(expected1); expectObservable(subscriber2).toBe(expected2); expectObservable(subscriber3).toBe(expected3); - // expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + + // Set up unsubscription action + let connection: Subscription; + expectObservable( + hot(unsub).pipe( + tap(() => { + connection.unsubscribe(); + }) + ) + ).toBe(unsub); + + connection = published.connect(); + }); + }); + + describe('with refCount()', () => { + it('should connect when first subscriber subscribes', () => { + testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-3----4-|'); + const sourceSubs = ' ---^-----------!'; + const replayed = source.pipe(publishReplay(1), refCount()); + const subscriber1 = hot('---a| ').pipe(mergeMapTo(replayed)); + const expected1 = ' ----1-2-3----4-|'; + const subscriber2 = hot('-------b| ').pipe(mergeMapTo(replayed)); + const expected2 = ' -------23----4-|'; + const subscriber3 = hot('-----------c| ').pipe(mergeMapTo(replayed)); + const expected3 = ' -----------3-4-|'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); + + it('should disconnect when last subscriber unsubscribes', () => { + testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-3----4-|'); + const sourceSubs = ' ---^--------! '; + const replayed = source.pipe(publishReplay(1), refCount()); + const subscriber1 = hot('---a| ').pipe(mergeMapTo(replayed)); + const unsub1 = ' ----------! '; + const expected1 = ' ----1-2-3-- '; + const subscriber2 = hot('-------b| ').pipe(mergeMapTo(replayed)); + const unsub2 = ' ------------! '; + const expected2 = ' -------23---- '; + + expectObservable(subscriber1, unsub1).toBe(expected1); + expectObservable(subscriber2, unsub2).toBe(expected2); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); + + it('should NOT be retryable', () => { + testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-3----4-# '); + const sourceSubs = ' ^-----------! '; + const published = source.pipe(publishReplay(1), refCount(), retry(3)); + const subscriber1 = hot('a| ').pipe(mergeMapTo(published)); + const expected1 = ' -1-2-3----4-(444#)'; + const subscriber2 = hot('----b| ').pipe(mergeMapTo(published)); + const expected2 = ' ----23----4-(444#)'; + const subscriber3 = hot('--------c| ').pipe(mergeMapTo(published)); + const expected3 = ' --------3-4-(444#)'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); + + it('should NOT be repeatable', () => { + testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-3----4-| '); + const sourceSubs = ' ^-----------! '; + const published = source.pipe(publishReplay(1), refCount(), repeat(3)); + const subscriber1 = hot('a| ').pipe(mergeMapTo(published)); + const expected1 = ' -1-2-3----4-(44|)'; + const subscriber2 = hot('----b| ').pipe(mergeMapTo(published)); + const expected2 = ' ----23----4-(44|)'; + const subscriber3 = hot('--------c| ').pipe(mergeMapTo(published)); + const expected3 = ' --------3-4-(44|)'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); }); @@ -300,8 +323,7 @@ describe('publishReplay operator', () => { done(); }); - it('should emit replayed values and resubscribe to the source when ' + - 'reconnected without source completion', () => { + it('should emit replayed values and resubscribe to the source when reconnected without source completion', () => { const results1: number[] = []; const results2: number[] = []; let subscriptions = 0; @@ -371,147 +393,170 @@ describe('publishReplay operator', () => { expect(results2).to.deep.equal([]); expect(subscriptions).to.equal(1); - connectable.subscribe({ next: (x) => { - results2.push(x); - }, error: (x) => { - done(new Error('should not be called')); - }, complete: () => { - expect(results2).to.deep.equal([3, 4]); - done(); - } }); + connectable.subscribe({ + next: (x) => { + results2.push(x); + }, + error: (x) => { + done(new Error('should not be called')); + }, + complete: () => { + expect(results2).to.deep.equal([3, 4]); + done(); + }, + }); }); it('should multicast an empty source', () => { - const source = cold('|'); - const sourceSubs = '(^!)'; - const published = source.pipe(publishReplay(1)) as ConnectableObservable; - const expected = '|'; + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold('| '); + const sourceSubs = ' (^!)'; + const published = source.pipe(publishReplay(1)) as ConnectableObservable; + const expected = ' |'; - expectObservable(published).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); - published.connect(); + published.connect(); + }); }); it('should multicast a never source', () => { - const source = cold('-'); - const sourceSubs = '^'; + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold('-'); + const sourceSubs = ' ^'; - const published = source.pipe(publishReplay(1)) as ConnectableObservable; - const expected = '-'; + const published = source.pipe(publishReplay(1)) as ConnectableObservable; + const expected = ' -'; - expectObservable(published).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); - published.connect(); + published.connect(); + }); }); it('should multicast a throw source', () => { - const source = cold('#'); - const sourceSubs = '(^!)'; - const published = source.pipe(publishReplay(1)) as ConnectableObservable; - const expected = '#'; + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold('# '); + const sourceSubs = ' (^!)'; + const published = source.pipe(publishReplay(1)) as ConnectableObservable; + const expected = ' # '; - expectObservable(published).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); - published.connect(); + published.connect(); + }); }); it('should mirror a simple source Observable with selector', () => { - const values = {a: 2, b: 4, c: 6, d: 8}; - const selector = (observable: Observable) => observable.pipe(map(v => 2 * +v)); - const source = cold('--1-2---3-4---|'); - const sourceSubs = '^ !'; - const published = source.pipe(publishReplay(1, Infinity, selector)); - const expected = '--a-b---c-d---|'; - - expectObservable(published).toBe(expected, values); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const values = { a: 2, b: 4, c: 6, d: 8 }; + const selector = (observable: Observable) => observable.pipe(map((v) => 2 * +v)); + const source = cold('--1-2---3-4---|'); + const sourceSubs = ' ^-------------!'; + const published = source.pipe(publishReplay(1, Infinity, selector)); + const expected = ' --a-b---c-d---|'; + + expectObservable(published).toBe(expected, values); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); it('should EMIT an error when the selector throws an exception', () => { - const error = "It's broken"; - const selector = () => { - throw error; - }; - const source = cold('--1-2---3-4---|'); - const published = source.pipe(publishReplay(1, Infinity, selector)); - - expectObservable(published).toBe('#', undefined, "It's broken"); + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const error = "It's broken"; + const selector = () => { + throw error; + }; + const source = cold('--1-2---3-4---|'); + const published = source.pipe(publishReplay(1, Infinity, selector)); + const expected = ' # '; + + expectObservable(published).toBe(expected, undefined, "It's broken"); + }); }); it('should emit an error when the selector returns an Observable that emits an error', () => { - const error = "It's broken"; - const innerObservable = cold('--5-6----#', undefined, error); - const selector = (observable: Observable) => observable.pipe(mergeMapTo(innerObservable)); - const source = cold('--1--2---3---|'); - const sourceSubs = '^ !'; - const published = source.pipe(publishReplay(1, Infinity, selector)); - const expected = '----5-65-6-#'; - - expectObservable(published).toBe(expected, undefined, error); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const error = "It's broken"; + const innerObservable = cold('--5-6----#', undefined, error); + const selector = (observable: Observable) => observable.pipe(mergeMapTo(innerObservable)); + const source = cold('--1--2---3---|'); + const sourceSubs = ' ^----------! '; + const published = source.pipe(publishReplay(1, Infinity, selector)); + const expected = ' ----5-65-6-# '; + + expectObservable(published).toBe(expected, undefined, error); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); it('should terminate immediately when the selector returns an empty Observable', () => { - const selector = () => EMPTY; - const source = cold('--1--2---3---|'); - const sourceSubs = '(^!)'; - const published = source.pipe(publishReplay(1, Infinity, selector)); - const expected = '|'; - - expectObservable(published).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const selector = () => EMPTY; + const source = cold('--1--2---3---|'); + const sourceSubs = ' (^!) '; + const published = source.pipe(publishReplay(1, Infinity, selector)); + const expected = ' | '; + + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); it('should not emit and should not complete/error when the selector returns never', () => { - const selector = () => NEVER; - const source = cold('-'); - const sourceSubs = '^'; - const published = source.pipe(publishReplay(1, Infinity, selector)); - const expected = '-'; - - expectObservable(published).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const selector = () => NEVER; + const source = cold('-'); + const sourceSubs = ' ^'; + const published = source.pipe(publishReplay(1, Infinity, selector)); + const expected = ' -'; + + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); it('should emit error when the selector returns Observable.throw', () => { - const error = "It's broken"; - const selector = () => throwError(() => (error)); - const source = cold('--1--2---3---|'); - const sourceSubs = '(^!)'; - const published = source.pipe(publishReplay(1, Infinity, selector)); - const expected = '#'; - - expectObservable(published).toBe(expected, undefined, error); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const error = "It's broken"; + const selector = () => throwError(() => error); + const source = cold('--1--2---3---|'); + const sourceSubs = ' (^!) '; + const published = source.pipe(publishReplay(1, Infinity, selector)); + const expected = ' # '; + + expectObservable(published).toBe(expected, undefined, error); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); it('should be referentially-transparent', () => { - const source1 = cold('-1-2-3-4-5-|'); - const source1Subs = '^ !'; - const expected1 = '-1-2-3-4-5-|'; - const source2 = cold('-6-7-8-9-0-|'); - const source2Subs = '^ !'; - const expected2 = '-6-7-8-9-0-|'; - - // Calls to the _operator_ must be referentially-transparent. - const partialPipeLine = pipe( - publishReplay(1) - ); - - // The non-referentially-transparent publishing occurs within the _operator function_ - // returned by the _operator_ and that happens when the complete pipeline is composed. - const published1 = source1.pipe(partialPipeLine) as ConnectableObservable; - const published2 = source2.pipe(partialPipeLine) as ConnectableObservable; - - expectObservable(published1).toBe(expected1); - expectSubscriptions(source1.subscriptions).toBe(source1Subs); - expectObservable(published2).toBe(expected2); - expectSubscriptions(source2.subscriptions).toBe(source2Subs); - - published1.connect(); - published2.connect(); + testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source1 = cold('-1-2-3-4-5-|'); + const source1Subs = ' ^----------!'; + const expected1 = ' -1-2-3-4-5-|'; + const source2 = cold('-6-7-8-9-0-|'); + const source2Subs = ' ^----------!'; + const expected2 = ' -6-7-8-9-0-|'; + + // Calls to the _operator_ must be referentially-transparent. + const partialPipeLine = pipe(publishReplay(1)); + + // The non-referentially-transparent publishing occurs within the _operator function_ + // returned by the _operator_ and that happens when the complete pipeline is composed. + const published1 = source1.pipe(partialPipeLine) as ConnectableObservable; + const published2 = source2.pipe(partialPipeLine) as ConnectableObservable; + + expectObservable(published1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(published2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + + published1.connect(); + published2.connect(); + }); }); });