Skip to content

Commit

Permalink
fix(groupBy): will no longer leak inner subscriptions (#7252)
Browse files Browse the repository at this point in the history
* fix(groupBy): will no longer leak inner subscriptions

`groupBy` no longer supports the behavior where inner subscriptions will cause the outer subscription to stay connected after consumer unsubscribes from result.

Resolves #6805

BREAKING CHANGE: `groupBy` no longer allows grouped observable subscriptions to stay connected after parent subscription unsubscribed. If you need this behavior, don't unsubscribe from the parent.

* refactor: Remove direct instantiation of `OperatorSubscriber`

+ Stopped exporting `OperatorSubscriber` class from module
+ Updated `onErrorResumeNext` and a `Subject` test to use `createOperatorSubscriber`
  • Loading branch information
benlesh committed May 15, 2023
1 parent 6e3e5e4 commit 9ed27bf
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 54 deletions.
4 changes: 2 additions & 2 deletions spec/Subject-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { AnonymousSubject } from 'rxjs/internal/Subject';
import { delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from './helpers/observableMatcher';
import { OperatorSubscriber } from 'rxjs/internal/operators/OperatorSubscriber';
import { createOperatorSubscriber } from 'rxjs/internal/operators/OperatorSubscriber';

/** @test {Subject} */
describe('Subject', () => {
Expand Down Expand Up @@ -733,7 +733,7 @@ describe('Subject', () => {
const subject = new Subject<number>();
const destination = new Subscriber();
const results: any[] = [];
const subscriber = new OperatorSubscriber(destination, (value) => {
const subscriber = createOperatorSubscriber(destination, (value) => {
results.push(value);
}, () => {
results.push('complete');
Expand Down
20 changes: 10 additions & 10 deletions spec/operators/groupBy-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { groupBy, delay, tap, map, take, mergeMap, materialize, skip, ignoreElements } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { ReplaySubject, of, Observable, Operator, Observer, Subject, NextNotification, ErrorNotification } from 'rxjs';
import { ReplaySubject, of, Observable, Subject, NextNotification, ErrorNotification } from 'rxjs';
import { createNotification } from 'rxjs/internal/NotificationFactories';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -534,8 +534,8 @@ describe('groupBy operator', () => {
});
});

it('should allow the outer to be unsubscribed early but inners continue', () => {
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
it('should unsubscribe inner subscriptions when the result unsubscribes', () => {
testScheduler.run(({ cold, hot, expectObservable }) => {
const values = {
a: ' foo',
b: ' FoO ',
Expand All @@ -551,10 +551,10 @@ describe('groupBy operator', () => {
l: ' fOo ',
};
const e1 = hot('-1--2--^-a-b-c-d-e-f-g-h-i-j-k-l-|', values);
const unsub = ' ---------! ';
const expected = ' --w---x--- ';
const w = cold(' a-b---d---------i-----l-| ', values);
const x = cold(' c-------g-h---------| ', values);
const unsub = ' ----------! ';
const expected = ' --w---x---- ';
const w = cold(' a-b---d-- ', values);
const x = cold(' c---- ', values);
const expectedValues = { w: w, x: x };

const source = e1.pipe(groupBy((val: string) => val.toLowerCase().trim()));
Expand Down Expand Up @@ -884,7 +884,7 @@ describe('groupBy operator', () => {
});
});

it('should allow using a durationSelector, and outer unsubscribed early', () => {
it('should allow using a durationSelector, and unsub from outer and inner at the same time', () => {
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
const values = {
a: ' foo',
Expand All @@ -904,8 +904,8 @@ describe('groupBy operator', () => {
const unsub = ' -----------! ';
const expected = ' --v---w---x- ';
const v = cold(' a-b---(d|) ', values);
const w = cold(' c-------g-(h|) ', values);
const x = cold(' e---------j-(k|) ', values);
const w = cold(' c----- ', values);
const x = cold(' e- ', values);
const expectedValues = { v: v, w: w, x: x };

const source = e1.pipe(
Expand Down
4 changes: 2 additions & 2 deletions src/internal/observable/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Observable } from '../Observable';
import { ObservableInputTuple } from '../types';
import { argsOrArgArray } from '../util/argsOrArgArray';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
import { noop } from '../util/noop';
import { from } from './from';

Expand Down Expand Up @@ -86,7 +86,7 @@ export function onErrorResumeNext<A extends readonly unknown[]>(
subscribeNext();
return;
}
const innerSubscriber = new OperatorSubscriber(subscriber, undefined, noop, noop);
const innerSubscriber = createOperatorSubscriber(subscriber, undefined, noop, noop);
nextSource.subscribe(innerSubscriber);
innerSubscriber.add(subscribeNext);
} else {
Expand Down
19 changes: 6 additions & 13 deletions src/internal/operators/OperatorSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function createOperatorSubscriber<T>(
* A generic helper for allowing operators to be created with a Subscriber and
* use closures to capture necessary state from the operator function itself.
*/
export class OperatorSubscriber<T> extends Subscriber<T> {
class OperatorSubscriber<T> extends Subscriber<T> {
/**
* Creates an instance of an `OperatorSubscriber`.
* @param destination The downstream subscriber.
Expand All @@ -38,18 +38,13 @@ export class OperatorSubscriber<T> extends Subscriber<T> {
* this handler are sent to the `destination` error handler.
* @param onFinalize Additional finalization logic here. This will only be called on finalization if the
* subscriber itself is not already closed. This is called after all other finalization logic is executed.
* @param shouldUnsubscribe An optional check to see if an unsubscribe call should truly unsubscribe.
* NOTE: This currently **ONLY** exists to support the strange behavior of {@link groupBy}, where unsubscription
* to the resulting observable does not actually disconnect from the source if there are active subscriptions
* to any grouped observable. (DO NOT EXPOSE OR USE EXTERNALLY!!!)
*/
constructor(
destination: Subscriber<any>,
onNext?: (value: T) => void,
onComplete?: () => void,
onError?: (err: any) => void,
private onFinalize?: () => void,
private shouldUnsubscribe?: () => boolean
private onFinalize?: () => void
) {
// It's important - for performance reasons - that all of this class's
// members are initialized and that they are always initialized in the same
Expand Down Expand Up @@ -102,11 +97,9 @@ export class OperatorSubscriber<T> extends Subscriber<T> {
}

unsubscribe() {
if (!this.shouldUnsubscribe || this.shouldUnsubscribe()) {
const { closed } = this;
super.unsubscribe();
// Execute additional teardown if we have any and we didn't already do so.
!closed && this.onFinalize?.();
}
const { closed } = this;
super.unsubscribe();
// Execute additional teardown if we have any and we didn't already do so.
!closed && this.onFinalize?.();
}
}
31 changes: 4 additions & 27 deletions src/internal/operators/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Observable } from '../Observable';
import { from } from '../observable/from';
import { Subject } from '../Subject';
import { ObservableInput, Observer, OperatorFunction, SubjectLike } from '../types';
import { createOperatorSubscriber, OperatorSubscriber } from './OperatorSubscriber';
import { createOperatorSubscriber } from './OperatorSubscriber';

export interface BasicGroupByOptions<K, T> {
element?: undefined;
Expand Down Expand Up @@ -165,20 +165,14 @@ export function groupBy<T, K, R>(
// next call from the source.
const handleError = (err: any) => notify((consumer) => consumer.error(err));

// The number of actively subscribed groups
let activeGroups = 0;

// Whether or not teardown was attempted on this subscription.
let teardownAttempted = false;

// Capturing a reference to this, because we need a handle to it
// in `createGroupedObservable` below. This is what we use to
// subscribe to our source observable. This sometimes needs to be unsubscribed
// out-of-band with our `subscriber` which is the downstream subscriber, or destination,
// in cases where a user unsubscribes from the main resulting subscription, but
// still has groups from this subscription subscribed and would expect values from it
// Consider: `source.pipe(groupBy(fn), take(2))`.
const groupBySourceSubscriber = new OperatorSubscriber(
const groupBySourceSubscriber = createOperatorSubscriber(
subscriber,
(value: T) => {
// Because we have to notify all groups of any errors that occur in here,
Expand Down Expand Up @@ -240,14 +234,7 @@ export function groupBy<T, K, R>(
// When the source subscription is _finally_ torn down, release the subjects and keys
// in our groups Map, they may be quite large and we don't want to keep them around if we
// don't have to.
() => groups.clear(),
() => {
teardownAttempted = true;
// We only kill our subscription to the source if we have
// no active groups. As stated above, consider this scenario:
// source$.pipe(groupBy(fn), take(2)).
return activeGroups === 0;
}
() => groups.clear()
);

// Subscribe to the source
Expand All @@ -259,17 +246,7 @@ export function groupBy<T, K, R>(
* @param groupSubject The subject that fuels the group
*/
function createGroupedObservable(key: K, groupSubject: SubjectLike<any>) {
const result: any = new Observable<T>((groupSubscriber) => {
activeGroups++;
const innerSub = groupSubject.subscribe(groupSubscriber);
return () => {
innerSub.unsubscribe();
// We can kill the subscription to our source if we now have no more
// active groups subscribed, and a finalization was already attempted on
// the source.
--activeGroups === 0 && teardownAttempted && groupBySourceSubscriber.unsubscribe();
};
});
const result: any = new Observable<T>((groupSubscriber) => groupSubject.subscribe(groupSubscriber));
result.key = key;
return result;
}
Expand Down

0 comments on commit 9ed27bf

Please sign in to comment.