From 4657b7a4eca17091c1f6e20c5f98d2a513c89737 Mon Sep 17 00:00:00 2001 From: Lenz Weber-Tronic Date: Mon, 11 Dec 2023 14:42:09 +0100 Subject: [PATCH] Simplify RetryLink, fix potential memory leak fixes #11393 --- .changeset/curvy-seas-hope.md | 13 ++ .../batch-http/__tests__/batchHttpLink.ts | 3 + src/link/http/__tests__/HttpLink.ts | 1 + src/link/retry/__tests__/retryLink.ts | 21 ++- src/link/retry/retryLink.ts | 121 +++--------------- 5 files changed, 51 insertions(+), 108 deletions(-) create mode 100644 .changeset/curvy-seas-hope.md diff --git a/.changeset/curvy-seas-hope.md b/.changeset/curvy-seas-hope.md new file mode 100644 index 00000000000..65491ac6318 --- /dev/null +++ b/.changeset/curvy-seas-hope.md @@ -0,0 +1,13 @@ +--- +"@apollo/client": minor +--- + +Simplify RetryLink, fix potential memory leak + +Historically, `RetryLink` would keep a `values` array of all previous values, +in case the operation would get an additional subscriber at a later point in time. +In practice, this could lead to a memory leak (#11393) and did not serve any +further purpose, as the resulting observable would only be subscribed to by +Apollo Client itself, and only once - it would be wrapped in a `Concast` before +being exposed to the user, and that `Concast` would handle subscribers on its +own. diff --git a/src/link/batch-http/__tests__/batchHttpLink.ts b/src/link/batch-http/__tests__/batchHttpLink.ts index 544f44c304f..6dea1805b89 100644 --- a/src/link/batch-http/__tests__/batchHttpLink.ts +++ b/src/link/batch-http/__tests__/batchHttpLink.ts @@ -524,6 +524,9 @@ describe("SharedHttpTest", () => { expect(subscriber.next).toHaveBeenCalledTimes(2); expect(subscriber.complete).toHaveBeenCalledTimes(2); expect(subscriber.error).not.toHaveBeenCalled(); + // only one call because batchHttpLink can handle more than one subscriber + // without starting a new request + expect(fetchMock.calls().length).toBe(1); resolve(); }, 50); }); diff --git a/src/link/http/__tests__/HttpLink.ts b/src/link/http/__tests__/HttpLink.ts index b2ce5308cfd..5c02986b279 100644 --- a/src/link/http/__tests__/HttpLink.ts +++ b/src/link/http/__tests__/HttpLink.ts @@ -634,6 +634,7 @@ describe("HttpLink", () => { expect(subscriber.next).toHaveBeenCalledTimes(2); expect(subscriber.complete).toHaveBeenCalledTimes(2); expect(subscriber.error).not.toHaveBeenCalled(); + expect(fetchMock.calls().length).toBe(2); resolve(); }, 50); }); diff --git a/src/link/retry/__tests__/retryLink.ts b/src/link/retry/__tests__/retryLink.ts index 3f5413e5b39..b9f3e14440d 100644 --- a/src/link/retry/__tests__/retryLink.ts +++ b/src/link/retry/__tests__/retryLink.ts @@ -92,7 +92,12 @@ describe("RetryLink", () => { expect(unsubscribeStub).toHaveBeenCalledTimes(1); }); - it("supports multiple subscribers to the same request", async () => { + it("multiple subscribers will trigger multiple requests", async () => { + const subscriber = { + next: jest.fn(console.log), + error: jest.fn(console.error), + complete: jest.fn(console.info), + }; const retry = new RetryLink({ delay: { initial: 1 }, attempts: { max: 5 }, @@ -102,13 +107,19 @@ describe("RetryLink", () => { stub.mockReturnValueOnce(fromError(standardError)); stub.mockReturnValueOnce(fromError(standardError)); stub.mockReturnValueOnce(Observable.of(data)); + stub.mockReturnValueOnce(fromError(standardError)); + stub.mockReturnValueOnce(fromError(standardError)); + stub.mockReturnValueOnce(Observable.of(data)); const link = ApolloLink.from([retry, stub]); const observable = execute(link, { query }); - const [result1, result2] = (await waitFor(observable, observable)) as any; - expect(result1.values).toEqual([data]); - expect(result2.values).toEqual([data]); - expect(stub).toHaveBeenCalledTimes(3); + observable.subscribe(subscriber); + observable.subscribe(subscriber); + await new Promise((resolve) => setTimeout(resolve, 3500)); + expect(subscriber.next).toHaveBeenNthCalledWith(1, data); + expect(subscriber.next).toHaveBeenNthCalledWith(2, data); + expect(subscriber.complete).toHaveBeenCalledTimes(2); + expect(stub).toHaveBeenCalledTimes(6); }); it("retries independently for concurrent requests", async () => { diff --git a/src/link/retry/retryLink.ts b/src/link/retry/retryLink.ts index d44a382500c..cde2dd2ea9c 100644 --- a/src/link/retry/retryLink.ts +++ b/src/link/retry/retryLink.ts @@ -1,14 +1,12 @@ import type { Operation, FetchResult, NextLink } from "../core/index.js"; import { ApolloLink } from "../core/index.js"; -import type { - Observer, - ObservableSubscription, -} from "../../utilities/index.js"; +import type { ObservableSubscription } from "../../utilities/index.js"; import { Observable } from "../../utilities/index.js"; import type { DelayFunction, DelayFunctionOptions } from "./delayFunction.js"; import { buildDelayFunction } from "./delayFunction.js"; import type { RetryFunction, RetryFunctionOptions } from "./retryFunction.js"; import { buildRetryFunction } from "./retryFunction.js"; +import type { SubscriptionObserver } from "zen-observable-ts"; export namespace RetryLink { export interface Options { @@ -27,78 +25,18 @@ export namespace RetryLink { /** * Tracking and management of operations that may be (or currently are) retried. */ -class RetryableOperation { +class RetryableOperation { private retryCount: number = 0; - private values: any[] = []; - private error: any; - private complete = false; - private canceled = false; - private observers: (Observer | null)[] = []; private currentSubscription: ObservableSubscription | null = null; private timerId: number | undefined; constructor( + private observer: SubscriptionObserver, private operation: Operation, - private nextLink: NextLink, + private forward: NextLink, private delayFor: DelayFunction, private retryIf: RetryFunction - ) {} - - /** - * Register a new observer for this operation. - * - * If the operation has previously emitted other events, they will be - * immediately triggered for the observer. - */ - public subscribe(observer: Observer) { - if (this.canceled) { - throw new Error( - `Subscribing to a retryable link that was canceled is not supported` - ); - } - this.observers.push(observer); - - // If we've already begun, catch this observer up. - for (const value of this.values) { - observer.next!(value); - } - - if (this.complete) { - observer.complete!(); - } else if (this.error) { - observer.error!(this.error); - } - } - - /** - * Remove a previously registered observer from this operation. - * - * If no observers remain, the operation will stop retrying, and unsubscribe - * from its downstream link. - */ - public unsubscribe(observer: Observer) { - const index = this.observers.indexOf(observer); - if (index < 0) { - throw new Error( - `RetryLink BUG! Attempting to unsubscribe unknown observer!` - ); - } - // Note that we are careful not to change the order of length of the array, - // as we are often mid-iteration when calling this method. - this.observers[index] = null; - - // If this is the last observer, we're done. - if (this.observers.every((o) => o === null)) { - this.cancel(); - } - } - - /** - * Start the initial request. - */ - public start() { - if (this.currentSubscription) return; // Already started. - + ) { this.try(); } @@ -112,33 +50,16 @@ class RetryableOperation { clearTimeout(this.timerId); this.timerId = undefined; this.currentSubscription = null; - this.canceled = true; } private try() { - this.currentSubscription = this.nextLink(this.operation).subscribe({ - next: this.onNext, + this.currentSubscription = this.forward(this.operation).subscribe({ + next: this.observer.next.bind(this.observer), error: this.onError, - complete: this.onComplete, + complete: this.observer.complete.bind(this.observer), }); } - private onNext = (value: any) => { - this.values.push(value); - for (const observer of this.observers) { - if (!observer) continue; - observer.next!(value); - } - }; - - private onComplete = () => { - this.complete = true; - for (const observer of this.observers) { - if (!observer) continue; - observer.complete!(); - } - }; - private onError = async (error: any) => { this.retryCount += 1; @@ -153,11 +74,7 @@ class RetryableOperation { return; } - this.error = error; - for (const observer of this.observers) { - if (!observer) continue; - observer.error!(error); - } + this.observer.error(error); }; private scheduleRetry(delay: number) { @@ -189,18 +106,16 @@ export class RetryLink extends ApolloLink { operation: Operation, nextLink: NextLink ): Observable { - const retryable = new RetryableOperation( - operation, - nextLink, - this.delayFor, - this.retryIf - ); - retryable.start(); - return new Observable((observer) => { - retryable.subscribe(observer); + const retryable = new RetryableOperation( + observer, + operation, + nextLink, + this.delayFor, + this.retryIf + ); return () => { - retryable.unsubscribe(observer); + retryable.cancel(); }; }); }