Skip to content

Commit

Permalink
Merge pull request #7170 from javier-garcia-meteologica/fix-concast-c…
Browse files Browse the repository at this point in the history
…leanup2

Fix cleanup observer blocking unsubscribe, part 2. Fixes #6985.
  • Loading branch information
benjamn committed Oct 16, 2020
2 parents 1a7c9a0 + ef936ed commit fd1d03a
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 23 deletions.
85 changes: 85 additions & 0 deletions src/core/__tests__/QueryManager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,91 @@ describe('QueryManager', () => {
expect(subscription.unsubscribe).not.toThrow();
});

// Query should be aborted on last .unsubscribe()
itAsync('causes link unsubscription if unsubscribed', (resolve, reject) => {
const expResult = {
data: {
allPeople: {
people: [
{
name: 'Luke Skywalker',
},
],
},
},
};

const request = {
query: gql`
query people {
allPeople(first: 1) {
people {
name
}
}
}
`,
variables: undefined
};

const mockedResponse = {
request,
result: expResult
};

const onRequestSubscribe = jest.fn();
const onRequestUnsubscribe = jest.fn();

const mockedSingleLink = new ApolloLink(() => {
return new Observable(observer => {
onRequestSubscribe();

const timer = setTimeout(() => {
observer.next(mockedResponse.result);
observer.complete();
}, 0);

return () => {
onRequestUnsubscribe();
clearTimeout(timer);
};
});
});

const mockedQueryManger = new QueryManager({
link: mockedSingleLink,
cache: new InMemoryCache({ addTypename: false }),
});

const observableQuery = mockedQueryManger.watchQuery({
query: request.query,
variables: request.variables,
notifyOnNetworkStatusChange: false
});

const observerCallback = wrap(reject, () => {
reject(new Error('Link subscription should have been cancelled'));
});

const subscription = observableQuery.subscribe({
next: observerCallback,
error: observerCallback,
complete: observerCallback
});

subscription.unsubscribe();

return new Promise(
// Unsubscribing from the link happens after a microtask
// (Promise.resolve().then) delay, so we need to wait at least that
// long before verifying onRequestUnsubscribe was called.
resolve => setTimeout(resolve, 0)
).then(() => {
expect(onRequestSubscribe).toHaveBeenCalledTimes(1);
expect(onRequestUnsubscribe).toHaveBeenCalledTimes(1);
}).then(resolve, reject);
});

itAsync('supports interoperability with other Observable implementations like RxJS', (resolve, reject) => {
const expResult = {
data: {
Expand Down
44 changes: 21 additions & 23 deletions src/utilities/observables/Concast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class Concast<T> extends Observable<T> {
// source observable. It's tempting to do this step lazily in
// addObserver, but this.promise can be accessed without calling
// addObserver, so consumption needs to begin eagerly.
this.handlers.complete!();
this.handlers.complete();
}

private deliverLastMessage(observer: Observer<T>) {
Expand Down Expand Up @@ -131,15 +131,12 @@ export class Concast<T> extends Observable<T> {
quietly?: boolean,
) {
if (this.observers.delete(observer) &&
this.observers.size < 1) {
if (quietly) return;
if (this.sub) {
this.sub.unsubscribe();
// In case anyone happens to be listening to this.promise, after
// this.observers has become empty.
this.reject(new Error("Observable cancelled prematurely"));
}
this.sub = null;
--this.addCount < 1 &&
!quietly) {
// In case there are still any cleanup observers in this.observers,
// and no error or completion has been broadcast yet, make sure
// those observers receive an error that terminates them.
this.handlers.error(new Error("Observable cancelled prematurely"));
}
}

Expand All @@ -159,17 +156,21 @@ export class Concast<T> extends Observable<T> {

// Bound handler functions that can be reused for every internal
// subscription.
private handlers: Observer<T> = {
next: result => {
private handlers = {
next: (result: T) => {
if (this.sub !== null) {
this.latest = ["next", result];
iterateObserversSafely(this.observers, "next", result);
}
},

error: error => {
if (this.sub !== null) {
if (this.sub) this.sub.unsubscribe();
error: (error: any) => {
const { sub } = this;
if (sub !== null) {
// Delay unsubscribing from the underlying subscription slightly,
// so that immediately subscribing another observer can keep the
// subscription active.
if (sub) Promise.resolve().then(() => sub.unsubscribe());
this.sub = null;
this.latest = ["error", error];
this.reject(error);
Expand Down Expand Up @@ -209,13 +210,10 @@ export class Concast<T> extends Observable<T> {
const once = () => {
if (!called) {
called = true;
// If there have been no other (non-cleanup) observers added, pass
// true for the quietly argument, so the removal of the cleanup
// observer does not call this.sub.unsubscribe. If a cleanup
// observer is added and removed before any other observers
// subscribe, we do not want to prevent other observers from
// subscribing later.
this.removeObserver(observer, !this.addCount);
// Removing a cleanup observer should not unsubscribe from the
// underlying Observable, so the only removeObserver behavior we
// need here is to delete observer from this.observers.
this.observers.delete(observer);
callback();
}
}
Expand All @@ -236,7 +234,7 @@ export class Concast<T> extends Observable<T> {
public cancel = (reason: any) => {
this.reject(reason);
this.sources = [];
this.handlers.complete!();
this.handlers.complete();
}
}

Expand Down

0 comments on commit fd1d03a

Please sign in to comment.