Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace concast.cleanup method with simpler concast.beforeNext API #9718

Merged
merged 4 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
{
"name": "apollo-client",
"path": "./dist/apollo-client.min.cjs",
"maxSize": "29.45kB"
"maxSize": "29.5kB"
}
],
"engines": {
Expand Down
7 changes: 2 additions & 5 deletions src/core/ObservableQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -831,12 +831,9 @@ Did you mean to call refetch(variables) instead of refetch({ variables })?`);

if (!useDisposableConcast) {
// We use the {add,remove}Observer methods directly to avoid wrapping
// observer with an unnecessary SubscriptionObserver object, in part so
// that we can remove it here without triggering any unsubscriptions,
// because we just want to ignore the old observable, not prematurely shut
// it down, since other consumers may be awaiting this.concast.promise.
// observer with an unnecessary SubscriptionObserver object.
if (this.concast && this.observer) {
this.concast.removeObserver(this.observer, true);
this.concast.removeObserver(this.observer);
}

this.concast = concast;
Expand Down
4 changes: 2 additions & 2 deletions src/core/QueryManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ export class QueryManager<TStore> {

byVariables.set(varJson, observable = concast);

concast.cleanup(() => {
concast.beforeNext(() => {
if (byVariables.delete(varJson) &&
byVariables.size < 1) {
inFlightLinkObservables.delete(serverQuery);
Expand Down Expand Up @@ -1161,7 +1161,7 @@ export class QueryManager<TStore> {
: fromVariables(normalized.variables!)
);

concast.cleanup(() => {
concast.beforeNext(() => {
this.fetchCancelFns.delete(queryId);

if (queryInfo.observableQuery) {
Expand Down
77 changes: 43 additions & 34 deletions src/utilities/observables/Concast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,29 +121,24 @@ export class Concast<T> extends Observable<T> {
}
}

// Note: cleanup observers do not count towards this total.
private addCount = 0;

public addObserver(observer: Observer<T>) {
if (!this.observers.has(observer)) {
// Immediately deliver the most recent message, so we can always
// be sure all observers have the latest information.
this.deliverLastMessage(observer);
this.observers.add(observer);
++this.addCount;
}
}

public removeObserver(
observer: Observer<T>,
quietly?: boolean,
) {
if (this.observers.delete(observer) &&
--this.addCount < 1 &&
!quietly) {
// In case there are still any cleanup observers in this.observers, and no
// error or completion has been broadcast yet, make sure those observers
// have a chance to run and then remove themselves from this.observers.
public removeObserver(observer: Observer<T>) {
if (
this.observers.delete(observer) &&
this.observers.size < 1
) {
// In case there are still any listeners in this.nextResultListeners, and
// no error or completion has been broadcast yet, make sure those
// observers have a chance to run and then remove themselves from
// this.observers.
this.handlers.complete();
}
}
Expand All @@ -160,14 +155,15 @@ export class Concast<T> extends Observable<T> {

// Name and argument of the most recently invoked observer method, used
// to deliver latest results immediately to new observers.
private latest?: ["next" | "error", any];
private latest?: ["next", T] | ["error", any];

// Bound handler functions that can be reused for every internal
// subscription.
private handlers = {
next: (result: T) => {
if (this.sub !== null) {
this.latest = ["next", result];
this.notify("next", result);
iterateObserversSafely(this.observers, "next", result);
}
},
Expand All @@ -182,6 +178,7 @@ export class Concast<T> extends Observable<T> {
this.sub = null;
this.latest = ["error", error];
this.reject(error);
this.notify("error", error);
iterateObserversSafely(this.observers, "error", error);
}
},
Expand All @@ -199,6 +196,7 @@ export class Concast<T> extends Observable<T> {
} else {
this.resolve();
}
this.notify("complete");
// We do not store this.latest = ["complete"], because doing so
// discards useful information about the previous next (or
// error) message. Instead, if new observers subscribe after
Expand All @@ -215,29 +213,35 @@ export class Concast<T> extends Observable<T> {
},
};

public cleanup(callback: () => any) {
private nextResultListeners = new Set<NextResultListener>();

private notify(
method: Parameters<NextResultListener>[0],
arg?: Parameters<NextResultListener>[1],
) {
const { nextResultListeners } = this;
if (nextResultListeners.size) {
// Replacing this.nextResultListeners first ensures it does not grow while
// we are iterating over it, potentially leading to infinite loops.
this.nextResultListeners = new Set;
nextResultListeners.forEach(listener => listener(method, arg));
}
}

// We need a way to run callbacks just *before* the next result (or error or
// completion) is delivered by this Concast, so we can be sure any code that
// runs as a result of delivering that result/error observes the effects of
// running the callback(s). It was tempting to reuse the Observer type instead
// of introducing NextResultListener, but that messes with the sizing and
// maintenance of this.observers, and ends up being more code overall.
beforeNext(callback: NextResultListener) {
let called = false;
const once = () => {
this.nextResultListeners.add((method, arg) => {
if (!called) {
called = true;
// Removing a cleanup observer should not unsubscribe from the
// underlying Observable, so the only removeObserver behavior we
// need here is to delete observer from this.observers.
this.observers.delete(observer);
callback();
callback(method, arg);
}
}
const observer = {
next: once,
error: once,
complete: once,
};
const count = this.addCount;
this.addObserver(observer);
// Normally addObserver increments this.addCount, but we can "hide"
// cleanup observers by restoring this.addCount to its previous value
// after adding any cleanup observer.
this.addCount = count;
});
}

// A public way to abort observation and broadcast.
Expand All @@ -248,6 +252,11 @@ export class Concast<T> extends Observable<T> {
}
}

type NextResultListener = (
method: "next" | "error" | "complete",
arg?: any,
) => any;

// Necessary because the Concast constructor has a different signature
// than the Observable constructor.
fixObservableSubclass(Concast);
71 changes: 69 additions & 2 deletions src/utilities/observables/__tests__/Concast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ describe("Concast Observable (similar to Behavior Subject in RxJS)", () => {
second: 0,
};

concast.cleanup(() => {
concast.beforeNext(() => {
++cleanupCounts.first;
});

Expand All @@ -58,7 +58,7 @@ describe("Concast Observable (similar to Behavior Subject in RxJS)", () => {
},
});

concast.cleanup(() => {
concast.beforeNext(() => {
++cleanupCounts.second;
});

Expand All @@ -75,4 +75,71 @@ describe("Concast Observable (similar to Behavior Subject in RxJS)", () => {
resolve();
}).catch(reject);
});

it("concast.beforeNext listeners run before next result/error", () => {
const log: Array<number | [string, any?]> = [];
let resolve7Promise: undefined | (() => void);

const concast = new Concast([
Observable.of(1, 2),

new Promise(resolve => setTimeout(resolve, 10)).then(() => {
enqueueListener();
return Observable.of(3, 4);
}),

Observable.of(5, 6),

new Promise<void>(resolve => {
resolve7Promise = resolve;
}).then(() => {
enqueueListener();
return Observable.of(7);
}),

Observable.of(8, 9),
]);

function enqueueListener() {
concast.beforeNext((method, arg) => {
log.push([method, arg]);
});
}

const sub = concast.subscribe({
next(num) {
log.push(num);
if (num === 6) {
resolve7Promise!();
} else if (num === 8) {
enqueueListener();
// Prevent delivery of final 9 result.
sub.unsubscribe();
}
},
});

enqueueListener();

return concast.promise.then(lastResult => {
expect(lastResult).toBe(8);

expect(log).toEqual([
["next", 1],
1,
2,
["next", 3],
3,
4,
5,
6,
["next", 7],
7,
8,
["complete", void 0],
]);

sub.unsubscribe();
});
});
});