Skip to content

Commit 40b4376

Browse files
author
Boris
committed
feat: add queue drain mutation
1 parent d859113 commit 40b4376

File tree

3 files changed

+56
-0
lines changed

3 files changed

+56
-0
lines changed

example/src/schema/mutation/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { SchemaComposer } from 'graphql-compose';
22
import { createQueueCleanFC } from './queueClean';
3+
import { createQueueDrainFC } from './queueDrain';
34
import { createQueuePauseFC } from './queuePause';
45
import { createQueueResumeFC } from './queueResume';
56

@@ -26,6 +27,7 @@ export function createMutationFields({
2627
//TODO: пропустить через map это
2728
return {
2829
queueClean: generateHelper(createQueueCleanFC({ schemaComposer })),
30+
queueDrain: generateHelper(createQueueDrainFC({ schemaComposer })),
2931
queuePause: generateHelper(createQueuePauseFC()),
3032
queueResume: generateHelper(createQueueResumeFC()),
3133
queueRemoveRepeatable: generateHelper(createRemoveRepeatableFC()),
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { getQueue } from './_helpers';
2+
import { SchemaComposer } from 'graphql-compose';
3+
import { getJobTC } from '../types/job';
4+
5+
export function jobMoveToCompletedFC(schemaComposer: SchemaComposer<any>) {
6+
return {
7+
type: {
8+
name: 'JobMoveToCompletedPayload',
9+
fields: {
10+
id: 'String',
11+
job: getJobTC(schemaComposer),
12+
},
13+
},
14+
args: {
15+
queueName: 'String!',
16+
id: 'String!',
17+
},
18+
resolve: async (_, { queueName, id }, context) => {
19+
const queue = getQueue(queueName, context);
20+
const job = await queue.getJob(id);
21+
if (job) {
22+
await job.moveToCompleted({}, 'tokenmustbehere'); //TODO: нати где брать токен
23+
}
24+
return {
25+
id,
26+
job,
27+
};
28+
},
29+
};
30+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { getQueue } from './_helpers';
2+
3+
export function createQueueDrainFC({ schemaComposer }) {
4+
return {
5+
type: {
6+
name: 'QueueDrainPayload',
7+
fields: {
8+
jobsId: '[String!]',
9+
},
10+
},
11+
args: {
12+
queueName: 'String!',
13+
delayed: {
14+
type: 'Boolean',
15+
defaultValue: false,
16+
},
17+
},
18+
resolve: async (_, { queueName, delayed }, context) => {
19+
const queue = getQueue(queueName, context);
20+
await queue.drain(delayed);
21+
return {};
22+
},
23+
};
24+
}

0 commit comments

Comments
 (0)