From 7868f57ed51968ae42bf9ec80b19ac14c12d8ddd Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Tue, 11 Jun 2024 16:24:05 +0100 Subject: [PATCH] Missing file --- src/sql/returnJob.ts | 68 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 src/sql/returnJob.ts diff --git a/src/sql/returnJob.ts b/src/sql/returnJob.ts new file mode 100644 index 00000000..f7b22f4f --- /dev/null +++ b/src/sql/returnJob.ts @@ -0,0 +1,68 @@ +import { DbJob, EnhancedWithPgClient } from "../interfaces"; +import { CompiledSharedOptions } from "../lib"; + +export async function returnJob( + compiledSharedOptions: CompiledSharedOptions, + withPgClient: EnhancedWithPgClient, + poolId: string, + jobs: ReadonlyArray, +): Promise { + const { + escapedWorkerSchema, + workerSchema, + resolvedPreset: { + worker: { preparedStatements }, + }, + } = compiledSharedOptions; + + const jobsWithQueues: DbJob[] = []; + const jobsWithoutQueues: DbJob[] = []; + + for (const job of jobs) { + if (job.job_queue_id != null) { + jobsWithQueues.push(job); + } else { + jobsWithoutQueues.push(job); + } + } + + if (jobsWithQueues.length > 0) { + await withPgClient.withRetries((client) => + client.query({ + text: `\ +with j as ( +update ${escapedWorkerSchema}._private_jobs as jobs +set +attempts = GREATEST(0, attempts - 1), +locked_by = null, +locked_at = null +where id = ANY($2::bigint[]) +and locked_by = $1::text +returning * +) +update ${escapedWorkerSchema}._private_job_queues as job_queues +set locked_by = null, locked_at = null +from j +where job_queues.id = j.job_queue_id and job_queues.locked_by = $1::text;`, + values: [poolId, jobsWithQueues.map((job) => job.id)], + name: !preparedStatements ? undefined : `return_job_q/${workerSchema}`, + }), + ); + } + if (jobsWithoutQueues.length > 0) { + await withPgClient.withRetries((client) => + client.query({ + text: `\ +update ${escapedWorkerSchema}._private_jobs as jobs +set +attempts = GREATEST(0, attempts - 1), +locked_by = null, +locked_at = null +where id = ANY($2::bigint[]) +and locked_by = $1::text;`, + values: [poolId, jobsWithoutQueues.map((job) => job.id)], + name: !preparedStatements ? undefined : `return_job/${workerSchema}`, + }), + ); + } +}