Skip to content

Commit e6932e7

Browse files
author
Boris
committed
feat: add mutation job add bulk
1 parent be02ffc commit e6932e7

File tree

9 files changed

+192
-101
lines changed

9 files changed

+192
-101
lines changed

example/src/schema/index.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@ schemaComposer.Query.addFields({
77
});
88

99
schemaComposer.Mutation.addFields({
10-
...createMutationFields({
11-
schemaComposer,
12-
}),
10+
...createMutationFields(schemaComposer),
1311
});
1412

1513
export default schemaComposer.buildSchema();

example/src/schema/mutation/index.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ import { createQueueResumeFC } from './queueResume';
77
import { createRemoveRepeatableFC } from './queueRemoveRepeatable';
88

99
import { createJobAddFC } from './jobAdd';
10-
import { createJobAddRepeatableCronFC } from './jobAdd';
11-
import { createJobAddRepeatableEveryFC } from './jobAdd';
10+
import { createJobAddBulkFC } from './jobAddBulk';
11+
import { createJobAddCronFC } from './jobAddCron';
12+
import { createJobAddEveryFC } from './jobAddEvery';
1213
import { createJobDiscardFC } from './jobDiscard';
1314
import { createjobPromoteFC } from './jobPromote';
1415
import { createJobRremoveFC } from './jobRemove';
@@ -18,11 +19,7 @@ import { createJobLogAddFC } from './jobLogAdd';
1819

1920
import { createGenerateHelper } from './helpers/wrapMutationFC';
2021

