From 82184b2a0a22517de6cb9bddc583eeb0caaf5ed0 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Fri, 19 May 2023 15:50:08 -0500 Subject: [PATCH] fix(forkJoin): the first empty source will now cause an EmptyError BREAKING CHANGE: `forkJoin` will no longer just complete if a source completes without a value. Instead you will get an error of `EmptyError`. If you want to complete in this case, add `catchError(() => EMPTY)` to the forkJoin... however, if you just want to default the sources of your `forkJoin` to some other value if they're empty, apply the `defaultIfEmpty` operator. for example, `forkJoin(sources.map(defaultIfEmpty(null)))`. resolves #5561 --- spec/observables/forkJoin-spec.ts | 70 +++++++++++++++++------------ src/internal/observable/forkJoin.ts | 7 ++- 2 files changed, 46 insertions(+), 31 deletions(-) diff --git a/spec/observables/forkJoin-spec.ts b/spec/observables/forkJoin-spec.ts index 8661fbf0e4..b53fa79e71 100644 --- a/spec/observables/forkJoin-spec.ts +++ b/spec/observables/forkJoin-spec.ts @@ -1,6 +1,6 @@ /** @prettier */ import { expect } from 'chai'; -import { finalize, forkJoin, map, of, timer } from 'rxjs'; +import { EmptyError, finalize, forkJoin, map, of, timer } from 'rxjs'; import { lowerCaseO } from '../helpers/test-helper'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -123,9 +123,9 @@ describe('forkJoin', () => { const s2 = hot(' (b|)'); const s3 = lowerCaseO(); const e1 = forkJoin([s1, s2, s3]); - const expected = '|'; + const expected = '#'; - expectObservable(e1).toBe(expected); + expectObservable(e1).toBe(expected, undefined, new EmptyError()); }); }); @@ -158,32 +158,32 @@ describe('forkJoin', () => { const s2 = hot(' (b|)'); const s3 = hot(' ------------------|'); const e1 = forkJoin([s1, s2, s3]); - const expected = '------------------|'; + const expected = '------------------#'; - expectObservable(e1).toBe(expected); + expectObservable(e1).toBe(expected, undefined, new EmptyError()); }); }); - it('should complete early if any of source is empty and completes before than others', () => { + it('should error with EmptyError if any of source is empty and completes before than others', () => { rxTestScheduler.run(({ hot, expectObservable }) => { const s1 = hot(' --a--b--c--d--|'); const s2 = hot(' (b|)'); const s3 = hot(' ---------|'); const e1 = forkJoin([s1, s2, s3]); - const expected = '---------|'; + const expected = '---------#'; - expectObservable(e1).toBe(expected); + expectObservable(e1).toBe(expected, undefined, new EmptyError()); }); }); - it('should complete when all sources are empty', () => { + it('should error on the first empty source', () => { rxTestScheduler.run(({ hot, expectObservable }) => { const s1 = hot(' --------------|'); const s2 = hot(' ---------|'); const e1 = forkJoin([s1, s2]); - const expected = '---------|'; + const expected = '---------#'; - expectObservable(e1).toBe(expected); + expectObservable(e1).toBe(expected, undefined, new EmptyError()); }); }); @@ -212,9 +212,9 @@ describe('forkJoin', () => { const s1 = hot(' --------------'); const s2 = hot(' ------|'); const e1 = forkJoin([s1, s2]); - const expected = '------|'; + const expected = '------#'; - expectObservable(e1).toBe(expected); + expectObservable(e1).toBe(expected, undefined, new EmptyError()); }); }); @@ -371,16 +371,16 @@ describe('forkJoin', () => { }); }); - it('should accept empty lowercase-o observables', () => { + it('should error for empty lowercase-o observables', () => { rxTestScheduler.run(({ hot, expectObservable }) => { const e1 = forkJoin({ foo: hot(' --a--b--c--d--|'), bar: hot(' (b|)'), baz: lowerCaseO(), }); - const expected = '|'; + const expected = '#'; - expectObservable(e1).toBe(expected); + expectObservable(e1).toBe(expected, undefined, new EmptyError()); }); }); @@ -409,42 +409,41 @@ describe('forkJoin', () => { }); }); - it('should not emit if any of source observable is empty', () => { + it('should error with EmptyError if any of source observable is empty', () => { rxTestScheduler.run(({ hot, expectObservable }) => { const e1 = forkJoin({ foo: hot(' --a--b--c--d--|'), bar: hot(' (b|)'), baz: hot(' ------------------|'), }); - const expected = '------------------|'; + const expected = '------------------#'; - expectObservable(e1).toBe(expected); + expectObservable(e1).toBe(expected, undefined, new EmptyError()); }); }); - // TODO: This seems odd. Filed an issue for discussion here: https://github.com/ReactiveX/rxjs/issues/5561 - it('should complete early if any of source is empty and completes before than others', () => { + it('should error with EmptyError if any of source is empty and completes before than others', () => { rxTestScheduler.run(({ hot, expectObservable }) => { const e1 = forkJoin({ foo: hot(' --a--b--c--d--|'), bar: hot(' (b|)'), baz: hot(' ---------|'), }); - const expected = '---------|'; + const expected = '---------#'; - expectObservable(e1).toBe(expected); + expectObservable(e1).toBe(expected, undefined, new EmptyError()); }); }); - it('should complete when all sources are empty', () => { + it('should error when the first source returns that is empty, even if all sources are empty', () => { rxTestScheduler.run(({ hot, expectObservable }) => { const e1 = forkJoin({ foo: hot(' --------------|'), bar: hot(' ---------|'), }); - const expected = '---------|'; + const expected = '---------#'; - expectObservable(e1).toBe(expected); + expectObservable(e1).toBe(expected, undefined, new EmptyError()); }); }); @@ -471,15 +470,15 @@ describe('forkJoin', () => { }); }); - it('should complete when one of the sources never completes but other completes without values', () => { + it('should error when one of the sources never completes but other completes without values', () => { rxTestScheduler.run(({ hot, expectObservable }) => { const e1 = forkJoin({ foo: hot(' --------------'), bar: hot(' ------|'), }); - const expected = '------|'; + const expected = '------#'; - expectObservable(e1).toBe(expected); + expectObservable(e1).toBe(expected, undefined, new EmptyError()); }); }); @@ -595,5 +594,18 @@ describe('forkJoin', () => { }, }); }); + + it('should error on early completion of an inner observable', () => { + rxTestScheduler.run(({ hot, expectObservable }) => { + const e1 = forkJoin({ + foo: hot(' --a--b--c--d--|'), + bar: hot(' ---|'), + baz: hot(' --1--2--3--|'), + }); + const expected = '---#'; + + expectObservable(e1).toBe(expected, null, new EmptyError()); + }); + }); }); }); diff --git a/src/internal/observable/forkJoin.ts b/src/internal/observable/forkJoin.ts index 0ffb9c81ee..e05af97cbd 100644 --- a/src/internal/observable/forkJoin.ts +++ b/src/internal/observable/forkJoin.ts @@ -7,6 +7,7 @@ import { operate } from '../Subscriber'; import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs'; import { createObject } from '../util/createObject'; import { AnyCatcher } from '../AnyCatcher'; +import { EmptyError } from '../util/EmptyError'; // forkJoin(any) // We put this first because we need to catch cases where the user has supplied @@ -169,10 +170,12 @@ export function forkJoin(...args: any[]): Observable { complete: () => remainingCompletions--, finalize: () => { if (!remainingCompletions || !hasValue) { - if (!remainingEmissions) { + if (remainingEmissions === 0) { destination.next(keys ? createObject(keys, values) : values); + destination.complete(); + } else { + destination.error(new EmptyError()); } - destination.complete(); } }, })