Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch link can cancel operations that are in queue or in flight #9248

Merged
merged 11 commits into the base branch from the contributor's branch on
Feb 2, 2022
2 changes: 1 addition & 1 deletion src/link/batch-http/__tests__/batchHttpLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ describe('BatchHttpLink', () => {
},
batchInterval: 1,
//if batchKey does not work, then the batch size would be 3
batchMax: 3,
batchMax: 2,
batchKey,
}),
]);
Expand Down
217 changes: 190 additions & 27 deletions src/link/batch/__tests__/batchLink.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import gql from 'graphql-tag';
import { print } from 'graphql';

import { ApolloLink } from '../../core/ApolloLink';
import { execute } from '../../core/execute';
import { ApolloLink, execute } from '../../core';
import { Operation, FetchResult, GraphQLRequest } from '../../core/types';
import { Observable } from '../../../utilities/observables/Observable';
import { Observable } from '../../../utilities';
import { itAsync } from '../../../testing';
import {
BatchLink,
OperationBatcher,
BatchHandler,
BatchableRequest,
} from '../batchLink';
import { itAsync } from '../../../testing';

interface MockedResponse {
request: GraphQLRequest;
Expand All @@ -27,7 +26,7 @@ function getKey(operation: GraphQLRequest) {
return JSON.stringify([operationName, query, variables]);
}

export function createOperation(
function createOperation(
starting: any,
operation: GraphQLRequest,
): Operation {
Expand Down Expand Up @@ -158,11 +157,11 @@ describe('OperationBatcher', () => {
batchKey: () => 'yo',
});

expect(batcher.queuedRequests.get('')).toBeUndefined();
expect(batcher.queuedRequests.get('yo')).toBeUndefined();
expect(batcher["batchesByKey"].get('')).toBeUndefined();
expect(batcher["batchesByKey"].get('yo')).toBeUndefined();
batcher.consumeQueue();
expect(batcher.queuedRequests.get('')).toBeUndefined();
expect(batcher.queuedRequests.get('yo')).toBeUndefined();
expect(batcher["batchesByKey"].get('')).toBeUndefined();
expect(batcher["batchesByKey"].get('yo')).toBeUndefined();
});

it('should be able to add to the queue', () => {
Expand All @@ -186,11 +185,11 @@ describe('OperationBatcher', () => {
operation: createOperation({}, { query }),
};

expect(batcher.queuedRequests.get('')).toBeUndefined();
expect(batcher["batchesByKey"].get('')).toBeUndefined();
batcher.enqueueRequest(request).subscribe({});
expect(batcher.queuedRequests.get('')!.length).toBe(1);
expect(batcher["batchesByKey"].get('')!.size).toBe(1);
batcher.enqueueRequest(request).subscribe({});
expect(batcher.queuedRequests.get('')!.length).toBe(2);
expect(batcher["batchesByKey"].get('')!.size).toBe(2);
});

describe('request queue', () => {
Expand Down Expand Up @@ -233,7 +232,7 @@ describe('OperationBatcher', () => {

myBatcher.enqueueRequest({ operation }).subscribe(
terminatingCheck(resolve, reject, (resultObj: any) => {
expect(myBatcher.queuedRequests.get('')).toBeUndefined();
expect(myBatcher["batchesByKey"].get('')).toBeUndefined();
expect(resultObj).toEqual({ data });
}),
);
Expand Down Expand Up @@ -304,11 +303,11 @@ describe('OperationBatcher', () => {
});

try {
expect(myBatcher.queuedRequests.get('')!.length).toBe(2);
expect(myBatcher["batchesByKey"].get('')!.size).toBe(2);
const observables: (
| Observable<FetchResult>
| undefined)[] = myBatcher.consumeQueue()!;
expect(myBatcher.queuedRequests.get('')).toBeUndefined();
expect(myBatcher["batchesByKey"].get('')).toBeUndefined();
expect(observables.length).toBe(2);
} catch (e) {
reject(e);
Expand Down Expand Up @@ -346,27 +345,27 @@ describe('OperationBatcher', () => {
myBatcher.enqueueRequest({ operation }).subscribe({});
myBatcher.enqueueRequest({ operation }).subscribe({});
myBatcher.enqueueRequest({ operation }).subscribe({});
expect(myBatcher.queuedRequests.get('')!.length).toEqual(3);
expect(myBatcher["batchesByKey"].get('')!.size).toEqual(3);

// 2. Run the timer halfway.
jest.advanceTimersByTime(batchInterval / 2);
expect(myBatcher.queuedRequests.get('')!.length).toEqual(3);
expect(myBatcher["batchesByKey"].get('')!.size).toEqual(3);

// 3. Queue a 4th request, causing the timer to reset.
myBatcher.enqueueRequest({ operation }).subscribe({});
expect(myBatcher.queuedRequests.get('')!.length).toEqual(4);
expect(myBatcher["batchesByKey"].get('')!.size).toEqual(4);

// 4. Run the timer to batchInterval + 1, at this point, if debounce were
// not set, the original 3 requests would have fired, but we expect
// instead that the queries will instead fire at
// (batchInterval + batchInterval / 2).
jest.advanceTimersByTime(batchInterval / 2 + 1);
expect(myBatcher.queuedRequests.get('')!.length).toEqual(4);
expect(myBatcher["batchesByKey"].get('')!.size).toEqual(4);

// 5. Finally, run the timer to (batchInterval + batchInterval / 2) +1,
// and expect the queue to be empty.
jest.advanceTimersByTime(batchInterval / 2);
expect(myBatcher.queuedRequests.size).toEqual(0);
expect(myBatcher["batchesByKey"].size).toEqual(0);
resolve();
});
});
Expand Down Expand Up @@ -396,19 +395,130 @@ describe('OperationBatcher', () => {

batcher.enqueueRequest({ operation }).subscribe({});
try {
expect(batcher.queuedRequests.get('')!.length).toBe(1);
expect(batcher["batchesByKey"].get('')!.size).toBe(1);
} catch (e) {
reject(e);
}

setTimeout(
terminatingCheck(resolve, reject, () => {
expect(batcher.queuedRequests.get('')).toBeUndefined();
expect(batcher["batchesByKey"].get('')).toBeUndefined();
}),
20,
);
});

itAsync('should cancel single query in queue when unsubscribing', (resolve, reject) => {
  // A request whose only subscriber unsubscribes before the batch fires
  // must be dropped from the batcher's bookkeeping immediately.
  const query = gql`
    query {
      author {
        firstName
        lastName
      }
    }
  `;

  const data = {
    lastName: 'Ever',
    firstName: 'Greatest',
  };

  // Handler emits one result and completes on the next tick; it should
  // never actually run in this test because the request is cancelled first.
  const batcher = new OperationBatcher({
    batchInterval: 10,
    batchHandler: () =>
      new Observable(observer => {
        observer.next([{ data }]);
        setTimeout(() => observer.complete());
      }),
  });

  const subscription = batcher
    .enqueueRequest({ operation: createOperation({}, { query }) })
    .subscribe(() => reject('next should never be called'));
  subscription.unsubscribe();

  // Unsubscribing the sole subscriber removes the whole batch entry.
  expect(batcher["batchesByKey"].get('')).toBeUndefined();
  resolve();
});

itAsync('should cancel single query in queue with multiple subscriptions', (resolve, reject) => {
const data = {
lastName: 'Ever',
firstName: 'Greatest',
};
const batcher = new OperationBatcher({
batchInterval: 10,
batchHandler: () =>
new Observable(observer => {
observer.next([{data}]);
setTimeout(observer.complete.bind(observer));
}),
});
const query = gql`
query {
author {
firstName
lastName
}
}
`;
const operation: Operation = createOperation({}, {query});

const observable = batcher.enqueueRequest({operation});

const checkQueuedRequests = (
expectedSubscriberCount: number,
) => {
const batch = batcher["batchesByKey"].get('');
expect(batch).not.toBeUndefined();
expect(batch!.size).toBe(1);
batch!.forEach(request => {
expect(request.subscribers.size).toBe(expectedSubscriberCount);
});
};

const sub1 = observable.subscribe(() => reject('next should never be called'));
checkQueuedRequests(1);

const sub2 = observable.subscribe(() => reject('next should never be called'));
checkQueuedRequests(2);

sub1.unsubscribe();
checkQueuedRequests(1);

sub2.unsubscribe();
expect(batcher["batchesByKey"].get('')).toBeUndefined();
resolve();
});

// Verifies cancellation AFTER dispatch: once the batch has fired (10ms),
// unsubscribing must tear down the in-flight batchHandler observable and
// clear the batcher's bookkeeping for that batch key.
itAsync('should cancel single query in flight when unsubscribing', (resolve, reject) => {
  const batcher = new OperationBatcher({
    batchInterval: 10,
    batchHandler: () =>
      new Observable(() => {
        // Instead of starting an XHR as a real handler would, trigger the
        // unsubscription from outside. `subscription` is assigned
        // synchronously below, long before this handler runs at
        // batchInterval, so it is safe to reference it here.
        setTimeout(() => subscription?.unsubscribe(), 5);

        // Teardown runs when the last subscriber cancels; by that point the
        // batch entry must already have been removed.
        return () => {
          expect(batcher["batchesByKey"].get('')).toBeUndefined();
          resolve();
        };
      }),
  });

  const query = gql`
    query {
      author {
        firstName
        lastName
      }
    }
  `;

  // `next` must never fire because the handler never emits a result.
  const subscription = batcher.enqueueRequest({
    operation: createOperation({}, { query }),
  }).subscribe(() => reject('next should never be called'));
});

itAsync('should correctly batch multiple queries', (resolve, reject) => {
const data = {
lastName: 'Ever',
Expand Down Expand Up @@ -441,7 +551,7 @@ describe('OperationBatcher', () => {
batcher.enqueueRequest({ operation }).subscribe({});
batcher.enqueueRequest({ operation: operation2 }).subscribe({});
try {
expect(batcher.queuedRequests.get('')!.length).toBe(2);
expect(batcher["batchesByKey"].get('')!.size).toBe(2);
} catch (e) {
reject(e);
}
Expand All @@ -450,7 +560,7 @@ describe('OperationBatcher', () => {
// The batch shouldn't be fired yet, so we can add one more request.
batcher.enqueueRequest({ operation: operation3 }).subscribe({});
try {
expect(batcher.queuedRequests.get('')!.length).toBe(3);
expect(batcher["batchesByKey"].get('')!.size).toBe(3);
} catch (e) {
reject(e);
}
Expand All @@ -459,12 +569,65 @@ describe('OperationBatcher', () => {
setTimeout(
terminatingCheck(resolve, reject, () => {
// The batch should've been fired by now.
expect(batcher.queuedRequests.get('')).toBeUndefined();
expect(batcher["batchesByKey"].get('')).toBeUndefined();
}),
20,
);
});

// Mixed cancellation: two of three queued requests are cancelled before the
// batch fires; the remaining subscribed request must still receive its
// result. Timing: batch fires at batchInterval (10ms); the third request is
// enqueued and cancelled at 5ms, inside the same batching window.
itAsync('should cancel multiple queries in queue when unsubscribing and let pass still subscribed one', (resolve, reject) => {
  const data2 = {
    lastName: 'Hauser',
    firstName: 'Evans',
  };

  const batcher = new OperationBatcher({
    batchInterval: 10,
    batchHandler: () =>
      new Observable(observer => {
        observer.next([{ data: data2 }]);
        setTimeout(observer.complete.bind(observer));
      }),
  });

  const query = gql`
    query {
      author {
        firstName
        lastName
      }
    }
  `;

  const operation: Operation = createOperation({}, {query});
  const operation2: Operation = createOperation({}, {query});
  const operation3: Operation = createOperation({}, {query});

  // operation will be cancelled below; operation2 stays subscribed and is
  // the only one expected to produce a result.
  const sub1 = batcher.enqueueRequest({operation}).subscribe(() => reject('next should never be called'));
  batcher.enqueueRequest({operation: operation2}).subscribe(result => {
    expect(result.data).toBe(data2);

    // The batch should've been fired by now.
    expect(batcher["batchesByKey"].get('')).toBeUndefined();

    resolve();
  });

  expect(batcher["batchesByKey"].get('')!.size).toBe(2);

  // Cancelling the first request shrinks the pending batch but leaves it alive.
  sub1.unsubscribe();
  expect(batcher["batchesByKey"].get('')!.size).toBe(1);

  setTimeout(() => {
    // The batch shouldn't be fired yet, so we can add one more request.
    const sub3 = batcher.enqueueRequest({operation: operation3}).subscribe(() => reject('next should never be called'));
    expect(batcher["batchesByKey"].get('')!.size).toBe(2);

    // Cancel it again before the batch fires; only operation2 remains.
    sub3.unsubscribe();
    expect(batcher["batchesByKey"].get('')!.size).toBe(1);
  }, 5);
});

itAsync('should reject the promise if there is a network error', (resolve, reject) => {
const query = gql`
query {
Expand Down Expand Up @@ -765,14 +928,14 @@ describe('BatchLink', () => {
new BatchLink({
batchInterval: 1,
//if batchKey does not work, then the batch size would be 3
batchMax: 3,
batchMax: 2,
batchHandler,
batchKey,
}),
]);

let count = 0;
[1, 2, 3, 4].forEach(x => {
[1, 2, 3, 4].forEach(() => {
execute(link, {
query,
}).subscribe({
Expand Down
Loading