21-
export function createMutationFields({
22-
schemaComposer,
23-
}: {
24-
schemaComposer: SchemaComposer<any>;
25-
}): any {
22+
export function createMutationFields(schemaComposer: SchemaComposer<any>): any {
2623
const generateHelper = createGenerateHelper(schemaComposer);
2724

2825
function generateWrappedFC(
@@ -39,8 +36,9 @@ export function createMutationFields({
3936
queueRemoveRepeatable: generateWrappedFC(createRemoveRepeatableFC),
4037

4138
jobAdd: generateWrappedFC(createJobAddFC),
42-
jobAddRepeatableCron: generateWrappedFC(createJobAddRepeatableCronFC),
43-
jobAddRepeatableEvery: generateWrappedFC(createJobAddRepeatableEveryFC),
39+
jobAddBulk: generateWrappedFC(createJobAddBulkFC),
40+
jobAddRepeatableCron: generateWrappedFC(createJobAddCronFC),
41+
jobAddRepeatableEvery: generateWrappedFC(createJobAddEveryFC),
4442

4543
jobDiscard: generateWrappedFC(createJobDiscardFC),
4644
jobPromote: generateWrappedFC(createjobPromoteFC),
Lines changed: 24 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,36 @@
11
import { getQueue } from './helpers/wrapMutationFC';
2-
import {
3-
SchemaComposer,
4-
InputTypeComposer,
5-
ObjectTypeComposerFieldConfigAsObjectDefinition,
6-
} from 'graphql-compose';
2+
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
73
import { getJobTC } from '../types/job/Job';
84

9-
function getPayloadTC(sc: SchemaComposer<any>) {
10-
return sc.getOrCreateOTC('JobAddPayload', (etc) => {
11-
etc.addFields({
12-
job: getJobTC(sc),
13-
});
14-
});
15-
}
16-
17-
export function createJobAddFC(sc: SchemaComposer<any>) {
18-
return composeJobAddFC(sc, createJobOptionsInputTC('JobOptionsInput', sc));
19-
}
20-
21-
export function createJobAddRepeatableCronFC(sc: SchemaComposer<any>) {
22-
return composeJobAddFC(
23-
sc,
24-
createJobOptionsInputTC('JobOptionsInputRepeatableCron', sc).addFields({
25-
repeat: sc
26-
.createInputTC({
27-
name: 'RepeatOptionsInputCron',
28-
fields: {
29-
tz: 'String',
30-
endDate: 'Date',
31-
limit: 'Int',
32-
cron: 'String!',
33-
startDate: 'Date',
34-
},
35-
})
36-
.getTypeNonNull(),
37-
})
38-
);
39-
}
40-
41-
export function createJobAddRepeatableEveryFC(sc: SchemaComposer<any>) {
42-
return composeJobAddFC(
43-
sc,
44-
createJobOptionsInputTC('JobOptionsInputRepeatableEvery', sc).addFields({
45-
repeat: sc
46-
.createInputTC({
47-
name: 'RepeatOptionsInputEvery',
48-
fields: {
49-
tz: 'String',
50-
endDate: 'Date',
51-
limit: 'Int',
52-
cron: 'String!',
53-
startDate: 'Date',
54-
},
55-
})
56-
.getTypeNonNull(),
57-
})
58-
);
59-
}
60-
61-
function composeJobAddFC(
62-
sc: SchemaComposer<any>,
63-
optionsTC: InputTypeComposer<any>
5+
export function createJobAddFC(
6+
sc: SchemaComposer<any>
647
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
658
return {
66-
type: getPayloadTC(sc),
9+
type: sc.createObjectTC({
10+
name: 'JobAddPayload',
11+
fields: {
12+
job: getJobTC(sc),
13+
},
14+
}),
6715
args: {
6816
queueName: 'String!',
6917
jobName: 'String!',
7018
data: 'JSON!',
71-
options: optionsTC,
19+
options: sc.createInputTC({
20+
name: 'JobOptionsInput',
21+
fields: {
22+
priority: 'Int',
23+
delay: 'Int',
24+
attempts: 'Int',
25+
backoff: 'Int', // | TODO: BackoffOptions
26+
lifo: 'Boolean',
27+
timeout: 'Int',
28+
jobId: 'String',
29+
removeOnComplete: 'Boolean', //TODO: bool or int
30+
removeOnFail: 'Boolean', //TODO: bool or int
31+
stackTraceLimit: 'Int',
32+
},
33+
}),
7234
},
7335
resolve: async (_, { queueName, jobName, data, options }, context) => {
7436
const queue = getQueue(queueName, context);
@@ -79,21 +41,3 @@ function composeJobAddFC(
7941
},
8042
};
8143
}
82-
83-
function createJobOptionsInputTC(name: string, sc: SchemaComposer<any>): InputTypeComposer<any> {
84-
return sc.createInputTC({
85-
name,
86-
fields: {
87-
priority: 'Int',
88-
delay: 'Int',
89-
attempts: 'Int',
90-
backoff: 'Int', // | TODO: BackoffOptions
91-
lifo: 'Boolean',
92-
timeout: 'Int',
93-
jobId: 'String',
94-
removeOnComplete: 'Boolean', //TODO: bool or int
95-
removeOnFail: 'Boolean', //TODO: bool or int
96-
stackTraceLimit: 'Int',
97-
},
98-
});
99-
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { getQueue } from './helpers/wrapMutationFC';
2+
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
3+
import { getJobTC } from '../types/job/Job';
4+
5+
export function createJobAddBulkFC(
6+
sc: SchemaComposer<any>
7+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
8+
return {
9+
type: sc.createObjectTC({
10+
name: 'JobAddBulkPayload',
11+
fields: {
12+
jobs: getJobTC(sc).getTypePlural(),
13+
},
14+
}),
15+
args: {
16+
queueName: 'String!',
17+
jobs: sc
18+
.createInputTC({
19+
name: 'JobAddInputBulk',
20+
fields: {
21+
name: 'String!',
22+
data: 'JSON!',
23+
options: sc.createInputTC({
24+
name: 'JobOptionsInputBulk',
25+
fields: {
26+
priority: 'Int',
27+
delay: 'Int',
28+
attempts: 'Int',
29+
backoff: 'Int', // | TODO: BackoffOptions
30+
lifo: 'Boolean',
31+
timeout: 'Int',
32+
jobId: 'String',
33+
removeOnComplete: 'Boolean', //TODO: bool or int
34+
removeOnFail: 'Boolean', //TODO: bool or int
35+
stackTraceLimit: 'Int',
36+
},
37+
}),
38+
},
39+
})
40+
.getTypePlural(),
41+
},
42+
resolve: async (_, { queueName, jobs }, context) => {
43+
console.log('Я здесб!');
44+
const queue = getQueue(queueName, context);
45+
const jobsRes = await queue.addBulk(jobs);
46+
return {
47+
jobs: jobsRes,
48+
};
49+
},
50+
};
51+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import { getQueue } from './helpers/wrapMutationFC';
2+
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
3+
import { getJobTC } from '../types/job/Job';
4+
5+
export function createJobAddCronFC(
6+
sc: SchemaComposer<any>
7+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
8+
return {
9+
type: sc.createObjectTC({
10+
name: 'JobAddCronPayload',
11+
fields: {
12+
job: getJobTC(sc),
13+
},
14+
}),
15+
args: {
16+
queueName: 'String!',
17+
jobName: 'String!',
18+
data: 'JSON!',
19+
options: sc.createInputTC({
20+
name: 'JobOptionsInputCron',
21+
fields: {
22+
priority: 'Int',
23+
delay: 'Int',
24+
attempts: 'Int',
25+
backoff: 'Int', // | TODO: BackoffOptions
26+
lifo: 'Boolean',
27+
timeout: 'Int',
28+
jobId: 'String',
29+
removeOnComplete: 'Boolean', //TODO: bool or int
30+
removeOnFail: 'Boolean', //TODO: bool or int
31+
stackTraceLimit: 'Int',
32+
repeat: sc
33+
.createInputTC({
34+
name: 'JobOptionsInputRepeatCron',
35+
fields: {
36+
tz: 'String',
37+
endDate: 'Date',
38+
limit: 'Int',
39+
cron: 'String!',
40+
startDate: 'Date',
41+
},
42+
})
43+
.getTypeNonNull(),
44+
},
45+
}),
46+
},
47+
resolve: async (_, { queueName, jobName, data, options }, context) => {
48+
const queue = getQueue(queueName, context);
49+
const job = await queue.add(jobName, data, options);
50+
return {
51+
job,
52+
};
53+
},
54+
};
55+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { getQueue } from './helpers/wrapMutationFC';
2+
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
3+
import { getJobTC } from '../types/job/Job';
4+
5+
export function createJobAddEveryFC(
6+
sc: SchemaComposer<any>
7+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
8+
return {
9+
type: sc.createObjectTC({
10+
name: 'JobAddEveryPayload',
11+
fields: {
12+
job: getJobTC(sc),
13+
},
14+
}),
15+
args: {
16+
queueName: 'String!',
17+
jobName: 'String!',
18+
data: 'JSON!',
19+
options: sc.createInputTC({
20+
name: 'JobOptionsInputEvery',
21+
fields: {
22+
priority: 'Int',
23+
delay: 'Int',
24+
attempts: 'Int',
25+
backoff: 'Int', // | TODO: BackoffOptions
26+
lifo: 'Boolean',
27+
timeout: 'Int',
28+
jobId: 'String',
29+
removeOnComplete: 'Boolean', //TODO: bool or int
30+
removeOnFail: 'Boolean', //TODO: bool or int
31+
stackTraceLimit: 'Int',
32+
repeat: sc
33+
.createInputTC({
34+
name: 'JobOptionsInputRepeatEvery',
35+
fields: {
36+
tz: 'String',
37+
endDate: 'Date',
38+
limit: 'Int',
39+
every: 'String!',
40+
},
41+
})
42+
.getTypeNonNull(),
43+
},
44+
}),
45+
},
46+
resolve: async (_, { queueName, jobName, data, options }, context) => {
47+
const queue = getQueue(queueName, context);
48+
const job = await queue.add(jobName, data, options);
49+
return {
50+
job,
51+
};
52+
},
53+
};
54+
}

example/src/schema/mutation/queueDrain.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ export function createQueueDrainFC(
99
'Drains the queue, i.e., removes all jobs that are waiting or delayed, but not active, completed or failed.',
1010
type: sc.createObjectTC({
1111
name: 'QueueDrainPayload',
12-
fields: {
13-
jobsId: '[String!]',
14-
},
1512
}),
1613
args: {
1714
queueName: 'String!',

example/src/schema/mutation/queuePause.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@ export function createQueuePauseFC(
77
return {
88
type: sc.createObjectTC({
99
name: 'QueuePausePayload',
10-
fields: {
11-
queueName: 'String!',
12-
},
1310
}),
1411
args: {
1512
queueName: 'String!',

example/src/schema/mutation/queueResume.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@ export function createQueueResumeFC(
77
return {
88
type: sc.createObjectTC({
99
name: 'QueueResumePayload',
10-
fields: {
11-
queueName: 'String!',
12-
},
1310
}),
1411
args: {
1512
queueName: 'String!',

0 commit comments

Comments
 (0)