Skip to content

Commit

Permalink
Add job to delete old AgendaJobs (#2646)
Browse files Browse the repository at this point in the history
  • Loading branch information
tzjames committed Jun 20, 2024
1 parent 622b0b4 commit b31951f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
10 changes: 10 additions & 0 deletions packages/back-end/src/init/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import updateStaleInformationSchemaTable from "../jobs/updateStaleInformationSch
import expireOldQueries from "../jobs/expireOldQueries";
import addSdkWebhooksJob from "../jobs/sdkWebhooks";
import updateLicenseJob, { queueUpdateLicense } from "../jobs/updateLicense";
import deleteOldAgendaJobs from "../jobs/deleteOldAgendaJobs";
import { logger } from "../util/logger";

export async function queueInit() {
if (!CRON_ENABLED) return;
Expand All @@ -32,6 +34,14 @@ export async function queueInit() {
addSdkWebhooksJob(agenda);
updateLicenseJob(agenda);

// Make sure we have index needed to delete efficiently
agenda._collection
.createIndex({ lastFinishedAt: 1, nextRunAt: 1 })
.catch((e) => {
logger.error("Error creating index needed for deleteOldAgendaJobs: " + e);
});
deleteOldAgendaJobs(agenda);

await agenda.start();

if (!IS_CLOUD) {
Expand Down
44 changes: 44 additions & 0 deletions packages/back-end/src/jobs/deleteOldAgendaJobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import Agenda from "agenda";
import { trackJob } from "../services/otel";
import { getAgendaInstance } from "../services/queueing";
import { logger } from "../util/logger";
const JOB_NAME = "deleteOldAgendaJobs";

// Delete old agenda jobs that finished over one week ago and are not going to be repeated
const deleteOldAgendaJobs = trackJob(JOB_NAME, async () => {
const agenda = getAgendaInstance();

const startDate = Date.now();

const res = await agenda._collection
.find(
{
lastFinishedAt: { $lt: new Date(Date.now() - 7 * 24 * 3600 * 1000) },
nextRunAt: null,
},
{
limit: 1000,
projection: { _id: 1 },
}
)
.toArray();

const ids = res.map((r) => r._id);

const deleteRes = await agenda._collection.deleteMany({ _id: { $in: ids } });

logger.info(
`Deleted ${deleteRes.deletedCount} old agenda jobs in ` +
(Date.now() - startDate) +
`ms`
);
});

export default async function (agenda: Agenda) {
agenda.define(JOB_NAME, deleteOldAgendaJobs);

const job = agenda.create(JOB_NAME, {});
job.unique({});
job.repeatEvery("5 minutes");
await job.save();
}

0 comments on commit b31951f

Please sign in to comment.