Skip to content

Commit

Permalink
Fix cleanup observer blocking unsubscribe (2) (#6985)
Browse files Browse the repository at this point in the history
  • Loading branch information
Javier committed Oct 15, 2020
1 parent 2b8ee34 commit 2960a3e
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 19 deletions.
67 changes: 66 additions & 1 deletion src/core/__tests__/QueryManager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
// mocks
import mockQueryManager from '../../../utilities/testing/mocking/mockQueryManager';
import mockWatchQuery from '../../../utilities/testing/mocking/mockWatchQuery';
import { MockApolloLink, mockSingleLink } from '../../../utilities/testing/mocking/mockLink';
import { MockApolloLink, mockSingleLink, MockLink } from '../../../utilities/testing/mocking/mockLink';

// core
import { ApolloQueryResult } from '../../types';
Expand Down Expand Up @@ -460,6 +460,71 @@ describe('QueryManager', () => {
expect(subscription.unsubscribe).not.toThrow();
});

// Query should be aborted on last .unsubscribe()
itAsync('causes immediate link unsubscription if unsubscribed', (resolve, reject) => {
const expResult = {
data: {
allPeople: {
people: [
{
name: 'Luke Skywalker',
},
],
},
},
};

const request = {
query: gql`
query people {
allPeople(first: 1) {
people {
name
}
}
}
`,
variables: undefined
};

const mockedResponse = {
request,
result: expResult
};

const onRequestSubscribe = jest.fn();
const onRequestUnsubscribe = jest.fn();

const mockedSingleLink = new MockLink([mockedResponse], {
addTypename: true,
onSubscribe: onRequestSubscribe,
onUnsubscribe: onRequestUnsubscribe
});

const mockedQueryManger = new QueryManager({
link: mockedSingleLink,
cache: new InMemoryCache({ addTypename: false }),
});

const observableQuery = mockedQueryManger.watchQuery({
query: request.query,
variables: request.variables,
notifyOnNetworkStatusChange: false
});

const subscription = observableQuery.subscribe({
next: wrap(reject, () => {
reject(new Error('Link subscriptions should have been cancelled'));
}),
});

subscription.unsubscribe();

expect(onRequestSubscribe).toHaveBeenCalledTimes(1)
expect(onRequestUnsubscribe).toHaveBeenCalledTimes(1)
resolve();
});

itAsync('supports interoperability with other Observable implementations like RxJS', (resolve, reject) => {
const expResult = {
data: {
Expand Down
2 changes: 1 addition & 1 deletion src/react/components/__tests__/client/Mutation.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ describe('General Mutation testing', () => {

function mockClient(m: any) {
return new ApolloClient({
link: new MockLink(m, false),
link: new MockLink(m, { addTypename: false }),
cache: new Cache({ addTypename: false })
});
}
Expand Down
26 changes: 14 additions & 12 deletions src/utilities/observables/Concast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,19 @@ export class Concast<T> extends Observable<T> {
observer: Observer<T>,
quietly?: boolean,
) {
if (this.observers.delete(observer) &&
this.observers.size < 1) {
if (quietly) return;
if (this.sub) {
this.sub.unsubscribe();
// In case anyone happens to be listening to this.promise, after
// this.observers has become empty.
this.reject(new Error("Observable cancelled prematurely"));
if (this.observers.delete(observer)) {
--this.addCount;

if (this.addCount < 1) {
if (quietly) return;
if (this.sub) {
this.sub.unsubscribe();
// In case anyone happens to be listening to this.promise, after
// this.observers has become empty.
this.reject(new Error("Observable cancelled prematurely"));
}
this.sub = null;
}
this.sub = null;
}
}

Expand Down Expand Up @@ -209,13 +212,12 @@ export class Concast<T> extends Observable<T> {
const once = () => {
if (!called) {
called = true;
// If there have been no other (non-cleanup) observers added, pass
// true for the quietly argument, so the removal of the cleanup
// Pass true for the quietly argument, so the removal of the cleanup
// observer does not call this.sub.unsubscribe. If a cleanup
// observer is added and removed before any other observers
// subscribe, we do not want to prevent other observers from
// subscribing later.
this.removeObserver(observer, !this.addCount);
this.removeObserver(observer, true);
callback();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/utilities/testing/mocking/MockedProvider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class MockedProvider extends React.Component<
defaultOptions,
link: link || new MockLink(
mocks || [],
addTypename,
{ addTypename },
),
resolvers,
});
Expand Down
21 changes: 17 additions & 4 deletions src/utilities/testing/mocking/mockLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,25 @@ function requestToKey(request: GraphQLRequest, addTypename: Boolean): string {
return JSON.stringify(requestKey);
}

interface MockLinkOptions {
addTypename?: boolean;
onSubscribe?: () => void;
onUnsubscribe?: () => void;
}

export class MockLink extends ApolloLink {
public operation: Operation;
public addTypename: Boolean = true;
private mockedResponsesByKey: { [key: string]: MockedResponse[] } = {};
private options: MockLinkOptions;

constructor(
mockedResponses: ReadonlyArray<MockedResponse>,
addTypename: Boolean = true
options: MockLinkOptions = {}
) {
super();
this.addTypename = addTypename;
this.options = options;
this.addTypename = options.addTypename ?? true;
if (mockedResponses) {
mockedResponses.forEach(mockedResponse => {
this.addMockedResponse(mockedResponse);
Expand Down Expand Up @@ -109,7 +117,9 @@ export class MockLink extends ApolloLink {
}
}

return new Observable(observer => {
const requestObservable = new Observable<FetchResult>(observer => {
this.options.onSubscribe?.();

const timer = setTimeout(() => {
if (configError) {
try {
Expand Down Expand Up @@ -141,9 +151,12 @@ export class MockLink extends ApolloLink {
}, response && response.delay || 0);

return () => {
this.options.onUnsubscribe?.();
clearTimeout(timer);
};
});

return requestObservable;
}

private normalizeMockedResponse(
Expand Down Expand Up @@ -183,5 +196,5 @@ export function mockSingleLink(
maybeTypename = true;
}

return new MockLink(mocks, maybeTypename);
return new MockLink(mocks, { addTypename: maybeTypename });
}

0 comments on commit 2960a3e

Please sign in to comment.