From 865e93feb231291e97e412b7f8efbe93309feeaa Mon Sep 17 00:00:00 2001 From: eli6 Date: Wed, 26 Nov 2025 15:44:22 +0100 Subject: [PATCH] rerun all jobs of a specified type, endpoint --- src/routes/readQueues.ts | 46 ++++++++++++++++++++++- src/schemas/request.ts | 6 +++ src/services/QueueService.ts | 71 ++++++++++++++++++++++++++++++++++++ 3 files changed, 122 insertions(+), 1 deletion(-) diff --git a/src/routes/readQueues.ts b/src/routes/readQueues.ts index 8b6ab1a..7241057 100644 --- a/src/routes/readQueues.ts +++ b/src/routes/readQueues.ts @@ -5,7 +5,8 @@ import { AddJobBody, BaseJob } from "../schemas/types"; import { error404ResponseSchema, queueAddJobResponseSchema, queueJobResponseSchema, queueResponseSchema, queueStatsResponseSchema } from "../schemas/response"; import { STATUS, QUEUE_NAMES } from "../lib/bullmq"; import { JobType } from "bullmq"; -import { addQueueJobBodySchema, readQueueJobPathParamsSchema, readQueuePathParamsSchema, readQueueQueryStringSchema, readQueueStatsQueryStringSchema, rerunQueueJobBodySchema } from "../schemas/request"; +import { addQueueJobBodySchema, readQueueJobPathParamsSchema, readQueuePathParamsSchema, readQueueQueryStringSchema, readQueueStatsQueryStringSchema, rerunJobsByWorkerBodySchema, rerunQueueJobBodySchema } from "../schemas/request"; +import { z } from "zod"; export async function readQueuesRoute(app: FastifyInstance) { app.get( @@ -216,4 +217,47 @@ export async function readQueuesRoute(app: FastifyInstance) { } } ); + + app.post( + '/rerun-by-worker', + { + schema: { + summary: 'Re-run all jobs that match a given worker name', + description: 'Re-runs all jobs across one or more queues whose data.runOnly[] contains the specified worker name. Defaults to completed and failed jobs.', + tags: ['Queues'], + body: rerunJobsByWorkerBodySchema, + response: { + 200: z.object({ + totalMatched: z.number().describe('Total number of jobs that matched the criteria'), + perQueue: z.record(z.number()).describe('Number of matched jobs per queue name'), + }) + }, + }, + }, + async ( + request: FastifyRequest<{ + Body: { + workerName: string; + statuses?: JobType[]; + queues?: string[]; + } + }>, + reply + ) => { + const { workerName, statuses, queues } = request.body; + + const queueService = await QueueService.getQueueService(); + + const resolvedQueues = queues && queues.length > 0 + ? queues + : Object.values(QUEUE_NAMES); + + const result = await queueService.rerunJobsByWorkerName(workerName, { + queueNames: resolvedQueues, + statuses, + }); + + return reply.send(result); + } + ); } \ No newline at end of file diff --git a/src/schemas/request.ts b/src/schemas/request.ts index 3fce8e7..3315d9f 100644 --- a/src/schemas/request.ts +++ b/src/schemas/request.ts @@ -35,4 +35,10 @@ export const addQueueJobBodySchema = z.object({ export const rerunQueueJobBodySchema = z.object({ data: z.record(z.any()).optional().describe('Optional job data overrides. Will merge with existing job data before re-running'), +}); + +export const rerunJobsByWorkerBodySchema = z.object({ + workerName: z.string().describe('Name of the worker / pipeline step to re-run (e.g. "scope1+2")'), + statuses: z.array(jobStatusSchema).optional().describe('Optional list of job statuses to consider (defaults to completed and failed jobs)'), + queues: z.array(z.string()).optional().describe('Optional list of queue names to restrict the rerun to (defaults to all known queues)'), }); \ No newline at end of file diff --git a/src/services/QueueService.ts b/src/services/QueueService.ts index 982523b..508efdc 100644 --- a/src/services/QueueService.ts +++ b/src/services/QueueService.ts @@ -217,6 +217,77 @@ export class QueueService { console.info('[QueueService] rerunJob: Completed', { queueName, jobId, finalState: finalJob.status }); return finalJob; } + + /** + * Re-run all jobs that match a given worker name (e.g. a value in data.runOnly[]) + * across one or more queues. + * + * By default it will re-run jobs that are either completed or failed, since + * waiting/active jobs are already in progress. + */ + public async rerunJobsByWorkerName( + workerName: string, + options?: { + queueNames?: string[]; + statuses?: JobType[]; + } + ): Promise<{ totalMatched: number; perQueue: Record }> { + const queueNames = options?.queueNames && options.queueNames.length > 0 + ? options.queueNames + : Object.values(QUEUE_NAMES); + + const statuses = options?.statuses && options.statuses.length > 0 + ? options.statuses + : (['completed', 'failed'] as JobType[]); + + console.info('[QueueService] rerunJobsByWorkerName: Starting', { + workerName, + queueNames, + statuses + }); + + const perQueue: Record = {}; + let totalMatched = 0; + + for (const queueName of queueNames) { + const queue = await this.getQueue(queueName); + const jobs = await queue.getJobs(statuses); + + const matchingJobs = jobs.filter(job => { + const runOnly = job.data?.runOnly as string[] | undefined; + return Array.isArray(runOnly) && runOnly.includes(workerName); + }); + + console.info('[QueueService] rerunJobsByWorkerName: Queue scan result', { + queueName, + totalJobs: jobs.length, + matchingJobs: matchingJobs.length + }); + + for (const job of matchingJobs) { + try { + await this.rerunJob(queueName, job.id!); + } catch (error) { + console.error('[QueueService] rerunJobsByWorkerName: Failed to rerun job', { + queueName, + jobId: job.id, + error + }); + } + } + + perQueue[queueName] = matchingJobs.length; + totalMatched += matchingJobs.length; + } + + console.info('[QueueService] rerunJobsByWorkerName: Completed', { + workerName, + totalMatched, + perQueue + }); + + return { totalMatched, perQueue }; + } } export async function transformJobtoBaseJob(job: Job): Promise {