Skip to content

Commit

Permalink
test: cancellation of subscription execution and stream execution (#6020
Browse files Browse the repository at this point in the history
)

* test: cancellation of subscription execution and stream execution

* fix: cancelation logic

* add changelog

* fix implementation
  • Loading branch information
n1ru4l committed Mar 28, 2024
1 parent 0672405 commit b07be2b
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 163 deletions.
5 changes: 5 additions & 0 deletions .changeset/eight-clocks-share.md
@@ -0,0 +1,5 @@
---
"@graphql-tools/executor": patch
---

Correctly raise `AbortError` for defer payloads
296 changes: 223 additions & 73 deletions packages/executor/src/execution/__tests__/abort-signal.test.ts
Expand Up @@ -77,123 +77,115 @@ describe('Abort Signal', () => {
expect(stopped).toBe(true);
expect(results).toEqual([0, 1, 2, 3, 4]);
});
it('should stop the serial mutation execution', async () => {
it('pending subscription execution is canceled', async () => {
const controller = new AbortController();
const firstFn = jest.fn(() => true);
const secondFn = jest.fn(() => {
controller.abort();
return true;
});
const thirdFn = jest.fn(() => true);
const rootResolverGotInvokedD = createDeferred();
const requestGotCancelledD = createDeferred();
let aResolverGotInvoked = false;

const schema = makeExecutableSchema({
typeDefs: /* GraphQL */ `
type Query {
_: Boolean
}
type Mutation {
first: Boolean
second: Boolean
third: Boolean
type Subscription {
a: A!
}
type A {
a: String!
}
`,
resolvers: {
Mutation: {
first: firstFn,
second: secondFn,
third: thirdFn,
Subscription: {
a: {
async *subscribe() {
yield 1;
},
async resolve() {
rootResolverGotInvokedD.resolve();
await requestGotCancelledD.promise;
return { a: 'a' };
},
},
},
A: {
a() {
aResolverGotInvoked = true;
return 'a';
},
},
},
});
const result = await normalizedExecutor({
schema,
document: parse(/* GraphQL */ `
mutation {
first
second
third
subscription {
a {
a
}
}
`),
signal: controller.signal,
});
expect(firstFn).toHaveBeenCalledTimes(1);
expect(secondFn).toHaveBeenCalledTimes(1);
expect(thirdFn).toHaveBeenCalledTimes(0);
expect(result).toMatchObject({
data: {
first: true,
second: true,
third: null,
},
errors: [
{
message: 'Execution aborted',
path: ['second'],
locations: [
{
line: 4,
column: 11,
},
],
},
],
});
assertAsyncIterable(result);
const iterator = result![Symbol.asyncIterator]();
const $next = iterator.next();
await rootResolverGotInvokedD.promise;
controller.abort();
await expect($next).rejects.toMatchInlineSnapshot(`DOMException {}`);
expect(aResolverGotInvoked).toEqual(false);
});
it('should stop the parallel query execution', async () => {
let resolve$: (value: any) => void = () => {};
it('should stop the serial mutation execution', async () => {
const controller = new AbortController();

let didInvokeFirstFn = false;
let didInvokeSecondFn = false;
let didInvokeThirdFn = false;
const schema = makeExecutableSchema({
typeDefs: /* GraphQL */ `
type Query {
_: Boolean
}
type Mutation {
first: Boolean
second: Boolean
third: Boolean
}
`,
resolvers: {
Query: {
first: async () => true,
second: async () => {
Mutation: {
first() {
didInvokeFirstFn = true;
return true;
},
second() {
didInvokeSecondFn = true;
controller.abort();
return true;
},
third: () =>
new Promise(resolve => {
resolve$ = resolve;
}),
third() {
didInvokeThirdFn = true;
return true;
},
},
},
});
const result = await normalizedExecutor({
const result$ = normalizedExecutor({
schema,
document: parse(/* GraphQL */ `
query {
mutation {
first
second
third
}
`),
signal: controller.signal,
});
resolve$?.(true);
expect(result).toMatchObject({
data: {
first: true,
second: true,
third: null,
},
errors: [
{
message: 'Execution aborted',
path: ['second'],
locations: [
{
line: 4,
column: 11,
},
],
},
],
});
expect(result$).rejects.toMatchInlineSnapshot(`DOMException {}`);
expect(didInvokeFirstFn).toBe(true);
expect(didInvokeSecondFn).toBe(true);
expect(didInvokeThirdFn).toBe(false);
});
it('should stop stream execution', async () => {
const controller = new AbortController();
Expand Down Expand Up @@ -244,7 +236,7 @@ describe('Abort Signal', () => {
await expect(result$).rejects.toMatchInlineSnapshot(`DOMException {}`);
expect(isAborted).toEqual(true);
});
it('stops pending stream execution for incremental delivery', async () => {
it('stops pending stream execution for incremental delivery (@stream)', async () => {
const controller = new AbortController();
const d = createDeferred();
let isReturnInvoked = false;
Expand Down Expand Up @@ -306,6 +298,164 @@ describe('Abort Signal', () => {
await expect(next$).rejects.toMatchInlineSnapshot(`DOMException {}`);
expect(isReturnInvoked).toEqual(true);
});
it('stops pending stream execution for parallel sources incremental delivery (@stream)', async () => {
const controller = new AbortController();
const d1 = createDeferred();
const d2 = createDeferred();

let isReturn1Invoked = false;
let isReturn2Invoked = false;

const schema = makeExecutableSchema({
typeDefs: /* GraphQL */ `
type Query {
counter1: [Int!]!
counter2: [Int!]!
}
`,
resolvers: {
Query: {
counter1: () => ({
[Symbol.asyncIterator]() {
return this;
},
next() {
return d1.promise.then(() => ({ done: true }));
},
return() {
isReturn1Invoked = true;
d1.resolve();
return Promise.resolve({ done: true });
},
}),
counter2: () => ({
[Symbol.asyncIterator]() {
return this;
},
next() {
return d2.promise.then(() => ({ done: true }));
},
return() {
isReturn2Invoked = true;
d2.resolve();
return Promise.resolve({ done: true });
},
}),
},
},
});

const result = await normalizedExecutor({
schema,
document: parse(/* GraphQL */ `
query {
counter1 @stream
counter2 @stream
}
`),
signal: controller.signal,
});

if (!isAsyncIterable(result)) {
throw new Error('Result is not an async iterable');
}

const iter = result[Symbol.asyncIterator]();

const next = await iter.next();
expect(next).toEqual({
done: false,
value: {
data: {
counter1: [],
counter2: [],
},
hasNext: true,
},
});

const next$ = iter.next();
controller.abort();
await expect(next$).rejects.toMatchInlineSnapshot(`DOMException {}`);
expect(isReturn1Invoked).toEqual(true);
expect(isReturn2Invoked).toEqual(true);
});
it('stops pending stream execution for incremental delivery (@defer)', async () => {
const aResolverGotInvokedD = createDeferred();
const requestGotCancelledD = createDeferred();
let bResolverGotInvoked = false;

const schema = makeExecutableSchema({
typeDefs: /* GraphQL */ `
type Query {
root: A!
}
type A {
a: B!
}
type B {
b: String
}
`,
resolvers: {
Query: {
async root() {
return { a: 'a' };
},
},
A: {
async a() {
aResolverGotInvokedD.resolve();
await requestGotCancelledD.promise;
return { b: 'b' };
},
},
B: {
b: obj => {
bResolverGotInvoked = true;
return obj.b;
},
},
},
});
const controller = new AbortController();
const result = await normalizedExecutor({
schema,
document: parse(/* GraphQL */ `
query {
root {
... @defer {
a {
b
}
}
}
}
`),
signal: controller.signal,
});

if (!isAsyncIterable(result)) {
throw new Error('Result is not an async iterable');
}

const iterator = result[Symbol.asyncIterator]();
const next = await iterator.next();
expect(next.value).toMatchInlineSnapshot(`
{
"data": {
"root": {},
},
"hasNext": true,
}
`);
const next$ = iterator.next();
await aResolverGotInvokedD.promise;
controller.abort();
requestGotCancelledD.resolve();
await expect(next$).rejects.toThrow('This operation was aborted');
expect(bResolverGotInvoked).toBe(false);
});
it('stops promise execution', async () => {
const controller = new AbortController();
const d = createDeferred();
Expand Down

0 comments on commit b07be2b

Please sign in to comment.