Skip to content

Commit

Permalink
feat(store): add Sentry support to the queue
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkdev98 committed Apr 14, 2024
1 parent a8a4ef2 commit b8041fe
Showing 1 changed file with 92 additions and 64 deletions.
156 changes: 92 additions & 64 deletions packages/store/src/queue-worker.js
@@ -1,5 +1,6 @@
import { setTimeout } from "node:timers/promises";
import {
_compasSentryExport,
AppError,
eventStart,
eventStop,
Expand Down Expand Up @@ -498,82 +499,109 @@ async function queueWorkerExecuteJob(logger, sql, options, job) {
job,
});

if (_compasSentryExport && isCronJob) {
_compasSentryExport.captureException(
new Error("No handler registered for the job."),
);
}

return;
}

const event = newEvent(
newLogger({
ctx: {
type: "queue_handler",
id: job.id,
if (_compasSentryExport) {
await _compasSentryExport.startSpan(
{
op: "job",
name: job.name,
description: job.name,
},
}),
AbortSignal.timeout(timeout),
);
async () => {
return await exec();
},
);
} else {
await exec();
}

event.log.info({
job,
});
async function exec() {
const event = newEvent(
newLogger({
ctx: {
type: "queue_handler",
id: job.id,
},
}),
AbortSignal.timeout(timeout),
);

try {
// @ts-expect-error
await sql.savepoint(async (sql) => {
// @ts-expect-error
await handler(event, sql, job);
});
isJobComplete = true;
} catch (e) {
event.log.error({
type: "job_error",
name: job.name,
scheduledAt: job.scheduledAt,
retryCount: job.retryCount,
error: AppError.format(e),
event.log.info({
job,
});

isJobComplete = job.retryCount + 1 >= options.maxRetryCount;

if (options.deleteJobOnCompletion && !isJobComplete) {
// Re insert the job, since this transaction did remove the job.
await queries.jobInsert(sql, {
...job,
isComplete: false,
retryCount: job.retryCount + 1,
try {
// @ts-expect-error
await sql.savepoint(async (sql) => {
// @ts-expect-error
await handler(event, sql, job);
});
} else {
await queries.jobUpdate(sql, {
update: {
isComplete: isJobComplete,
isJobComplete = true;
} catch (e) {
if (_compasSentryExport) {
_compasSentryExport.captureException(e);
}

event.log.error({
type: "job_error",
name: job.name,
scheduledAt: job.scheduledAt,
retryCount: job.retryCount,
error: AppError.format(e),
});

isJobComplete = job.retryCount + 1 >= options.maxRetryCount;

if (options.deleteJobOnCompletion && !isJobComplete) {
// Re insert the job, since this transaction did remove the job.
await queries.jobInsert(sql, {
...job,
isComplete: false,
retryCount: job.retryCount + 1,
},
where: {
id: job.id,
});
} else {
await queries.jobUpdate(sql, {
update: {
isComplete: isJobComplete,
retryCount: job.retryCount + 1,
},
where: {
id: job.id,
},
});
}
}

if (isCronJob && isJobComplete) {
const nextValue = cron
.parseExpression(job.data.cronExpression, {
utc: true,
})
.next()
.toDate();

// This causes an extra insert if a finished cron job is manually restarted. We don't
// correct for this behaviour here, since we are kinda in a hot loop. It will be
// autocorrected on the next call of `queueWorkerRegisterCronJobs` (at queue
// startup).
await queries.jobInsert(sql, {
name: job.name,
priority: job.priority,
scheduledAt: nextValue,
data: {
jobType: JOB_TYPE_CRON,
cronExpression: job.data.cronExpression,
cronLastCompletedAt: new Date(),
},
});
}
}

if (isCronJob && isJobComplete) {
const nextValue = cron
.parseExpression(job.data.cronExpression, {
utc: true,
})
.next()
.toDate();

// This causes an extra insert if a finished cron job is manually restarted. We don't
// correct for this behaviour here, since we are kinda in a hot loop. It will be
// autocorrected on the next call of `queueWorkerRegisterCronJobs` (at queue
// startup).
await queries.jobInsert(sql, {
name: job.name,
priority: job.priority,
scheduledAt: nextValue,
data: {
jobType: JOB_TYPE_CRON,
cronExpression: job.data.cronExpression,
cronLastCompletedAt: new Date(),
},
});
}
}

0 comments on commit b8041fe

Please sign in to comment.