From 260d52af35698b4d99190691d1ff90153e024666 Mon Sep 17 00:00:00 2001 From: peaBerberian Date: Sun, 19 Aug 2018 14:03:02 +0200 Subject: [PATCH 1/6] fix(switchMap): stop listening to a synchronous inner-obervable when unsubscribed --- spec/operators/switchMap-spec.ts | 29 +++++++++++++++++++++++++++-- src/internal/operators/switchMap.ts | 4 +++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/spec/operators/switchMap-spec.ts b/spec/operators/switchMap-spec.ts index 4eabe0ae3d..779b4ebf55 100644 --- a/spec/operators/switchMap-spec.ts +++ b/spec/operators/switchMap-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { switchMap, mergeMap, map } from 'rxjs/operators'; -import { of, Observable } from 'rxjs'; +import { switchMap, mergeMap, map, takeWhile } from 'rxjs/operators'; +import { concat, defer, of, Observable } from 'rxjs'; declare function asDiagram(arg: string): Function; @@ -169,6 +169,31 @@ describe('switchMap', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = concat( + defer(() => { + sideEffects.push(1); + return of(1); + }), + defer(() => { + sideEffects.push(2); + return of(2); + }), + defer(() => { + sideEffects.push(3); + return of(3); + }) + ); + + of(null).pipe( + switchMap(() => synchronousObservable), + takeWhile((x) => x != 2) // unsubscribe at the second side-effect + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([1, 2]); + }); + it('should switch inner cold observables, inner never completes', () => { const x = cold( '--a--b--c--d--e--| '); const xsubs = ' ^ ! '; diff --git a/src/internal/operators/switchMap.ts b/src/internal/operators/switchMap.ts index 136fae3d43..ee83978c08 100644 --- a/src/internal/operators/switchMap.ts +++ b/src/internal/operators/switchMap.ts @@ -113,7 +113,9 @@ class SwitchMapSubscriber extends OuterSubscriber { if (innerSubscription) { innerSubscription.unsubscribe(); } - this.add(this.innerSubscription = subscribeToResult(this, result, value, index)); + const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + this.add(innerSubscriber); + this.innerSubscription = subscribeToResult(this, result, value, index, innerSubscriber); } protected _complete(): void { From ee1a339dfd02f1729911d38e0c9b1fb640197074 Mon Sep 17 00:00:00 2001 From: peaBerberian Date: Sun, 19 Aug 2018 14:03:33 +0200 Subject: [PATCH 2/6] fix(exhaustMap): stop listening to a synchronous inner-obervable when unsubscribed --- spec/operators/exhaustMap-spec.ts | 29 ++++++++++++++++++++++++++-- src/internal/operators/exhaustMap.ts | 17 +++++++++++----- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/spec/operators/exhaustMap-spec.ts b/spec/operators/exhaustMap-spec.ts index ee588e92c2..2819e57b32 100644 --- a/spec/operators/exhaustMap-spec.ts +++ b/spec/operators/exhaustMap-spec.ts @@ -1,6 +1,6 @@ import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { Observable, of, from } from 'rxjs'; -import { exhaustMap, mergeMap } from 'rxjs/operators'; +import { concat, defer, Observable, of, from } from 'rxjs'; +import { exhaustMap, mergeMap, takeWhile } from 'rxjs/operators'; import { expect } from 'chai'; declare function asDiagram(arg: string): Function; @@ -202,6 +202,31 @@ describe('exhaustMap', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = concat( + defer(() => { + sideEffects.push(1); + return of(1); + }), + defer(() => { + sideEffects.push(2); + return of(2); + }), + defer(() => { + sideEffects.push(3); + return of(3); + }) + ); + + of(null).pipe( + exhaustMap(() => synchronousObservable), + takeWhile((x) => x != 2) // unsubscribe at the second side-effect + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([1, 2]); + }); + it('should switch inner cold observables, inner never completes', () => { const x = cold( '--a--b--c--| '); const xsubs = ' ^ ! '; diff --git a/src/internal/operators/exhaustMap.ts b/src/internal/operators/exhaustMap.ts index 07773ecacb..67a6ee8bab 100644 --- a/src/internal/operators/exhaustMap.ts +++ b/src/internal/operators/exhaustMap.ts @@ -106,15 +106,22 @@ class ExhaustMapSubscriber extends OuterSubscriber { } private tryNext(value: T): void { + let result: ObservableInput; const index = this.index++; - const destination = this.destination; try { - const result = this.project(value, index); - this.hasSubscription = true; - this.add(subscribeToResult(this, result, value, index)); + result = this.project(value, index); } catch (err) { - destination.error(err); + this.destination.error(err); + return; } + this.hasSubscription = true; + this._innerSub(result, value, index); + } + + private _innerSub(result: ObservableInput, value: T, index: number): void { + const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + this.add(innerSubscriber); + subscribeToResult(this, result, value, index, innerSubscriber); } protected _complete(): void { From 456ef33a2def91cec15db06e5d56c42e0dfae81c Mon Sep 17 00:00:00 2001 From: peaBerberian Date: Sun, 19 Aug 2018 14:04:03 +0200 Subject: [PATCH 3/6] fix(catchError): stop listening to a synchronous inner-obervable when unsubscribed --- spec/operators/catch-spec.ts | 29 ++++++++++++++++++++++++++-- src/internal/operators/catchError.ts | 5 ++++- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/spec/operators/catch-spec.ts b/spec/operators/catch-spec.ts index 7994f67c9f..12cb5ea77f 100644 --- a/spec/operators/catch-spec.ts +++ b/spec/operators/catch-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; -import { concat, Observable, of, throwError, EMPTY, from } from 'rxjs'; -import { catchError, map, mergeMap } from 'rxjs/operators'; +import { concat, defer, Observable, of, throwError, EMPTY, from } from 'rxjs'; +import { catchError, map, mergeMap, takeWhile } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import * as sinon from 'sinon'; import { createObservableInputs } from '../helpers/test-helper'; @@ -121,6 +121,31 @@ describe('catchError operator', () => { expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = concat( + defer(() => { + sideEffects.push(1); + return of(1); + }), + defer(() => { + sideEffects.push(2); + return of(2); + }), + defer(() => { + sideEffects.push(3); + return of(3); + }) + ); + + throwError(new Error('Some error')).pipe( + catchError(() => synchronousObservable), + takeWhile((x) => x != 2) // unsubscribe at the second side-effect + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([1, 2]); + }); + it('should catch error and replace it with a hot Observable', () => { const e1 = hot('--a--b--# '); const e1subs = '^ ! '; diff --git a/src/internal/operators/catchError.ts b/src/internal/operators/catchError.ts index 6f7aeace90..784e299d03 100644 --- a/src/internal/operators/catchError.ts +++ b/src/internal/operators/catchError.ts @@ -3,6 +3,7 @@ import {Subscriber} from '../Subscriber'; import {Observable} from '../Observable'; import {OuterSubscriber} from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; import {ObservableInput, OperatorFunction, MonoTypeOperatorFunction} from '../types'; @@ -121,7 +122,9 @@ class CatchSubscriber extends OuterSubscriber { return; } this._unsubscribeAndRecycle(); - this.add(subscribeToResult(this, result)); + const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + this.add(innerSubscriber); + subscribeToResult(this, result, undefined, undefined, innerSubscriber); } } } From c4002f3808a2a67bd2fb53b91ef0881d197e5eba Mon Sep 17 00:00:00 2001 From: peaBerberian Date: Sun, 19 Aug 2018 14:04:32 +0200 Subject: [PATCH 4/6] fix(mergeScan): stop listening to a synchronous inner-obervable when unsubscribed --- spec/operators/mergeScan-spec.ts | 30 +++++++++++++++++++++++++++-- src/internal/operators/mergeScan.ts | 4 +++- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/spec/operators/mergeScan-spec.ts b/spec/operators/mergeScan-spec.ts index 7d41f39cbe..bdd8f3fee6 100644 --- a/spec/operators/mergeScan-spec.ts +++ b/spec/operators/mergeScan-spec.ts @@ -1,7 +1,8 @@ import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { TestScheduler } from 'rxjs/testing'; -import { of, EMPTY, NEVER, concat, throwError } from 'rxjs'; -import { mergeScan, delay, mergeMap } from 'rxjs/operators'; +import { of, defer, EMPTY, NEVER, concat, throwError } from 'rxjs'; +import { mergeScan, delay, mergeMap, takeWhile } from 'rxjs/operators'; +import { expect } from 'chai'; declare const rxTestScheduler: TestScheduler; /** @test {mergeScan} */ @@ -136,6 +137,31 @@ describe('mergeScan', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = concat( + defer(() => { + sideEffects.push(1); + return of(1); + }), + defer(() => { + sideEffects.push(2); + return of(2); + }), + defer(() => { + sideEffects.push(3); + return of(3); + }) + ); + + of(null).pipe( + mergeScan(() => synchronousObservable, 0), + takeWhile((x) => x != 2) // unsubscribe at the second side-effect + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([1, 2]); + }); + it('should handle errors in the projection function', () => { const e1 = hot('--a--^--b--c--d--e--f--g--|'); const e1subs = '^ !'; diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index ba7d2165d6..5c34d03a31 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -100,7 +100,9 @@ export class MergeScanSubscriber extends OuterSubscriber { } private _innerSub(ish: any, value: T, index: number): void { - this.add(subscribeToResult(this, ish, value, index)); + const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + this.add(innerSubscriber); + subscribeToResult(this, ish, value, index, innerSubscriber); } protected _complete(): void { From 1d142776f6948c66130714b466058c9977d148d1 Mon Sep 17 00:00:00 2001 From: peaBerberian Date: Sun, 19 Aug 2018 14:05:07 +0200 Subject: [PATCH 5/6] fix(onErrorResumeNext): stop listening to a synchronous inner-obervable when unsubscribed --- spec/operators/onErrorResumeNext-spec.ts | 29 +++++++++++++++++++-- src/internal/operators/onErrorResumeNext.ts | 4 ++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/spec/operators/onErrorResumeNext-spec.ts b/spec/operators/onErrorResumeNext-spec.ts index f1fa1b2cdf..b7becf9b45 100644 --- a/spec/operators/onErrorResumeNext-spec.ts +++ b/spec/operators/onErrorResumeNext-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { onErrorResumeNext } from 'rxjs/operators'; -import { concat, throwError, of } from 'rxjs'; +import { onErrorResumeNext, takeWhile } from 'rxjs/operators'; +import { concat, defer, throwError, of } from 'rxjs'; declare function asDiagram(arg: string): Function; @@ -104,6 +104,31 @@ describe('onErrorResumeNext operator', () => { expectSubscriptions(source.subscriptions).toBe(subs); }); + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = concat( + defer(() => { + sideEffects.push(1); + return of(1); + }), + defer(() => { + sideEffects.push(2); + return of(2); + }), + defer(() => { + sideEffects.push(3); + return of(3); + }) + ); + + throwError(new Error('Some error')).pipe( + onErrorResumeNext(synchronousObservable), + takeWhile((x) => x != 2) // unsubscribe at the second side-effect + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([1, 2]); + }); + it('should work with promise', (done: MochaDone) => { const expected = [1, 2]; const source = concat(of(1), throwError('meh')); diff --git a/src/internal/operators/onErrorResumeNext.ts b/src/internal/operators/onErrorResumeNext.ts index df6f4125e0..d3197e5698 100644 --- a/src/internal/operators/onErrorResumeNext.ts +++ b/src/internal/operators/onErrorResumeNext.ts @@ -152,7 +152,9 @@ class OnErrorResumeNextSubscriber extends OuterSubscriber { private subscribeToNextSource(): void { const next = this.nextSources.shift(); if (next) { - this.add(subscribeToResult(this, next)); + const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + this.add(innerSubscriber); + subscribeToResult(this, next, undefined, undefined, innerSubscriber); } else { this.destination.complete(); } From 1c257dbb389d021f72b07690add4ad4074f817f6 Mon Sep 17 00:00:00 2001 From: peaBerberian Date: Sun, 19 Aug 2018 14:05:50 +0200 Subject: [PATCH 6/6] fix(skipUntil): stop listening to a synchronous notifier after its first nexted value --- spec/operators/skipUntil-spec.ts | 23 ++++++++++++++++++++++- src/internal/operators/skipUntil.ts | 5 ++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/spec/operators/skipUntil-spec.ts b/spec/operators/skipUntil-spec.ts index 5248973db7..12430e8879 100644 --- a/spec/operators/skipUntil-spec.ts +++ b/spec/operators/skipUntil-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { Observable, of, Subject } from 'rxjs'; +import { concat, defer, Observable, of, Subject } from 'rxjs'; import { skipUntil, mergeMap } from 'rxjs/operators'; declare function asDiagram(arg: string): Function; @@ -246,4 +246,25 @@ describe('skipUntil', () => { expectObservable(result).toBe(expected); expectSubscriptions(notifier.subscriptions).toBe(nSubs); }); + + it('should stop listening to a synchronous notifier after its first nexted value', () => { + // const source = hot('-^-o---o---o---o---o---o---|'); + const sideEffects: number[] = []; + const synchronousNotifer = concat( + defer(() => { + sideEffects.push(1); + return of(1); + }), + defer(() => { + sideEffects.push(2); + return of(2); + }), + defer(() => { + sideEffects.push(3); + return of(3); + }) + ); + of(null).pipe(skipUntil(synchronousNotifer)).subscribe(() => { /* noop */ }); + expect(sideEffects).to.deep.equal([1]); + }); }); diff --git a/src/internal/operators/skipUntil.ts b/src/internal/operators/skipUntil.ts index f2fc2d186f..f565ae4d6c 100644 --- a/src/internal/operators/skipUntil.ts +++ b/src/internal/operators/skipUntil.ts @@ -44,7 +44,10 @@ class SkipUntilSubscriber extends OuterSubscriber { constructor(destination: Subscriber, notifier: ObservableInput) { super(destination); - this.add(this.innerSubscription = subscribeToResult(this, notifier)); + const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + this.add(innerSubscriber); + this.innerSubscription = innerSubscriber; + subscribeToResult(this, notifier, undefined, undefined, innerSubscriber); } protected _next(value: T) {