Skip to content

Commit

Permalink
Add jobKey support to crontab (#294)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie authored Oct 19, 2023
2 parents 3b95eff + d3be09d commit b46fe49
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.dev
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM node:14-alpine
FROM node:18-alpine
RUN apk add --no-cache bash

ENTRYPOINT ["yarn"]
Expand Down
2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ emitter payloads are now `unknown` rather than `any`, so you might need to cast.
Adds support for `graphile-config` - configuration can now be read from a
`graphile.config.ts` (or `.js`, `.cjs`, etc) file.

Crontab: now supports `jobKey` and `jobKeyMode` opts (thanks @spiffytech!)

### v0.15.1

Fixes issues with graceful worker shutdowns:
Expand Down
44 changes: 44 additions & 0 deletions __tests__/__snapshots__/crontab.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Array [
"match": [Function],
"options": Object {
"backfillPeriod": 0,
"jobKey": undefined,
"jobKeyMode": undefined,
"maxAttempts": undefined,
"priority": undefined,
"queueName": undefined,
Expand All @@ -20,6 +22,8 @@ Array [
"match": [Function],
"options": Object {
"backfillPeriod": 0,
"jobKey": undefined,
"jobKeyMode": undefined,
"maxAttempts": undefined,
"priority": undefined,
"queueName": undefined,
Expand All @@ -33,6 +37,8 @@ Array [
"match": [Function],
"options": Object {
"backfillPeriod": 0,
"jobKey": undefined,
"jobKeyMode": undefined,
"maxAttempts": undefined,
"priority": undefined,
"queueName": undefined,
Expand All @@ -46,6 +52,8 @@ Array [
"match": [Function],
"options": Object {
"backfillPeriod": 0,
"jobKey": undefined,
"jobKeyMode": undefined,
"maxAttempts": undefined,
"priority": undefined,
"queueName": undefined,
Expand All @@ -59,6 +67,8 @@ Array [
"match": [Function],
"options": Object {
"backfillPeriod": 0,
"jobKey": undefined,
"jobKeyMode": undefined,
"maxAttempts": undefined,
"priority": undefined,
"queueName": undefined,
Expand All @@ -74,6 +84,8 @@ Array [
"match": [Function],
"options": Object {
"backfillPeriod": 2685660000,
"jobKey": undefined,
"jobKeyMode": undefined,
"maxAttempts": 3,
"priority": 3,
"queueName": "my_queue",
Expand All @@ -91,6 +103,8 @@ Array [
"match": [Function],
"options": Object {
"backfillPeriod": 0,
"jobKey": undefined,
"jobKeyMode": undefined,
"maxAttempts": undefined,
"priority": undefined,
"queueName": undefined,
Expand All @@ -99,5 +113,35 @@ Array [
"task": "lots_of_spaces",
Symbol(isParsed): true,
},
Object {
"identifier": "with_key",
"match": [Function],
"options": Object {
"backfillPeriod": 0,
"jobKey": "my_key",
"jobKeyMode": "replace",
"maxAttempts": undefined,
"priority": undefined,
"queueName": undefined,
},
"payload": null,
"task": "with_key",
Symbol(isParsed): true,
},
Object {
"identifier": "with_key_and_mode",
"match": [Function],
"options": Object {
"backfillPeriod": 0,
"jobKey": "my_key",
"jobKeyMode": "preserve_run_at",
"maxAttempts": undefined,
"priority": undefined,
"queueName": undefined,
},
"payload": null,
"task": "with_key_and_mode",
Symbol(isParsed): true,
},
]
`;
52 changes: 52 additions & 0 deletions __tests__/cron-timing.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,55 @@ test("clock skew doesn't prevent task from being scheduled at the right time", (
await setTime(REFERENCE_TIMESTAMP + 4 * HOUR + 1 * SECOND); // 4:00:01am
expect(cronScheduleCalls.count).toEqual(1);
}));

test("does not schedule duplicate jobs when a job key is supplied", () =>
withOptions(async (options) => {
await setTime(REFERENCE_TIMESTAMP + HOUR); // 1am
const { pgPool } = options;
await reset(pgPool, options);
const eventMonitor = new EventMonitor();
const cronFinishedBackfilling = eventMonitor.awaitNext("cron:started");
const poolReady = eventMonitor.awaitNext("pool:listen:success");
runner = await run({
...options,
crontab: `0 */4 * * * my_task ?jobKey=foo`,
events: eventMonitor.events,
});
await cronFinishedBackfilling;
await poolReady;

const cronScheduleComplete = eventMonitor.awaitNext("cron:scheduled");

const cronScheduleCalls = eventMonitor.count("cron:schedule");
// Allow the first copy of the job to get scheduled
await setTime(REFERENCE_TIMESTAMP + 4 * HOUR + 1 * SECOND); // 4:00:01am
expect(cronScheduleCalls.count).toEqual(1);
// Makes the test more reliable, due to separate connections to Postgres being slightly out of sync
await sleep(150);
const jobs = await getJobs(pgPool);
expect(jobs).toEqual([
expect.objectContaining({ task_identifier: "my_task", key: "foo" }),
]);

// Allow the system to reschedule the job after seeing it wasn't picked up
await setTime(REFERENCE_TIMESTAMP + 8 * HOUR + 1 * SECOND); // 8:00:01am
expect(cronScheduleCalls.count).toEqual(2);
// Makes the test more reliable, due to separate connections to Postgres being slightly out of sync
await sleep(150);
const jobs2 = await getJobs(pgPool);
expect(jobs2).toEqual([
expect.objectContaining({
task_identifier: "my_task",
key: "foo",
id: jobs[0].id,
}),
]);
// Original job at 4am
expect(jobs[0].payload._cron.ts).toEqual("2021-01-01T04:00:00.000Z");
// Check the job is actually updated, should be 8am now
expect(jobs2[0].payload._cron.ts).toEqual("2021-01-01T08:00:00.000Z");

// After this, the jobs should exist in the DB
await cronScheduleComplete;
await sleep(50);
}));
36 changes: 35 additions & 1 deletion __tests__/crontab.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ test("parses crontab file correctly", () => {
0 4 * * 2 every_tuesday_at_4_am {isTuesday: true}
*/10,7,56-59 1 1 1 1 one ?id=stuff&fill=4w3d2h1m&max=3&queue=my_queue&priority=3 {myExtraPayload:{stuff:"here with # hash char"}}
* * * * * lots_of_spaces
* * * * * with_key ?jobKey=my_key
* * * * * with_key_and_mode ?jobKey=my_key&jobKeyMode=preserve_run_at
`;
const parsed = parseCrontab(exampleCrontab);

Expand Down Expand Up @@ -139,6 +141,38 @@ test("parses crontab file correctly", () => {
expect(parsed[6].options).toEqual({ backfillPeriod: 0 });
expect(parsed[6].payload).toEqual(null);

expect(parsed[7].task).toEqual("with_key");
expect(parsed[7].identifier).toEqual("with_key");
const parsedCronMatch7 = (parsed[7].match as any)
.parsedCronMatch as ParsedCronMatch;
expect(parsedCronMatch7.minutes).toEqual(ALL_MINUTES);
expect(parsedCronMatch7.hours).toEqual(ALL_HOURS);
expect(parsedCronMatch7.dates).toEqual(ALL_DATES);
expect(parsedCronMatch7.months).toEqual(ALL_MONTHS);
expect(parsedCronMatch7.dows).toEqual(ALL_DOWS);
expect(parsed[7].options).toEqual({
backfillPeriod: 0,
jobKey: "my_key",
jobKeyMode: "replace",
});
expect(parsed[7].payload).toEqual(null);

expect(parsed[8].task).toEqual("with_key_and_mode");
expect(parsed[8].identifier).toEqual("with_key_and_mode");
const parsedCronMatch8 = (parsed[8].match as any)
.parsedCronMatch as ParsedCronMatch;
expect(parsedCronMatch8.minutes).toEqual(ALL_MINUTES);
expect(parsedCronMatch8.hours).toEqual(ALL_HOURS);
expect(parsedCronMatch8.dates).toEqual(ALL_DATES);
expect(parsedCronMatch8.months).toEqual(ALL_MONTHS);
expect(parsedCronMatch8.dows).toEqual(ALL_DOWS);
expect(parsed[8].options).toEqual({
backfillPeriod: 0,
jobKey: "my_key",
jobKeyMode: "preserve_run_at",
});
expect(parsed[8].payload).toEqual(null);

expect(parsed).toMatchSnapshot();
});

Expand Down Expand Up @@ -189,7 +223,7 @@ describe("gives error on syntax error", () => {
* * * * * invalid_options ?unknown=3
`),
).toThrowErrorMatchingInlineSnapshot(
`"Options on line 1 of crontab contains unsupported key 'unknown'; supported keys are: 'id', 'fill', 'max', 'queue', 'priority'."`,
`"Options on line 1 of crontab contains unsupported key 'unknown'; supported keys are: 'id', 'fill', 'max', 'queue', 'jobKey', 'jobKeyMode', 'priority'."`,
);
});

Expand Down
14 changes: 10 additions & 4 deletions src/cron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ function makeJobForItem(
queueName: item.options.queueName,
runAt: ts,
maxAttempts: item.options.maxAttempts,
jobKey: item.options.jobKey,
jobKeyMode: item.options.jobKeyMode,
priority: item.options.priority,
};
}
Expand Down Expand Up @@ -122,8 +124,10 @@ async function scheduleCronJobs(
((json->'job')->'payload')::json as payload,
((json->'job')->>'queueName')::text as queue_name,
((json->'job')->>'runAt')::timestamptz as run_at,
((json->'job')->>'maxAttempts')::int as max_attempts,
((json->'job')->>'priority')::int as priority
((json->'job')->>'maxAttempts')::smallint as max_attempts,
((json->'job')->>'priority')::smallint as priority,
((json->'job')->>'jobKey')::text as job_key,
((json->'job')->>'jobKeyMode')::text as job_key_mode
from json_array_elements($1::json) with ordinality AS entries (json, index)
),
locks as (
Expand All @@ -145,8 +149,10 @@ async function scheduleCronJobs(
specs.queue_name,
coalesce(specs.run_at, $3::timestamptz, now()),
specs.max_attempts,
null, -- job key
specs.priority
specs.job_key,
specs.priority,
null, -- flags
specs.job_key_mode
)
from specs
inner join locks on (locks.identifier = specs.identifier)
Expand Down
4 changes: 4 additions & 0 deletions src/cronConstants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ export const CRONTAB_OPTIONS_BACKFILL = /^((?:[0-9]+[smhdw])+)$/;
export const CRONTAB_OPTIONS_MAX = /^([0-9]+)$/;
/** Matches the queue=name option, capturing the queue name */
export const CRONTAB_OPTIONS_QUEUE = /^([-a-zA-Z0-9_:]+)$/;
/** Matches the jobKey=foo option, capturing the unique identifier for the job */
export const CRONTAB_OPTIONS_JOB_KEY = /^(.*)$/;
/** Matches the jobKeyMode=replace option, defining the replacement strategy this job */
export const CRONTAB_OPTIONS_JOB_KEY_MODE = /^(replace|preserve_run_at)$/;
/** Matches the priority=n option, capturing the priority value */
export const CRONTAB_OPTIONS_PRIORITY = /^(-?[0-9]+)$/;

Expand Down
28 changes: 27 additions & 1 deletion src/crontab.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {
CRONTAB_LINE_PARTS,
CRONTAB_OPTIONS_BACKFILL,
CRONTAB_OPTIONS_ID,
CRONTAB_OPTIONS_JOB_KEY,
CRONTAB_OPTIONS_JOB_KEY_MODE,
CRONTAB_OPTIONS_MAX,
CRONTAB_OPTIONS_PRIORITY,
CRONTAB_OPTIONS_QUEUE,
Expand Down Expand Up @@ -63,6 +65,8 @@ const parseCrontabOptions = (
let maxAttempts: number | undefined = undefined;
let identifier: string | undefined = undefined;
let queueName: string | undefined = undefined;
let jobKey: string | undefined = undefined;
let jobKeyMode: CronItemOptions["jobKeyMode"] = undefined;
let priority: number | undefined = undefined;

type MatcherTuple = [RegExp, (matches: RegExpExecArray) => void];
Expand Down Expand Up @@ -92,6 +96,18 @@ const parseCrontabOptions = (
queueName = matches[1];
},
],
jobKey: [
CRONTAB_OPTIONS_JOB_KEY,
(matches) => {
jobKey = matches[1];
},
],
jobKeyMode: [
CRONTAB_OPTIONS_JOB_KEY_MODE,
(matches) => {
jobKeyMode = matches[1] as CronItemOptions["jobKeyMode"];
},
],
priority: [
CRONTAB_OPTIONS_PRIORITY,
(matches) => {
Expand Down Expand Up @@ -135,9 +151,19 @@ const parseCrontabOptions = (
if (!backfillPeriod) {
backfillPeriod = 0;
}
if (!jobKeyMode && jobKey) {
jobKeyMode = "replace";
}

return {
options: { backfillPeriod, maxAttempts, queueName, priority },
options: {
backfillPeriod,
maxAttempts,
queueName,
priority,
jobKey,
jobKeyMode,
},
identifier,
};
};
Expand Down
12 changes: 12 additions & 0 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ export interface CronItemOptions {

/** Optionally set the job priority */
priority?: number;

/** Optionally prevent duplicate copies of this job from running */
jobKey?: string;

/**
* Modifies the behavior of `jobKey`; when 'replace' all attributes will be
* updated, when 'preserve_run_at' all attributes except 'run_at' will be
* updated. (Default: 'replace')
*/
jobKeyMode?: "replace" | "preserve_run_at";
}

/**
Expand Down Expand Up @@ -559,6 +569,8 @@ export interface CronJob {
};
queueName?: string;
runAt: string;
jobKey?: string;
jobKeyMode: CronItemOptions["jobKeyMode"];
maxAttempts?: number;
priority?: number;
}
Expand Down
4 changes: 4 additions & 0 deletions website/docs/cron.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ Currently we support the following `opts`:
the job.
- `queue=name` where `name` is an alphanumeric queue name - add the job to a
named queue so it executes serially.
- `jobKey=key` where `key` is any valid job key - replace/update the existing
job with this key, if present
- `jobKeyMode=replace|preserve_run_at` - if `jobKey` is specified, affects what
it does
- `priority=n` where `n` is a relatively small integer - override the priority
of the job.

Expand Down

0 comments on commit b46fe49

Please sign in to comment.