Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion src/routes/readQueues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { AddJobBody, BaseJob } from "../schemas/types";
import { error404ResponseSchema, queueAddJobResponseSchema, queueJobResponseSchema, queueResponseSchema, queueStatsResponseSchema } from "../schemas/response";
import { STATUS, QUEUE_NAMES } from "../lib/bullmq";
import { JobType } from "bullmq";
import { addQueueJobBodySchema, readQueueJobPathParamsSchema, readQueuePathParamsSchema, readQueueQueryStringSchema, readQueueStatsQueryStringSchema, rerunQueueJobBodySchema } from "../schemas/request";
import { addQueueJobBodySchema, readQueueJobPathParamsSchema, readQueuePathParamsSchema, readQueueQueryStringSchema, readQueueStatsQueryStringSchema, rerunJobsByWorkerBodySchema, rerunQueueJobBodySchema } from "../schemas/request";
import { z } from "zod";

export async function readQueuesRoute(app: FastifyInstance) {
app.get(
Expand Down Expand Up @@ -216,4 +217,47 @@ export async function readQueuesRoute(app: FastifyInstance) {
}
}
);

app.post(
'/rerun-by-worker',
{
schema: {
summary: 'Re-run all jobs that match a given worker name',
description: 'Re-runs all jobs across one or more queues whose data.runOnly[] contains the specified worker name. Defaults to completed and failed jobs.',
tags: ['Queues'],
body: rerunJobsByWorkerBodySchema,
response: {
200: z.object({
totalMatched: z.number().describe('Total number of jobs that matched the criteria'),
perQueue: z.record(z.number()).describe('Number of matched jobs per queue name'),
})
},
},
},
async (
request: FastifyRequest<{
Body: {
workerName: string;
statuses?: JobType[];
queues?: string[];
}
}>,
reply
) => {
const { workerName, statuses, queues } = request.body;

const queueService = await QueueService.getQueueService();

const resolvedQueues = queues && queues.length > 0
? queues
: Object.values(QUEUE_NAMES);

const result = await queueService.rerunJobsByWorkerName(workerName, {
queueNames: resolvedQueues,
statuses,
});

return reply.send(result);
}
);
}
6 changes: 6 additions & 0 deletions src/schemas/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,10 @@ export const addQueueJobBodySchema = z.object({

export const rerunQueueJobBodySchema = z.object({
data: z.record(z.any()).optional().describe('Optional job data overrides. Will merge with existing job data before re-running'),
});

export const rerunJobsByWorkerBodySchema = z.object({
workerName: z.string().describe('Name of the worker / pipeline step to re-run (e.g. "scope1+2")'),
statuses: z.array(jobStatusSchema).optional().describe('Optional list of job statuses to consider (defaults to completed and failed jobs)'),
queues: z.array(z.string()).optional().describe('Optional list of queue names to restrict the rerun to (defaults to all known queues)'),
});
71 changes: 71 additions & 0 deletions src/services/QueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,77 @@ export class QueueService {
console.info('[QueueService] rerunJob: Completed', { queueName, jobId, finalState: finalJob.status });
return finalJob;
}

/**
* Re-run all jobs that match a given worker name (e.g. a value in data.runOnly[])
* across one or more queues.
*
* By default it will re-run jobs that are either completed or failed, since
* waiting/active jobs are already in progress.
*/
public async rerunJobsByWorkerName(
workerName: string,
options?: {
queueNames?: string[];
statuses?: JobType[];
}
): Promise<{ totalMatched: number; perQueue: Record<string, number> }> {
const queueNames = options?.queueNames && options.queueNames.length > 0
? options.queueNames
: Object.values(QUEUE_NAMES);

const statuses = options?.statuses && options.statuses.length > 0
? options.statuses
: (['completed', 'failed'] as JobType[]);

console.info('[QueueService] rerunJobsByWorkerName: Starting', {
workerName,
queueNames,
statuses
});

const perQueue: Record<string, number> = {};
let totalMatched = 0;

for (const queueName of queueNames) {
const queue = await this.getQueue(queueName);
const jobs = await queue.getJobs(statuses);

const matchingJobs = jobs.filter(job => {
const runOnly = job.data?.runOnly as string[] | undefined;
return Array.isArray(runOnly) && runOnly.includes(workerName);
});

console.info('[QueueService] rerunJobsByWorkerName: Queue scan result', {
queueName,
totalJobs: jobs.length,
matchingJobs: matchingJobs.length
});

for (const job of matchingJobs) {
try {
await this.rerunJob(queueName, job.id!);
} catch (error) {
console.error('[QueueService] rerunJobsByWorkerName: Failed to rerun job', {
queueName,
jobId: job.id,
error
});
}
}

perQueue[queueName] = matchingJobs.length;
totalMatched += matchingJobs.length;
}

console.info('[QueueService] rerunJobsByWorkerName: Completed', {
workerName,
totalMatched,
perQueue
});

return { totalMatched, perQueue };
}
}

export async function transformJobtoBaseJob(job: Job): Promise<BaseJob> {
Expand Down