55 changes: 54 additions & 1 deletion src/routes/readQueues.ts
@@ -5,7 +5,7 @@ import { AddJobBody, BaseJob } from "../schemas/types";
import { error404ResponseSchema, queueAddJobResponseSchema, queueJobResponseSchema, queueResponseSchema, queueStatsResponseSchema } from "../schemas/response";
import { STATUS, QUEUE_NAMES } from "../lib/bullmq";
import { JobType } from "bullmq";
import { addQueueJobBodySchema, readQueueJobPathParamsSchema, readQueuePathParamsSchema, readQueueQueryStringSchema, readQueueStatsQueryStringSchema, rerunJobsByWorkerBodySchema, rerunQueueJobBodySchema } from "../schemas/request";
import { addQueueJobBodySchema, readQueueJobPathParamsSchema, readQueuePathParamsSchema, readQueueQueryStringSchema, readQueueStatsQueryStringSchema, rerunAndSaveQueueJobBodySchema, rerunJobsByWorkerBodySchema, rerunQueueJobBodySchema } from "../schemas/request";
import { z } from "zod";

export async function readQueuesRoute(app: FastifyInstance) {
@@ -218,6 +218,59 @@ export async function readQueuesRoute(app: FastifyInstance) {
    }
  );

  app.post(
    '/:name/:id/rerun-and-save',
    {
      schema: {
        summary: 'Re-run extract-emissions for this process and save results',
        description:
          'From a follow-up job (e.g. scope1+2 or scope3), find the original EXTRACT_EMISSIONS job and enqueue a new one with runOnly set to the requested scopes.',
        tags: ['Queues'],
        params: readQueueJobPathParamsSchema,
        body: rerunAndSaveQueueJobBodySchema,
        response: {
          200: queueJobResponseSchema,
          400: error404ResponseSchema,
          404: error404ResponseSchema,
        },
      },
    },
    async (
      request: FastifyRequest<{
        Params: { name: string; id: string };
        Body: { scopes: string[] };
      }>,
      reply
    ) => {
      const { name, id } = request.params;
      const { scopes } = request.body;

      const queueService = await QueueService.getQueueService();

      try {
        const newJob = await queueService.rerunExtractEmissionsFromFollowup(
          name,
          id,
          scopes
        );
        return reply.send(newJob);
      } catch (error: any) {
        const msg = error?.message ?? '';

        if (msg.includes('EXTRACT_EMISSIONS job') || msg.includes('threadId')) {
          return reply.status(404).send({ error: msg });
        }

        if (msg.includes('Unknown queue')) {
          return reply.status(400).send({ error: msg });
        }

        app.log.error(error, 'Error in rerun-and-save');
        return reply.status(500).send({ error: 'Failed to rerun and save emissions' });
      }
    }
  );

  app.post(
    '/rerun-by-worker',
    {
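For reference, a minimal sketch of calling the new endpoint; the host, route prefix, queue name, and job id below are hypothetical and depend on how readQueuesRoute is registered:

// Hypothetical client call to the new rerun-and-save route.
const res = await fetch('http://localhost:3000/queues/followUpScope12/42/rerun-and-save', {
  method: 'POST',
  headers: { 'content-type': 'application/json' },
  body: JSON.stringify({ scopes: ['scope1+2'] }),
});

// 200 returns the newly enqueued EXTRACT_EMISSIONS job; 400 and 404 return { error }.
const job = res.ok ? await res.json() : undefined;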
6 changes: 6 additions & 0 deletions src/schemas/request.ts
@@ -41,4 +41,10 @@ export const rerunJobsByWorkerBodySchema = z.object({
  workerName: z.string().describe('Name of the worker / pipeline step to re-run (e.g. "scope1+2")'),
  statuses: z.array(jobStatusSchema).optional().describe('Optional list of job statuses to consider (defaults to completed and failed jobs)'),
  queues: z.array(z.string()).optional().describe('Optional list of queue names to restrict the rerun to (defaults to all known queues)'),
});

export const rerunAndSaveQueueJobBodySchema = z.object({
  scopes: z
    .array(z.string())
    .min(1)
    .describe('Scopes to rerun, e.g. ["scope1+2"], ["scope3"], or ["scope1+2", "scope3"]'),
});
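As a quick sanity check on the new schema, a sketch of how it behaves (zod's .min(1) on an array requires at least one element):

// Valid: one or more scope strings.
rerunAndSaveQueueJobBodySchema.parse({ scopes: ['scope1+2', 'scope3'] });

// Invalid: an empty array fails .min(1), so safeParse reports success: false.
rerunAndSaveQueueJobBodySchema.safeParse({ scopes: [] }).success; // false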
176 changes: 176 additions & 0 deletions src/services/QueueService.ts
@@ -218,6 +218,182 @@ export class QueueService {
    return finalJob;
  }

  /**
   * From a follow-up job (e.g. scope1+2 or scope3), find the original
   * EXTRACT_EMISSIONS job for the same process/thread and enqueue a new
   * extract-emissions job with runOnly set to the requested scopes.
   */
  public async rerunExtractEmissionsFromFollowup(
    followupQueueName: string,
    followupJobId: string,
    scopes: string[],
  ): Promise<DataJob> {
    console.info('[QueueService] rerunExtractEmissionsFromFollowup: Starting', {
      followupQueueName,
      followupJobId,
      scopes,
    });

    const followupJob = await this.getFollowupJob(followupQueueName, followupJobId);
    const threadId = this.getThreadIdFromJob(followupJob);

    const extractEmissionsJob = await this.getLatestExtractEmissionsJobForThread(threadId);
    const fiscalYear = await this.getLatestFiscalYearForThread(threadId);

    const companyName = this.getCompanyNameFromJobs(
      extractEmissionsJob,
      followupJob,
      threadId
    );

    const mergedData = this.buildExtractRerunData(
      followupJob,
      extractEmissionsJob,
      fiscalYear,
      scopes
    );

    const newJob = await this.enqueueExtractRerun(companyName, mergedData);

    console.info('[QueueService] rerunExtractEmissionsFromFollowup: New job created', {
      newJobId: newJob.id,
      scopes,
    });

    return this.getJobData(QUEUE_NAMES.EXTRACT_EMISSIONS, newJob.id!);
  }

  private async getFollowupJob(
    followupQueueName: string,
    followupJobId: string
  ): Promise<DataJob> {
    return this.getJobData(followupQueueName, followupJobId);
  }

  private getThreadIdFromJob(job: DataJob): string {
    const followupData: any = job.data ?? {};

    const threadId =
      followupData.threadId ??
      job.threadId ??
      job.processId;

    if (!threadId) {
      console.error('[QueueService] getThreadIdFromJob: Missing threadId', {
        jobId: job.id,
      });
      throw new Error('Cannot locate process/thread for this job (no threadId).');
    }

    return threadId;
  }

  private async getLatestExtractEmissionsJobForThread(threadId: string): Promise<DataJob> {
    const extractJobs = await this.getDataJobs(
      [QUEUE_NAMES.EXTRACT_EMISSIONS],
      undefined,
      threadId
    );

    if (!extractJobs.length) {
      console.error('[QueueService] getLatestExtractEmissionsJobForThread: No EXTRACT_EMISSIONS job found', {
        threadId,
      });
      throw new Error('No EXTRACT_EMISSIONS job found for this process.');
    }

    // Sort newest first (highest timestamp) and take the most recent job.
    return extractJobs.sort(
      (firstJob, secondJob) => (secondJob.timestamp ?? 0) - (firstJob.timestamp ?? 0)
    )[0];
  }

  private getCompanyNameFromJobs(
    extractEmissionsJob: DataJob,
    followupJob: DataJob,
    threadId: string
  ): string {
    const extractData: any = extractEmissionsJob.data ?? {};
    const followupData: any = followupJob.data ?? {};

    // Prefer the extract job's company name, then the follow-up's,
    // and fall back to the threadId when neither carries one.
    return (
      extractData.companyName ??
      followupData.companyName ??
      threadId
    );
  }

  /**
   * Build the rerun payload. Spread order matters: the original extract
   * data is the base, the follow-up's wikidata and the latest fiscalYear
   * (when present) override it, and runOnly narrows the rerun to the
   * requested scopes.
   */
  private buildExtractRerunData(
    followupJob: DataJob,
    extractEmissionsJob: DataJob,
    fiscalYear: any | undefined,
    scopes: string[],
  ): any {
    const extractData: any = extractEmissionsJob.data ?? {};
    const followupData: any = followupJob.data ?? {};

    return {
      ...extractData,
      ...(followupData.wikidata ? { wikidata: followupData.wikidata } : {}),
      ...(fiscalYear ? { fiscalYear } : {}),
      runOnly: scopes,
    };
  }

  private async enqueueExtractRerun(
    companyName: string,
    jobData: any,
  ): Promise<Job> {
    const extractQueue = await this.getQueue(QUEUE_NAMES.EXTRACT_EMISSIONS);
    return extractQueue.add(`rerun emissions ${companyName}`, jobData);
  }

  private async getLatestFiscalYearForThread(threadId: string): Promise<any | undefined> {
    // For FOLLOW_UP_FISCAL_YEAR jobs, the fiscal year lives in the *return value* JSON, e.g.:
    // { "value": { "fiscalYear": { startMonth, endMonth } }, ... }.
    try {
      const fiscalJobs = await this.getDataJobs(
        [QUEUE_NAMES.FOLLOW_UP_FISCAL_YEAR],
        undefined,
        threadId
      );

      if (fiscalJobs.length === 0) {
        return undefined;
      }

      const latestFiscal = fiscalJobs.sort(
        (firstJob, secondJob) => (secondJob.timestamp ?? 0) - (firstJob.timestamp ?? 0)
      )[0];

      const returnValue = latestFiscal.returnvalue;
      if (typeof returnValue === 'string') {
        try {
          const parsed = JSON.parse(returnValue);
          return parsed.fiscalYear ?? parsed.value?.fiscalYear ?? undefined;
        } catch (parseErr) {
          console.warn('[QueueService] getLatestFiscalYearForThread: Failed to parse fiscalYear returnvalue', {
            threadId,
            error: parseErr,
          });
          return undefined;
        }
      }

      if (returnValue && typeof returnValue === 'object') {
        const parsed: any = returnValue;
        return parsed.fiscalYear ?? parsed.value?.fiscalYear ?? undefined;
      }

      return undefined;
    } catch (err) {
      console.warn('[QueueService] getLatestFiscalYearForThread: Failed to fetch FOLLOW_UP_FISCAL_YEAR jobs', {
        threadId,
        error: err,
      });
      return undefined;
    }
  }

  /**
   * Re-run all jobs that match a given worker name (e.g. a value in data.runOnly[])
   * across one or more queues.
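For context on the returnvalue handling in getLatestFiscalYearForThread, a minimal sketch of the two shapes it accepts (field values hypothetical):

// Shape 1: returnvalue already deserialized, fiscalYear nested under value.
const fromObject: any = { value: { fiscalYear: { startMonth: 7, endMonth: 6 } } };
const a = fromObject.fiscalYear ?? fromObject.value?.fiscalYear; // { startMonth: 7, endMonth: 6 }

// Shape 2: returnvalue still a JSON string, fiscalYear at the top level.
const parsed = JSON.parse('{"fiscalYear":{"startMonth":1,"endMonth":12}}');
const b = parsed.fiscalYear ?? parsed.value?.fiscalYear; // { startMonth: 1, endMonth: 12 }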