Skip to content

Commit 3ebab25

Browse files
author
Boris
committed
feat: predefine queue for schema
1 parent ab49682 commit 3ebab25

36 files changed

+151
-195
lines changed

example/src/schema.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ const { queryFields, mutationFields } = composeBull({
55
schemaComposer,
66
typePrefix: 'Prefix',
77
jobDataTC: `type MyJobData { fieldA: String! fieldB: String}`,
8+
queue: {
9+
name: 'fetch_metrics',
10+
prefix: 'bull.demo',
11+
},
812
});
913

1014
schemaComposer.Query.addFields({

src/composeBull.ts

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import {
1919
createJobUpdateFC,
2020
createJobLogAddFC,
2121
} from './mutation';
22-
import { createMutationFC } from './helpers/wrapMutationFC';
22+
import { createMutationFC, predefineQueueArgs } from './helpers';
2323

2424
export function composeBull(opts: Options & { schemaComposer?: SchemaComposer<any> }) {
2525
const sc = opts?.schemaComposer || schemaComposer;
@@ -28,27 +28,36 @@ export function composeBull(opts: Options & { schemaComposer?: SchemaComposer<an
2828
QueueTC: getQueueTC(sc, opts),
2929
JobTC: getJobTC(sc, opts),
3030
queryFields: {
31-
queueKeys: createQueueKeysFC(sc, opts),
32-
queues: createQueuesFC(sc, opts),
33-
queue: createQueueFC(sc, opts),
34-
job: createJobFC(sc, opts),
31+
queueKeys: predefineQueueArgs(createQueueKeysFC(sc, opts), opts),
32+
queues: predefineQueueArgs(createQueuesFC(sc, opts), opts),
33+
queue: predefineQueueArgs(createQueueFC(sc, opts), opts),
34+
job: predefineQueueArgs(createJobFC(sc, opts), opts),
3535
},
3636
mutationFields: {
37-
queueClean: createMutationFC(createQueueCleanFC, sc, opts),
38-
queueDrain: createMutationFC(createQueueDrainFC, sc, opts),
39-
queuePause: createMutationFC(createQueuePauseFC, sc, opts),
40-
queueResume: createMutationFC(createQueueResumeFC, sc, opts),
41-
queueRemoveRepeatable: createMutationFC(createRemoveRepeatableFC, sc, opts),
42-
jobAdd: createMutationFC(createJobAddFC, sc, opts),
43-
jobAddBulk: createMutationFC(createJobAddBulkFC, sc, opts),
44-
jobAddRepeatableCron: createMutationFC(createJobAddCronFC, sc, opts),
45-
jobAddRepeatableEvery: createMutationFC(createJobAddEveryFC, sc, opts),
46-
jobDiscard: createMutationFC(createJobDiscardFC, sc, opts),
47-
jobPromote: createMutationFC(createjobPromoteFC, sc, opts),
48-
jobRemove: createMutationFC(createJobRremoveFC, sc, opts),
49-
jobRetry: createMutationFC(createJobRetryFC, sc, opts),
50-
jobUpdate: createMutationFC(createJobUpdateFC, sc, opts),
51-
jobLogAdd: createMutationFC(createJobLogAddFC, sc, opts),
37+
queueClean: predefineQueueArgs(createMutationFC(createQueueCleanFC, sc, opts), opts),
38+
queueDrain: predefineQueueArgs(createMutationFC(createQueueDrainFC, sc, opts), opts),
39+
queuePause: predefineQueueArgs(createMutationFC(createQueuePauseFC, sc, opts), opts),
40+
queueResume: predefineQueueArgs(createMutationFC(createQueueResumeFC, sc, opts), opts),
41+
queueRemoveRepeatable: predefineQueueArgs(
42+
createMutationFC(createRemoveRepeatableFC, sc, opts),
43+
opts
44+
),
45+
jobAdd: predefineQueueArgs(createMutationFC(createJobAddFC, sc, opts), opts),
46+
jobAddBulk: predefineQueueArgs(createMutationFC(createJobAddBulkFC, sc, opts), opts),
47+
jobAddRepeatableCron: predefineQueueArgs(
48+
createMutationFC(createJobAddCronFC, sc, opts),
49+
opts
50+
),
51+
jobAddRepeatableEvery: predefineQueueArgs(
52+
createMutationFC(createJobAddEveryFC, sc, opts),
53+
opts
54+
),
55+
jobDiscard: predefineQueueArgs(createMutationFC(createJobDiscardFC, sc, opts), opts),
56+
jobPromote: predefineQueueArgs(createMutationFC(createjobPromoteFC, sc, opts), opts),
57+
jobRemove: predefineQueueArgs(createMutationFC(createJobRremoveFC, sc, opts), opts),
58+
jobRetry: predefineQueueArgs(createMutationFC(createJobRetryFC, sc, opts), opts),
59+
jobUpdate: predefineQueueArgs(createMutationFC(createJobUpdateFC, sc, opts), opts),
60+
jobLogAdd: predefineQueueArgs(createMutationFC(createJobLogAddFC, sc, opts), opts),
5261
},
5362
};
5463
}
File renamed without changes.

src/helpers/queueFind.ts renamed to src/helpers/gettingQueues.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,20 @@
1-
import { MutationError, ErrorCodeEnum } from './Error';
2-
import { createBullConnection } from '../connectRedis';
1+
import { Title } from './queueTitles';
32
import { Queue } from 'bullmq';
3+
import { createBullConnection } from '../connectRedis';
4+
import { MutationError, ErrorCodeEnum } from './MutationError';
5+
6+
export function getQueues(titles: Array<Title>): Array<Queue> {
7+
return titles.map((title) => getQueue(title.prefix, title.queueName));
8+
}
9+
10+
export function getQueue(prefix: string, queueName: string): Queue {
11+
const queue = new Queue(queueName, {
12+
prefix,
13+
connection: createBullConnection('queue'),
14+
});
15+
16+
return queue;
17+
}
418

519
export async function findQueue(
620
prefix: string,

src/helpers/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export * from './gettingQueues';
2+
export * from './queueTitles';
3+
export * from './MutationError';
4+
export * from './normalizePrefixGlob';
5+
export * from './wrapMutationFC';
6+
export * from './predefineQueueArgs';

src/helpers/normalizePrefixGlob.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
export function normalizePrefixGlob(prefixGlob: string): string {
2+
let prefixGlobNorm = prefixGlob;
3+
const nameCase = prefixGlobNorm.split(':').length - 1;
4+
if (nameCase >= 2) {
5+
prefixGlobNorm = prefixGlobNorm.split(':').slice(0, 2).join(':') + ':';
6+
} else if (nameCase === 1) {
7+
prefixGlobNorm += prefixGlobNorm.endsWith(':') ? '*:' : ':';
8+
} else {
9+
prefixGlobNorm += ':*:';
10+
}
11+
prefixGlobNorm += 'meta';
12+
13+
return prefixGlobNorm;
14+
}

src/helpers/predefineQueueArgs.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { Options } from './../definitions';
2+
import { ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
3+
4+
export function predefineQueueArgs(
5+
fieldConfig: ObjectTypeComposerFieldConfigAsObjectDefinition<any, any>,
6+
opts: Options
7+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
8+
if (!opts.queue) {
9+
fieldConfig.args = {
10+
queueName: 'String!',
11+
prefix: {
12+
type: 'String',
13+
defaultValue: 'bull',
14+
},
15+
...fieldConfig.args,
16+
};
17+
} else {
18+
const subResolve = fieldConfig.resolve || (() => ({}));
19+
fieldConfig.resolve = async (source, args, context, info) => {
20+
return subResolve(
21+
source,
22+
{ queueName: opts.queue?.name, prefix: opts.queue?.prefix, ...args },
23+
context,
24+
info
25+
);
26+
};
27+
}
28+
29+
return fieldConfig;
30+
}

src/helpers/queueTitles.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { createBullConnection } from '../connectRedis';
2+
import { normalizePrefixGlob } from './normalizePrefixGlob';
3+
4+
export type Title = { prefix: string; queueName: string };
5+
6+
export async function fetchQueueTitles(prefix: string): Promise<Array<Title>> {
7+
const connection = createBullConnection('custom');
8+
const keys = await connection.keys(normalizePrefixGlob(prefix));
9+
10+
return keys.map((key) => {
11+
const parts = key.split(':');
12+
return {
13+
prefix: parts[0],
14+
queueName: parts[1],
15+
};
16+
});
17+
}

src/helpers/wrapMutationFC.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
ObjectTypeComposer,
77
inspect,
88
} from 'graphql-compose';
9-
import { MutationError, ErrorCodeEnum } from './Error';
9+
import { MutationError, ErrorCodeEnum } from './MutationError';
1010

1111
export enum MutationStatusEnum {
1212
OK = 'ok',

src/mutation/jobAdd.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
22
import { getJobTC } from '../types/job/Job';
3-
import { findQueue } from '../helpers/queueFind';
3+
import { findQueue } from '../helpers';
44
import { Options } from '../definitions';
55
import { createJobDataITC } from '../types/job/JobInput';
66

@@ -18,11 +18,6 @@ export function createJobAddFC(
1818
},
1919
}),
2020
args: {
21-
prefix: {
22-
type: 'String',
23-
defaultValue: 'bull',
24-
},
25-
queueName: 'String!',
2621
jobName: 'String!',
2722
data: {
2823
type: () => createJobDataITC(sc, opts),

0 commit comments

Comments
 (0)