diff --git a/src/routes/readQueues.ts b/src/routes/readQueues.ts
index 7241057..7a2f1f7 100644
--- a/src/routes/readQueues.ts
+++ b/src/routes/readQueues.ts
@@ -5,7 +5,7 @@ import { AddJobBody, BaseJob } from "../schemas/types";
 import { error404ResponseSchema, queueAddJobResponseSchema, queueJobResponseSchema, queueResponseSchema, queueStatsResponseSchema } from "../schemas/response";
 import { STATUS, QUEUE_NAMES } from "../lib/bullmq";
 import { JobType } from "bullmq";
-import { addQueueJobBodySchema, readQueueJobPathParamsSchema, readQueuePathParamsSchema, readQueueQueryStringSchema, readQueueStatsQueryStringSchema, rerunJobsByWorkerBodySchema, rerunQueueJobBodySchema } from "../schemas/request";
+import { addQueueJobBodySchema, readQueueJobPathParamsSchema, readQueuePathParamsSchema, readQueueQueryStringSchema, readQueueStatsQueryStringSchema, rerunAndSaveQueueJobBodySchema, rerunJobsByWorkerBodySchema, rerunQueueJobBodySchema } from "../schemas/request";
 import { z } from "zod";
 
 export async function readQueuesRoute(app: FastifyInstance) {
@@ -218,6 +218,64 @@ export async function readQueuesRoute(app: FastifyInstance) {
     }
   );
 
