Skip to content

Commit

Permalink
Cleanup now allows you to specify additional identifiers to keep (#465)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie committed Apr 26, 2024
2 parents c89ab35 + 98e27cc commit 6ef086c
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 17 deletions.
5 changes: 5 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ Read more:
there existed any job that was not in a queue
- If errors happen writing to stdout/stderr (e.g. `SIGPIPE`), we'll trigger a
graceful shutdown (and swallow further errors)
- `workerUtils.cleanup({ tasks: ['GC_TASK_IDENTIFIERS'] })` now allows you to
specify additional task identifiers to keep (`taskIdentifiersToKeep: [...]`)
in order to reduce impact on other workers
- `graphile-worker --cleanup GC_TASK_IDENTIFIERS` will attempt to keep all
locally defined task identifiers

## v0.16.5

Expand Down
11 changes: 9 additions & 2 deletions __tests__/workerUtils.cleanup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,10 @@ test("cleanup with GC_TASK_IDENTIFIERS", () =>
"test_job1",
"test_job2",
"test_job3",
"test_job4",
]) {
const job = await utils.addJob(taskIdentifier, {});
if (taskIdentifier === "test_job2") {
if (["test_job2", "test_job4"].includes(taskIdentifier)) {
await utils.completeJobs([job.id]);
}
}
Expand All @@ -162,15 +163,21 @@ test("cleanup with GC_TASK_IDENTIFIERS", () =>
"test_job1",
"test_job2",
"test_job3",
"test_job4",
]);

await utils.cleanup({ tasks: ["GC_TASK_IDENTIFIERS"] });
await utils.cleanup({
tasks: ["GC_TASK_IDENTIFIERS"],
taskIdentifiersToKeep: ["test_job4"],
});
const { rows: tasksAfter } = (await pgClient.query(
`select identifier from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_tasks`,
)) as { rows: { identifier: string }[] };
expect(tasksAfter.map((q) => q.identifier).sort()).toEqual([
"job3",
"test_job1",
// test_job2 has been cleaned up
"test_job3",
"test_job4", // test_job4 would have been cleaned up, but we explicitly said to keep it
]);
}));
12 changes: 9 additions & 3 deletions src/cleanup.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CleanupTask } from "./interfaces";
import { CleanupOptions, CleanupTask } from "./interfaces";
import { CompiledOptions } from "./lib";

const ALL_CLEANUP_TASKS: CleanupTask[] = [
Expand All @@ -24,8 +24,12 @@ export function assertCleanupTasks(

export async function cleanup(
compiledOptions: CompiledOptions,
tasks: CleanupTask[] = ["GC_JOB_QUEUES", "GC_TASK_IDENTIFIERS"],
options: CleanupOptions,
) {
const {
tasks = ["GC_JOB_QUEUES", "GC_TASK_IDENTIFIERS"],
taskIdentifiersToKeep = [],
} = options;
const { withPgClient, escapedWorkerSchema } = compiledOptions;
await withPgClient(async (client) => {
if (tasks.includes("DELETE_PERMAFAILED_JOBS")) {
Expand All @@ -43,7 +47,9 @@ delete from ${escapedWorkerSchema}._private_tasks tasks
where tasks.id not in (
select jobs.task_id
from ${escapedWorkerSchema}._private_jobs jobs
);`,
)
and tasks.identifier <> all ($1::text[]);`,
[taskIdentifiersToKeep],
);
}
if (tasks.includes("GC_JOB_QUEUES")) {
Expand Down
22 changes: 14 additions & 8 deletions src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,25 @@ async function main() {
return;
}

if (CLEANUP != null) {
const cleanups = Array.isArray(CLEANUP) ? CLEANUP : [CLEANUP];
const tasks = cleanups.flatMap((t) => t.split(",")).map((t) => t.trim());
assertCleanupTasks(tasks);
await cleanup(compiledOptions, tasks);
return;
}

const watchedTasks = await getTasksInternal(
compiledOptions,
compiledOptions.resolvedPreset.worker.taskDirectory,
);
compiledOptions.releasers.push(() => watchedTasks.release());

if (CLEANUP != null) {
const cleanups = Array.isArray(CLEANUP) ? CLEANUP : [CLEANUP];
const cleanupTasks = cleanups
.flatMap((t) => t.split(","))
.map((t) => t.trim());
assertCleanupTasks(cleanupTasks);
await cleanup(compiledOptions, {
tasks: cleanupTasks,
taskIdentifiersToKeep: Object.keys(watchedTasks.tasks),
});
return;
}

const watchedCronItems = await getCronItemsInternal(
compiledOptions,
compiledOptions.resolvedPreset.worker.crontabFile,
Expand Down
7 changes: 6 additions & 1 deletion src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ export type CleanupTask =
| "GC_JOB_QUEUES"
| "DELETE_PERMAFAILED_JOBS";

export interface CleanupOptions {
tasks?: readonly CleanupTask[];
taskIdentifiersToKeep?: readonly string[];
}

/**
* Utilities for working with Graphile Worker. Primarily useful for migrating
* the jobs database and queueing jobs.
Expand Down Expand Up @@ -200,7 +205,7 @@ export interface WorkerUtils extends Helpers {
*
* Default: ["GC_JOB_QUEUES"]
*/
cleanup(options: { tasks?: CleanupTask[] }): Promise<void>;
cleanup(options: CleanupOptions): Promise<void>;
}

export type PromiseOrDirect<T> = Promise<T> | T;
Expand Down
8 changes: 5 additions & 3 deletions src/workerUtils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable @typescript-eslint/ban-types */
import { cleanup } from "./cleanup";
import {
CleanupTask,
CleanupOptions,
DbJob,
TaskSpec,
WorkerUtils,
Expand Down Expand Up @@ -86,11 +86,13 @@ export async function makeWorkerUtils(
},

async cleanup(
options: { tasks?: CleanupTask[] } = {
options: CleanupOptions = {
tasks: ["GC_JOB_QUEUES"],
},
): Promise<void> {
return cleanup(compiledOptions, options.tasks);
// TODO: would be great to guess the current task identifiers (e.g. by
// reading the `tasks` folder) and add them to `taskIdentifiersToKeep`
return cleanup(compiledOptions, options);
},
};
}
Expand Down

0 comments on commit 6ef086c

Please sign in to comment.