Skip to content

Commit ff4c2be

Browse files
Boris Dorofeevnodkz
authored andcommitted
feat: add queue delete mutation
1 parent c1c54fd commit ff4c2be

File tree

5 files changed

+81
-0
lines changed

5 files changed

+81
-0
lines changed

src/composeBull.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
createQueuePauseFC,
99
createQueueResumeFC,
1010
createRemoveRepeatableFC,
11+
createQueueDeleteFC,
1112
createJobAddFC,
1213
createJobAddBulkFC,
1314
createJobAddCronFC,
@@ -43,6 +44,7 @@ export function composeBull(opts: Options & { schemaComposer?: SchemaComposer<an
4344
queuePause: wrapMutation(createQueuePauseFC),
4445
queueResume: wrapMutation(createQueueResumeFC),
4546
queueRemoveRepeatable: wrapMutation(createRemoveRepeatableFC),
47+
queueDelete: wrapMutation(createQueueDeleteFC),
4648
jobAdd: wrapMutation(createJobAddFC),
4749
jobAddBulk: wrapMutation(createJobAddBulkFC),
4850
jobAddRepeatableCron: wrapMutation(createJobAddCronFC),

src/helpers/deleteKeys.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { MutationError, ErrorCodeEnum } from './MutationError';
2+
import { getBullConnection } from './getBullConnection';
3+
import { Options } from '../definitions';
4+
5+
export async function deleteQueue(
6+
prefix: string,
7+
queueName: string,
8+
opts: Options,
9+
checkExistence: boolean = true
10+
): Promise<number> {
11+
const connection = getBullConnection(opts);
12+
13+
const fullName = [prefix, queueName].join(':');
14+
15+
if (checkExistence) {
16+
const queueExists = await connection.exists([fullName, 'meta'].join(':'));
17+
18+
if (!queueExists) {
19+
throw new MutationError('Queue not found!', ErrorCodeEnum.QUEUE_NOT_FOUND);
20+
}
21+
}
22+
23+
//redis-cli: scan 0 match fullName* count 300
24+
const stream = connection.scanStream({ match: fullName + '*', count: 300 });
25+
26+
let total = 0;
27+
28+
stream.on('data', async (keys) => {
29+
for (let i = 0; i < keys.length; i++) {
30+
const del = await connection.del(keys[i]);
31+
if (del) {
32+
console.log(keys[i]);
33+
total++;
34+
}
35+
}
36+
});
37+
38+
stream.on('end', function () {
39+
console.log('all keys have been deleted!');
40+
});
41+
42+
return total;
43+
}

src/helpers/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ export * from './normalizePrefixGlob';
55
export * from './wrapMutationFC';
66
export * from './wrapQueueArgs';
77
export * from './composeFC';
8+
export * from './deleteKeys';

src/mutation/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ export { createQueueCleanFC } from './queueClean';
22
export { createQueueDrainFC } from './queueDrain';
33
export { createQueuePauseFC } from './queuePause';
44
export { createQueueResumeFC } from './queueResume';
5+
export { createQueueDeleteFC } from './queueDelete';
56
export { createRemoveRepeatableFC } from './queueRemoveRepeatable';
67
export { createJobAddFC } from './jobAdd';
78
export { createJobAddBulkFC } from './jobAddBulk';

src/mutation/queueDelete.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
2+
import { deleteQueue } from '../helpers';
3+
import { Options } from '../definitions';
4+
5+
export function createQueueDeleteFC(
6+
sc: SchemaComposer<any>,
7+
opts: Options
8+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
9+
const { typePrefix } = opts;
10+
11+
return {
12+
type: sc.createObjectTC({
13+
name: `${typePrefix}QueueDeletePayload`,
14+
fields: {
15+
total: 'Int',
16+
},
17+
}),
18+
args: {
19+
prefix: {
20+
type: 'String!',
21+
defaultValue: 'bull',
22+
},
23+
queueName: 'String!',
24+
checkExistence: {
25+
type: 'Boolean',
26+
defaultValue: true,
27+
},
28+
},
29+
resolve: async (_, { prefix, queueName, checkExistence }) => {
30+
const total = await deleteQueue(prefix, queueName, opts, checkExistence);
31+
return { total };
32+
},
33+
};
34+
}

0 commit comments

Comments
 (0)