+  // Re-runs extract-emissions for the process a follow-up job belongs to.
+  app.post(
+    '/:name/:id/rerun-and-save',
+    {
+      schema: {
+        summary: 'Re-run extract-emissions for this process and save results',
+        description:
+          'From a follow-up job (e.g. scope1+2 or scope3), find the original EXTRACT_EMISSIONS job and enqueue a new one with runOnly set to the requested scopes.',
+        tags: ['Queues'],
+        params: readQueueJobPathParamsSchema,
+        body: rerunAndSaveQueueJobBodySchema,
+        response: {
+          200: queueJobResponseSchema,
+          400: error404ResponseSchema,
+          404: error404ResponseSchema,
+        },
+      },
+    },
+    async (
+      request: FastifyRequest<{
+        Params: { name: string; id: string };
+        Body: { scopes: string[] };
+      }>,
+      reply
+    ) => {
+      const { name, id } = request.params;
+      const { scopes } = request.body;
+
+      const queueService = await QueueService.getQueueService();
+
+      try {
+        const newJob = await queueService.rerunExtractEmissionsFromFollowup(
+          name,
+          id,
+          scopes
+        );
+        return reply.send(newJob);
+      } catch (error: unknown) {
+        // Narrow before reading `.message`; anything that is not an Error
+        // falls through to the generic 500 below.
+        const msg = error instanceof Error ? error.message : '';
+
+        // The service signals its failures through message text only, so
+        // these substrings must stay in sync with the errors it throws.
+        if (msg.includes('EXTRACT_EMISSIONS job') || msg.includes('threadId')) {
+          return reply.status(404).send({ error: msg });
+        }
+
+        if (msg.includes('Unknown queue')) {
+          return reply.status(400).send({ error: msg });
+        }
+
+        app.log.error(error, 'Error in rerun-and-save');
+        return reply.status(500).send({ error: 'Failed to rerun and save emissions' });
+      }
+    }
+  );
+
   app.post(
     '/rerun-by-worker',
     {
diff --git a/src/schemas/request.ts b/src/schemas/request.ts
index 3315d9f..8908ec2 100644
--- a/src/schemas/request.ts
+++ b/src/schemas/request.ts
@@ -41,4 +41,11 @@ export const rerunJobsByWorkerBodySchema = z.object({
   workerName: z.string().describe('Name of the worker / pipeline step to re-run (e.g. "scope1+2")'),
   statuses: z.array(jobStatusSchema).optional().describe('Optional list of job statuses to consider (defaults to completed and failed jobs)'),
   queues: z.array(z.string()).optional().describe('Optional list of queue names to restrict the rerun to (defaults to all known queues)'),
+});
+
+/** Request body for the rerun-and-save queue route: at least one scope to rerun. */
+export const rerunAndSaveQueueJobBodySchema = z.object({
+  scopes: z.array(z.string())
+    .min(1)
+    .describe('Scopes to rerun, e.g. ["scope1+2"], ["scope3"], or ["scope1+2", "scope3"]'),
 });
\ No newline at end of file
diff --git a/src/services/QueueService.ts b/src/services/QueueService.ts
index 508efdc..8e8bb51 100644
--- a/src/services/QueueService.ts
+++ b/src/services/QueueService.ts
@@ -218,6 +218,237 @@ export class QueueService {
     return finalJob;
   }
 
+  /**
+   * From a follow-up job (e.g. scope1+2 or scope3), find the original
+   * EXTRACT_EMISSIONS job for the same process/thread and enqueue a new
+   * extract-emissions job with runOnly set to the requested scopes.
+   *
+   * @param followupQueueName Queue that holds the follow-up job.
+   * @param followupJobId Id of the follow-up job within that queue.
+   * @param scopes Values written to `runOnly` on the new job.
+   * @returns The newly enqueued job, re-read through `getJobData`.
+   * @throws Error when no thread id can be resolved, when the thread has no
+   *   EXTRACT_EMISSIONS job, or when the enqueued job comes back without an id.
+   */
+  public async rerunExtractEmissionsFromFollowup(
+    followupQueueName: string,
+    followupJobId: string,
+    scopes: string[],
+  ): Promise<DataJob> {
+    console.info('[QueueService] rerunExtractEmissionsFromFollowup: Starting', {
+      followupQueueName,
+      followupJobId,
+      scopes,
+    });
+
+    const followupJob = await this.getFollowupJob(followupQueueName, followupJobId);
+    const threadId = this.getThreadIdFromJob(followupJob);
+
+    // Independent lookups, so run them concurrently. The fiscal-year lookup
+    // resolves to undefined on failure instead of rejecting.
+    const [extractEmissionsJob, fiscalYear] = await Promise.all([
+      this.getLatestExtractEmissionsJobForThread(threadId),
+      this.getLatestFiscalYearForThread(threadId),
+    ]);
+
+    const companyName = this.getCompanyNameFromJobs(
+      extractEmissionsJob,
+      followupJob,
+      threadId
+    );
+
+    const mergedData = this.buildExtractRerunData(
+      followupJob,
+      extractEmissionsJob,
+      fiscalYear,
+      scopes
+    );
+
+    const newJob = await this.enqueueExtractRerun(companyName, mergedData);
+
+    // Guard instead of a non-null assertion so a missing id fails here with
+    // a clear message rather than inside getJobData.
+    if (!newJob.id) {
+      throw new Error('Failed to enqueue extract-emissions rerun (job has no id).');
+    }
+
+    console.info('[QueueService] rerunExtractEmissionsFromFollowup: New job created', {
+      newJobId: newJob.id,
+      scopes,
+    });
+
+    return this.getJobData(QUEUE_NAMES.EXTRACT_EMISSIONS, newJob.id);
+  }
+
+  /** Loads the follow-up job the rerun request refers to. */
+  private async getFollowupJob(
+    followupQueueName: string,
+    followupJobId: string
+  ): Promise<DataJob> {
+    return this.getJobData(followupQueueName, followupJobId);
+  }
+
+  /**
+   * Resolves the thread id of a job, preferring `data.threadId`, then the
+   * job's own `threadId`, then its `processId`.
+   *
+   * @throws Error when the resolved value is missing or empty.
+   */
+  private getThreadIdFromJob(job: DataJob): string {
+    const followupData: any = job.data ?? {};
+
+    const threadId =
+      followupData.threadId ??
+      job.threadId ??
+      job.processId;
+
+    if (!threadId) {
+      console.error('[QueueService] getThreadIdFromJob: Missing threadId', {
+        jobId: job.id,
+      });
+      throw new Error('Cannot locate process/thread for this job (no threadId).');
+    }
+
+    return threadId;
+  }
+
+  /**
+   * Returns the EXTRACT_EMISSIONS job with the highest `timestamp` for the
+   * thread; a missing timestamp counts as 0.
+   *
+   * @throws Error when the thread has no EXTRACT_EMISSIONS job.
+   */
+  private async getLatestExtractEmissionsJobForThread(threadId: string): Promise<DataJob> {
+    const extractJobs = await this.getDataJobs(
+      [QUEUE_NAMES.EXTRACT_EMISSIONS],
+      undefined,
+      threadId
+    );
+
+    if (!extractJobs.length) {
+      console.error('[QueueService] getLatestExtractEmissionsJobForThread: No EXTRACT_EMISSIONS job found', {
+        threadId,
+      });
+      throw new Error('No EXTRACT_EMISSIONS job found for this process.');
+    }
+
+    // Sort a copy: Array.prototype.sort reorders in place and would
+    // otherwise mutate the array handed back by getDataJobs.
+    return [...extractJobs].sort(
+      (firstJob, secondJob) => (secondJob.timestamp ?? 0) - (firstJob.timestamp ?? 0)
+    )[0];
+  }
+
+  /**
+   * Picks the company name for the new job's label: the extract job's
+   * `companyName`, else the follow-up job's, else the thread id.
+   */
+  private getCompanyNameFromJobs(
+    extractEmissionsJob: DataJob,
+    followupJob: DataJob,
+    threadId: string
+  ): string {
+    const extractData: any = extractEmissionsJob.data ?? {};
+    const followupData: any = followupJob.data ?? {};
+
+    return (
+      extractData.companyName ??
+      followupData.companyName ??
+      threadId
+    );
+  }
+
+  /**
+   * Builds the payload for the rerun: the original extract job's data,
+   * overlaid with the follow-up job's `wikidata` and the fiscal year when
+   * those are truthy, and with `runOnly` set to the requested scopes.
+   */
+  private buildExtractRerunData(
+    followupJob: DataJob,
+    extractEmissionsJob: DataJob,
+    fiscalYear: unknown,
+    scopes: string[],
+  ): Record<string, unknown> {
+    const extractData: any = extractEmissionsJob.data ?? {};
+    const followupData: any = followupJob.data ?? {};
+
+    return {
+      ...extractData,
+      ...(followupData.wikidata ? { wikidata: followupData.wikidata } : {}),
+      ...(fiscalYear ? { fiscalYear } : {}),
+      runOnly: scopes,
+    };
+  }
+
+  /**
+   * Adds the rerun job to the EXTRACT_EMISSIONS queue. The return type is
+   * left to inference so it follows whatever the queue's `add` returns.
+   */
+  private async enqueueExtractRerun(
+    companyName: string,
+    jobData: any,
+  ) {
+    const extractQueue = await this.getQueue(QUEUE_NAMES.EXTRACT_EMISSIONS);
+    return extractQueue.add('rerun emissions ' + companyName, jobData);
+  }
+
+  /**
+   * Best-effort lookup of the fiscal year for a thread. Never rejects:
+   * failures are logged as warnings and reported as `undefined`.
+   *
+   * For FOLLOW_UP_FISCAL_YEAR jobs, the fiscal year lives in the *return value* JSON, e.g.:
+   * { "value": { "fiscalYear": { startMonth, endMonth } }, ... }.
+   */
+  private async getLatestFiscalYearForThread(threadId: string): Promise<unknown> {
+    try {
+      const fiscalJobs = await this.getDataJobs(
+        [QUEUE_NAMES.FOLLOW_UP_FISCAL_YEAR],
+        undefined,
+        threadId
+      );
+
+      if (fiscalJobs.length === 0) {
+        return undefined;
+      }
+
+      // Sort a copy so the array from getDataJobs is not reordered in place.
+      const latestFiscal = [...fiscalJobs].sort(
+        (firstJob, secondJob) => (secondJob.timestamp ?? 0) - (firstJob.timestamp ?? 0)
+      )[0];
+
+      // The return value may arrive as a JSON string or as an object.
+      let payload: unknown = latestFiscal.returnvalue;
+      if (typeof payload === 'string') {
+        try {
+          payload = JSON.parse(payload);
+        } catch (parseErr) {
+          console.warn('[QueueService] getLatestFiscalYearForThread: Failed to parse fiscalYear returnvalue', {
+            threadId,
+            error: parseErr,
+          });
+          return undefined;
+        }
+      }
+
+      // JSON.parse can yield null or a primitive; only objects carry a fiscal year.
+      if (payload === null || typeof payload !== 'object') {
+        return undefined;
+      }
+
+      const parsed = payload as {
+        fiscalYear?: unknown;
+        value?: { fiscalYear?: unknown } | null;
+      };
+      return parsed.fiscalYear ?? parsed.value?.fiscalYear ?? undefined;
+    } catch (err) {
+      console.warn('[QueueService] getLatestFiscalYearForThread: Failed to fetch FOLLOW_UP_FISCAL_YEAR jobs', {
+        threadId,
+        error: err,
+      });
+      return undefined;
+    }
+  }
+
   /**
    * Re-run all jobs that match a given worker name (e.g. a value in data.runOnly[])
    * across one or more queues.