From 765d3f081063392e1219215ab66c0cd995a219bf Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 17 Apr 2023 17:35:15 -0500 Subject: [PATCH 1/2] fix(groupBy): will no longer leak inner subscriptions `groupBy` no longer supports the behavior where inner subscriptions will cause the outer subscription to stay connected after consumer unsubscribes from result. Resolves #6805 BREAKING CHANGE: `groupBy` no longer allows grouped observable subscriptions to stay connected after parent subscription unsubscribed. If you need this behavior, don't unsubscribe from the parent. --- spec/operators/groupBy-spec.ts | 20 ++++++------- src/internal/operators/OperatorSubscriber.ts | 17 ++++------- src/internal/operators/groupBy.ts | 31 +++----------------- 3 files changed, 19 insertions(+), 49 deletions(-) diff --git a/spec/operators/groupBy-spec.ts b/spec/operators/groupBy-spec.ts index 7237701cb0..c1ffe64113 100644 --- a/spec/operators/groupBy-spec.ts +++ b/spec/operators/groupBy-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { groupBy, delay, tap, map, take, mergeMap, materialize, skip, ignoreElements } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { ReplaySubject, of, Observable, Operator, Observer, Subject, NextNotification, ErrorNotification } from 'rxjs'; +import { ReplaySubject, of, Observable, Subject, NextNotification, ErrorNotification } from 'rxjs'; import { createNotification } from 'rxjs/internal/NotificationFactories'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -534,8 +534,8 @@ describe('groupBy operator', () => { }); }); - it('should allow the outer to be unsubscribed early but inners continue', () => { - testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + it('should unsubscribe inner subscriptions when the result unsubscribes', () => { + testScheduler.run(({ cold, hot, expectObservable }) => { const values = { a: ' foo', b: ' FoO ', @@ -551,10 +551,10 @@ describe('groupBy operator', () => { l: ' fOo ', }; const e1 = hot('-1--2--^-a-b-c-d-e-f-g-h-i-j-k-l-|', values); - const unsub = ' ---------! '; - const expected = ' --w---x--- '; - const w = cold(' a-b---d---------i-----l-| ', values); - const x = cold(' c-------g-h---------| ', values); + const unsub = ' ----------! '; + const expected = ' --w---x---- '; + const w = cold(' a-b---d-- ', values); + const x = cold(' c---- ', values); const expectedValues = { w: w, x: x }; const source = e1.pipe(groupBy((val: string) => val.toLowerCase().trim())); @@ -884,7 +884,7 @@ describe('groupBy operator', () => { }); }); - it('should allow using a durationSelector, and outer unsubscribed early', () => { + it('should allow using a durationSelector, and unsub from outer and inner at the same time', () => { testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { const values = { a: ' foo', @@ -904,8 +904,8 @@ describe('groupBy operator', () => { const unsub = ' -----------! '; const expected = ' --v---w---x- '; const v = cold(' a-b---(d|) ', values); - const w = cold(' c-------g-(h|) ', values); - const x = cold(' e---------j-(k|) ', values); + const w = cold(' c----- ', values); + const x = cold(' e- ', values); const expectedValues = { v: v, w: w, x: x }; const source = e1.pipe( diff --git a/src/internal/operators/OperatorSubscriber.ts b/src/internal/operators/OperatorSubscriber.ts index 593b937bdd..056121a540 100644 --- a/src/internal/operators/OperatorSubscriber.ts +++ b/src/internal/operators/OperatorSubscriber.ts @@ -38,18 +38,13 @@ export class OperatorSubscriber extends Subscriber { * this handler are sent to the `destination` error handler. * @param onFinalize Additional finalization logic here. This will only be called on finalization if the * subscriber itself is not already closed. This is called after all other finalization logic is executed. - * @param shouldUnsubscribe An optional check to see if an unsubscribe call should truly unsubscribe. - * NOTE: This currently **ONLY** exists to support the strange behavior of {@link groupBy}, where unsubscription - * to the resulting observable does not actually disconnect from the source if there are active subscriptions - * to any grouped observable. (DO NOT EXPOSE OR USE EXTERNALLY!!!) */ constructor( destination: Subscriber, onNext?: (value: T) => void, onComplete?: () => void, onError?: (err: any) => void, - private onFinalize?: () => void, - private shouldUnsubscribe?: () => boolean + private onFinalize?: () => void ) { // It's important - for performance reasons - that all of this class's // members are initialized and that they are always initialized in the same @@ -102,11 +97,9 @@ export class OperatorSubscriber extends Subscriber { } unsubscribe() { - if (!this.shouldUnsubscribe || this.shouldUnsubscribe()) { - const { closed } = this; - super.unsubscribe(); - // Execute additional teardown if we have any and we didn't already do so. - !closed && this.onFinalize?.(); - } + const { closed } = this; + super.unsubscribe(); + // Execute additional teardown if we have any and we didn't already do so. + !closed && this.onFinalize?.(); } } diff --git a/src/internal/operators/groupBy.ts b/src/internal/operators/groupBy.ts index 2b6f71e35d..6d74f82549 100644 --- a/src/internal/operators/groupBy.ts +++ b/src/internal/operators/groupBy.ts @@ -2,7 +2,7 @@ import { Observable } from '../Observable'; import { from } from '../observable/from'; import { Subject } from '../Subject'; import { ObservableInput, Observer, OperatorFunction, SubjectLike } from '../types'; -import { createOperatorSubscriber, OperatorSubscriber } from './OperatorSubscriber'; +import { createOperatorSubscriber } from './OperatorSubscriber'; export interface BasicGroupByOptions { element?: undefined; @@ -165,12 +165,6 @@ export function groupBy( // next call from the source. const handleError = (err: any) => notify((consumer) => consumer.error(err)); - // The number of actively subscribed groups - let activeGroups = 0; - - // Whether or not teardown was attempted on this subscription. - let teardownAttempted = false; - // Capturing a reference to this, because we need a handle to it // in `createGroupedObservable` below. This is what we use to // subscribe to our source observable. This sometimes needs to be unsubscribed @@ -178,7 +172,7 @@ export function groupBy( // in cases where a user unsubscribes from the main resulting subscription, but // still has groups from this subscription subscribed and would expect values from it // Consider: `source.pipe(groupBy(fn), take(2))`. - const groupBySourceSubscriber = new OperatorSubscriber( + const groupBySourceSubscriber = createOperatorSubscriber( subscriber, (value: T) => { // Because we have to notify all groups of any errors that occur in here, @@ -240,14 +234,7 @@ export function groupBy( // When the source subscription is _finally_ torn down, release the subjects and keys // in our groups Map, they may be quite large and we don't want to keep them around if we // don't have to. - () => groups.clear(), - () => { - teardownAttempted = true; - // We only kill our subscription to the source if we have - // no active groups. As stated above, consider this scenario: - // source$.pipe(groupBy(fn), take(2)). - return activeGroups === 0; - } + () => groups.clear() ); // Subscribe to the source @@ -259,17 +246,7 @@ export function groupBy( * @param groupSubject The subject that fuels the group */ function createGroupedObservable(key: K, groupSubject: SubjectLike) { - const result: any = new Observable((groupSubscriber) => { - activeGroups++; - const innerSub = groupSubject.subscribe(groupSubscriber); - return () => { - innerSub.unsubscribe(); - // We can kill the subscription to our source if we now have no more - // active groups subscribed, and a finalization was already attempted on - // the source. - --activeGroups === 0 && teardownAttempted && groupBySourceSubscriber.unsubscribe(); - }; - }); + const result: any = new Observable((groupSubscriber) => groupSubject.subscribe(groupSubscriber)); result.key = key; return result; } From 78fd6084218062cdf321aaa2421a18af0da50d62 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 17 Apr 2023 17:40:51 -0500 Subject: [PATCH 2/2] refactor: Remove direct instantiation of `OperatorSubscriber` + Stopped exporting `OperatorSubscriber` class from module + Updated `onErrorResumeNext` and a `Subject` test to use `createOperatorSubscriber` --- spec/Subject-spec.ts | 4 ++-- src/internal/observable/onErrorResumeNext.ts | 4 ++-- src/internal/operators/OperatorSubscriber.ts | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/spec/Subject-spec.ts b/spec/Subject-spec.ts index 6d9d84bde4..108dfc2df3 100644 --- a/spec/Subject-spec.ts +++ b/spec/Subject-spec.ts @@ -4,7 +4,7 @@ import { AnonymousSubject } from 'rxjs/internal/Subject'; import { delay } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from './helpers/observableMatcher'; -import { OperatorSubscriber } from 'rxjs/internal/operators/OperatorSubscriber'; +import { createOperatorSubscriber } from 'rxjs/internal/operators/OperatorSubscriber'; /** @test {Subject} */ describe('Subject', () => { @@ -733,7 +733,7 @@ describe('Subject', () => { const subject = new Subject(); const destination = new Subscriber(); const results: any[] = []; - const subscriber = new OperatorSubscriber(destination, (value) => { + const subscriber = createOperatorSubscriber(destination, (value) => { results.push(value); }, () => { results.push('complete'); diff --git a/src/internal/observable/onErrorResumeNext.ts b/src/internal/observable/onErrorResumeNext.ts index 11dac7f2ae..02d7d43967 100644 --- a/src/internal/observable/onErrorResumeNext.ts +++ b/src/internal/observable/onErrorResumeNext.ts @@ -1,7 +1,7 @@ import { Observable } from '../Observable'; import { ObservableInputTuple } from '../types'; import { argsOrArgArray } from '../util/argsOrArgArray'; -import { OperatorSubscriber } from '../operators/OperatorSubscriber'; +import { createOperatorSubscriber } from '../operators/OperatorSubscriber'; import { noop } from '../util/noop'; import { from } from './from'; @@ -89,7 +89,7 @@ export function onErrorResumeNext( subscribeNext(); return; } - const innerSubscriber = new OperatorSubscriber(subscriber, undefined, noop, noop); + const innerSubscriber = createOperatorSubscriber(subscriber, undefined, noop, noop); nextSource.subscribe(innerSubscriber); innerSubscriber.add(subscribeNext); } else { diff --git a/src/internal/operators/OperatorSubscriber.ts b/src/internal/operators/OperatorSubscriber.ts index 056121a540..78cd3baab9 100644 --- a/src/internal/operators/OperatorSubscriber.ts +++ b/src/internal/operators/OperatorSubscriber.ts @@ -26,7 +26,7 @@ export function createOperatorSubscriber( * A generic helper for allowing operators to be created with a Subscriber and * use closures to capture necessary state from the operator function itself. */ -export class OperatorSubscriber extends Subscriber { +class OperatorSubscriber extends Subscriber { /** * Creates an instance of an `OperatorSubscriber`. * @param destination The downstream subscriber.