Skip to content

Commit 815f19b

Browse files
Borisnodkz
authored andcommitted
feat: add jobMoveToDelayed mutation
1 parent 7f67154 commit 815f19b

File tree

6 files changed

+64
-7
lines changed

6 files changed

+64
-7
lines changed

src/composeBull.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
createJobRetryFC,
1919
createJobUpdateFC,
2020
createJobLogAddFC,
21+
createJobMoveToDelayedFC,
2122
} from './mutation';
2223
import { wrapMutationFC, wrapQueueArgs, composeFC } from './helpers';
2324

@@ -52,6 +53,7 @@ export function composeBull(opts: Options & { schemaComposer?: SchemaComposer<an
5253
jobRetry: wrapMutation(createJobRetryFC),
5354
jobUpdate: wrapMutation(createJobUpdateFC),
5455
jobLogAdd: wrapMutation(createJobLogAddFC),
56+
jobMoveToDelayed: wrapMutation(createJobMoveToDelayedFC),
5557
},
5658
};
5759
}

src/helpers/__tests__/normalizePrefixGlob-test.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { normalizePrefixGlob } from '../normalizePrefixGlob';
22

33
describe('normalizePrefixGlob', () => {
4+
it('check empty mask', () => {
5+
expect(normalizePrefixGlob('')).toBe('*:*:meta');
6+
});
7+
48
it('check simple mask', () => {
59
expect(normalizePrefixGlob('bull*')).toBe('bull*:*:meta');
610
expect(normalizePrefixGlob('bull')).toBe('bull:*:meta');
@@ -10,11 +14,13 @@ describe('normalizePrefixGlob', () => {
1014
expect(normalizePrefixGlob('bull:metrics*')).toBe('bull:metrics*:meta');
1115
});
1216

13-
it('check mask with queue name', () => {
17+
it('check mask with queue name complete', () => {
1418
expect(normalizePrefixGlob('bull:metrics')).toBe('bull:metrics:meta');
19+
expect(normalizePrefixGlob('bull*:metrics')).toBe('bull*:metrics:meta');
1520
});
1621

1722
it('check mask with prefix which contains semicolons', () => {
1823
expect(normalizePrefixGlob('my:service:bull:metrics')).toBe('my:service:bull:metrics:meta');
24+
expect(normalizePrefixGlob('my:service*:bull:metrics*')).toBe('my:service*:bull:metrics*:meta');
1925
});
2026
});

src/helpers/normalizePrefixGlob.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
export function normalizePrefixGlob(prefixGlob: string): string {
22
let prefixGlobNorm = prefixGlob;
3-
const nameCase = prefixGlobNorm.split(':');
4-
if (nameCase.length >= 3) {
5-
prefixGlobNorm = nameCase.filter((s) => s !== '').join(':') + ':';
6-
} else if (nameCase.length === 2) {
3+
const sectionsCount = prefixGlobNorm.split(':').length - 1;
4+
5+
if (sectionsCount > 1) {
6+
prefixGlobNorm += prefixGlobNorm.endsWith(':') ? '' : ':';
7+
} else if (sectionsCount == 1) {
78
prefixGlobNorm += prefixGlobNorm.endsWith(':') ? '*:' : ':';
89
} else {
9-
prefixGlobNorm += ':*:';
10+
prefixGlobNorm += prefixGlobNorm.trim().length > 0 ? ':*:' : '*:*:';
1011
}
12+
1113
prefixGlobNorm += 'meta';
1214

1315
return prefixGlobNorm;

src/mutation/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ export { createJobRremoveFC } from './jobRemove';
1313
export { createJobRetryFC } from './jobRetry';
1414
export { createJobUpdateFC } from './jobUpdate';
1515
export { createJobLogAddFC } from './jobLogAdd';
16+
export { createJobMoveToDelayedFC } from './jobMoveToDelayed';

src/mutation/jobMoveToCompleted.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from
33
import { getJobTC } from '../types/job/Job';
44
import { Options } from '../definitions';
55

6-
export function jobMoveToCompletedFC(
6+
export function createJobMoveToCompletedFC(
77
sc: SchemaComposer<any>,
88
opts: Options
99
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {

src/mutation/jobMoveToDelayed.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { MutationError, ErrorCodeEnum } from './../helpers/MutationError';
2+
import { findQueue } from '../helpers';
3+
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
4+
import { getJobTC } from '../types/job/Job';
5+
import { Options } from '../definitions';
6+
7+
export function createJobMoveToDelayedFC(
8+
sc: SchemaComposer<any>,
9+
opts: Options
10+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
11+
const { typePrefix } = opts;
12+
13+
return {
14+
description: 'Moves job from active to delayed.',
15+
type: sc.createObjectTC({
16+
name: `${typePrefix}JobMoveToDelayedPayload`,
17+
fields: {
18+
job: getJobTC(sc, opts),
19+
willBeProcessedOn: 'Date',
20+
},
21+
}),
22+
args: {
23+
prefix: {
24+
type: 'String!',
25+
defaultValue: 'bull',
26+
},
27+
queueName: 'String!',
28+
id: 'String!',
29+
delay: {
30+
type: 'Int!',
31+
defaultValue: 60000,
32+
},
33+
},
34+
resolve: async (_, { prefix, queueName, id, delay }) => {
35+
const queue = await findQueue(prefix, queueName, opts);
36+
const job = await queue.getJob(id);
37+
if (!job) throw new MutationError('Job not found!', ErrorCodeEnum.JOB_NOT_FOUND);
38+
const willBeProcessedOn = Date.now() + delay;
39+
await job.moveToDelayed(willBeProcessedOn);
40+
return {
41+
willBeProcessedOn,
42+
job,
43+
};
44+
},
45+
};
46+
}

0 commit comments

Comments
 (0)