Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify RetryLink, fix potential memory leak #11424

Merged
merged 2 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/curvy-seas-hope.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
"@apollo/client": minor
---

Simplify RetryLink, fix potential memory leak

Historically, `RetryLink` would keep a `values` array of all previous values,
in case the operation would get an additional subscriber at a later point in time.
In practice, this could lead to a memory leak (#11393) and did not serve any
further purpose, as the resulting observable would only be subscribed to by
Apollo Client itself, and only once - it would be wrapped in a `Concast` before
being exposed to the user, and that `Concast` would handle subscribers on its
own.
3 changes: 3 additions & 0 deletions src/link/batch-http/__tests__/batchHttpLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,9 @@ describe("SharedHttpTest", () => {
expect(subscriber.next).toHaveBeenCalledTimes(2);
expect(subscriber.complete).toHaveBeenCalledTimes(2);
expect(subscriber.error).not.toHaveBeenCalled();
// only one call because batchHttpLink can handle more than one subscriber
// without starting a new request
expect(fetchMock.calls().length).toBe(1);
resolve();
}, 50);
});
Expand Down
1 change: 1 addition & 0 deletions src/link/http/__tests__/HttpLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ describe("HttpLink", () => {
expect(subscriber.next).toHaveBeenCalledTimes(2);
expect(subscriber.complete).toHaveBeenCalledTimes(2);
expect(subscriber.error).not.toHaveBeenCalled();
expect(fetchMock.calls().length).toBe(2);
resolve();
}, 50);
});
Expand Down
21 changes: 16 additions & 5 deletions src/link/retry/__tests__/retryLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ describe("RetryLink", () => {
expect(unsubscribeStub).toHaveBeenCalledTimes(1);
});

it("supports multiple subscribers to the same request", async () => {
it("multiple subscribers will trigger multiple requests", async () => {
const subscriber = {
next: jest.fn(console.log),
error: jest.fn(console.error),
complete: jest.fn(console.info),
};
const retry = new RetryLink({
delay: { initial: 1 },
attempts: { max: 5 },
Expand All @@ -102,13 +107,19 @@ describe("RetryLink", () => {
stub.mockReturnValueOnce(fromError(standardError));
stub.mockReturnValueOnce(fromError(standardError));
stub.mockReturnValueOnce(Observable.of(data));
stub.mockReturnValueOnce(fromError(standardError));
stub.mockReturnValueOnce(fromError(standardError));
stub.mockReturnValueOnce(Observable.of(data));
const link = ApolloLink.from([retry, stub]);

const observable = execute(link, { query });
const [result1, result2] = (await waitFor(observable, observable)) as any;
expect(result1.values).toEqual([data]);
expect(result2.values).toEqual([data]);
expect(stub).toHaveBeenCalledTimes(3);
observable.subscribe(subscriber);
observable.subscribe(subscriber);
await new Promise((resolve) => setTimeout(resolve, 3500));
expect(subscriber.next).toHaveBeenNthCalledWith(1, data);
expect(subscriber.next).toHaveBeenNthCalledWith(2, data);
expect(subscriber.complete).toHaveBeenCalledTimes(2);
expect(stub).toHaveBeenCalledTimes(6);
});

it("retries independently for concurrent requests", async () => {
Expand Down
121 changes: 18 additions & 103 deletions src/link/retry/retryLink.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import type { Operation, FetchResult, NextLink } from "../core/index.js";
import { ApolloLink } from "../core/index.js";
import type {
Observer,
ObservableSubscription,
} from "../../utilities/index.js";
import type { ObservableSubscription } from "../../utilities/index.js";
import { Observable } from "../../utilities/index.js";
import type { DelayFunction, DelayFunctionOptions } from "./delayFunction.js";
import { buildDelayFunction } from "./delayFunction.js";
import type { RetryFunction, RetryFunctionOptions } from "./retryFunction.js";
import { buildRetryFunction } from "./retryFunction.js";
import type { SubscriptionObserver } from "zen-observable-ts";

export namespace RetryLink {
export interface Options {
Expand All @@ -27,78 +25,18 @@ export namespace RetryLink {
/**
* Tracking and management of operations that may be (or currently are) retried.
*/
class RetryableOperation<TValue = any> {
class RetryableOperation {
private retryCount: number = 0;
private values: any[] = [];
private error: any;
private complete = false;
private canceled = false;
private observers: (Observer<TValue> | null)[] = [];
private currentSubscription: ObservableSubscription | null = null;
private timerId: number | undefined;

constructor(
private observer: SubscriptionObserver<FetchResult>,
private operation: Operation,
private nextLink: NextLink,
private forward: NextLink,
private delayFor: DelayFunction,
private retryIf: RetryFunction
) {}

/**
* Register a new observer for this operation.
*
* If the operation has previously emitted other events, they will be
* immediately triggered for the observer.
*/
public subscribe(observer: Observer<TValue>) {
if (this.canceled) {
throw new Error(
`Subscribing to a retryable link that was canceled is not supported`
);
}
this.observers.push(observer);

// If we've already begun, catch this observer up.
for (const value of this.values) {
observer.next!(value);
}

if (this.complete) {
observer.complete!();
} else if (this.error) {
observer.error!(this.error);
}
}

/**
* Remove a previously registered observer from this operation.
*
* If no observers remain, the operation will stop retrying, and unsubscribe
* from its downstream link.
*/
public unsubscribe(observer: Observer<TValue>) {
const index = this.observers.indexOf(observer);
if (index < 0) {
throw new Error(
`RetryLink BUG! Attempting to unsubscribe unknown observer!`
);
}
// Note that we are careful not to change the order of length of the array,
// as we are often mid-iteration when calling this method.
this.observers[index] = null;

// If this is the last observer, we're done.
if (this.observers.every((o) => o === null)) {
this.cancel();
}
}

/**
* Start the initial request.
*/
public start() {
if (this.currentSubscription) return; // Already started.

) {
this.try();
}

Expand All @@ -112,33 +50,16 @@ class RetryableOperation<TValue = any> {
clearTimeout(this.timerId);
this.timerId = undefined;
this.currentSubscription = null;
this.canceled = true;
}

private try() {
this.currentSubscription = this.nextLink(this.operation).subscribe({
next: this.onNext,
this.currentSubscription = this.forward(this.operation).subscribe({
next: this.observer.next.bind(this.observer),
error: this.onError,
complete: this.onComplete,
complete: this.observer.complete.bind(this.observer),
});
}

private onNext = (value: any) => {
this.values.push(value);
for (const observer of this.observers) {
if (!observer) continue;
observer.next!(value);
}
};

private onComplete = () => {
this.complete = true;
for (const observer of this.observers) {
if (!observer) continue;
observer.complete!();
}
};

private onError = async (error: any) => {
this.retryCount += 1;

Expand All @@ -153,11 +74,7 @@ class RetryableOperation<TValue = any> {
return;
}

this.error = error;
for (const observer of this.observers) {
if (!observer) continue;
observer.error!(error);
}
this.observer.error(error);
};

private scheduleRetry(delay: number) {
Expand Down Expand Up @@ -189,18 +106,16 @@ export class RetryLink extends ApolloLink {
operation: Operation,
nextLink: NextLink
): Observable<FetchResult> {
const retryable = new RetryableOperation(
operation,
nextLink,
this.delayFor,
this.retryIf
);
retryable.start();

return new Observable((observer) => {
retryable.subscribe(observer);
const retryable = new RetryableOperation(
observer,
operation,
nextLink,
this.delayFor,
this.retryIf
);
return () => {
retryable.unsubscribe(observer);
retryable.cancel();
};
});
}
Expand Down
Loading