Skip to content

Commit 722811a

Browse files
committed
feat(backend): implement cron job system for scheduled tasks
1 parent 7479c5e commit 722811a

File tree

8 files changed

+252
-23
lines changed

8 files changed

+252
-23
lines changed

package-lock.json

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

services/backend/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
"isomorphic-dompurify": "^2.28.0",
4242
"lucia": "^3.2.2",
4343
"nanoid": "^5.1.6",
44+
"node-cron": "^4.2.1",
4445
"nodemailer": "^7.0.6",
4546
"pino": "^10.0.0",
4647
"pino-pretty": "^13.1.1",
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import * as cron from 'node-cron';
2+
import type { FastifyBaseLogger } from 'fastify';
3+
4+
/**
5+
* CronJob interface - each cron job file must export this structure
6+
*/
7+
export interface CronJob {
8+
/** Cron schedule expression (e.g., every 2 minutes, hourly, daily) */
9+
schedule: string;
10+
11+
/** Optional job name for logging */
12+
name?: string;
13+
14+
/** The task function to execute */
15+
task: () => void | Promise<void>;
16+
}
17+
18+
/**
19+
* CronManager - Manages all cron jobs in the application
20+
*
21+
* This class discovers, registers, and manages all cron jobs.
22+
* It provides lifecycle management including graceful shutdown.
23+
*/
24+
export class CronManager {
25+
private jobs: Map<string, cron.ScheduledTask> = new Map();
26+
private logger: FastifyBaseLogger;
27+
28+
constructor(logger: FastifyBaseLogger) {
29+
this.logger = logger;
30+
}
31+
32+
/**
33+
* Register a new cron job
34+
*
35+
* @param jobDefinition - The cron job definition
36+
*/
37+
register(jobDefinition: CronJob): void {
38+
const jobName = jobDefinition.name || 'unnamed-job';
39+
40+
try {
41+
const task = cron.schedule(jobDefinition.schedule, async () => {
42+
this.logger.info({
43+
job: jobName,
44+
schedule: jobDefinition.schedule
45+
}, 'Executing cron job');
46+
47+
try {
48+
await jobDefinition.task();
49+
this.logger.debug({ job: jobName }, 'Cron job completed successfully');
50+
} catch (error) {
51+
this.logger.error({
52+
job: jobName,
53+
error
54+
}, 'Cron job execution failed');
55+
}
56+
});
57+
58+
this.jobs.set(jobName, task);
59+
this.logger.info({
60+
job: jobName,
61+
schedule: jobDefinition.schedule
62+
}, 'Cron job registered');
63+
64+
} catch (error) {
65+
this.logger.error({
66+
job: jobName,
67+
schedule: jobDefinition.schedule,
68+
error
69+
}, 'Failed to register cron job');
70+
}
71+
}
72+
73+
/**
74+
* Start all registered cron jobs
75+
*/
76+
start(): void {
77+
this.logger.info({ count: this.jobs.size }, 'Cron jobs are running (auto-started on schedule)');
78+
}
79+
80+
/**
81+
* Stop all cron jobs gracefully
82+
*/
83+
stop(): void {
84+
this.logger.info({ count: this.jobs.size }, 'Stopping cron jobs');
85+
86+
for (const [name, task] of this.jobs.entries()) {
87+
task.stop();
88+
this.logger.debug({ job: name }, 'Cron job stopped');
89+
}
90+
91+
this.jobs.clear();
92+
this.logger.info('All cron jobs stopped successfully');
93+
}
94+
95+
/**
96+
* Get list of registered job names
97+
*/
98+
getJobNames(): string[] {
99+
return Array.from(this.jobs.keys());
100+
}
101+
}

services/backend/src/cron/index.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import type { FastifyBaseLogger } from 'fastify';
2+
import type { JobQueueService } from '../services/jobQueueService';
3+
import { CronManager } from './cronManager';
4+
// import { createExampleCronJob } from './jobs/exampleJob';
5+
6+
/**
7+
* Initialize and register all cron jobs
8+
*
9+
* This function is called during server startup to set up all cron jobs.
10+
* Add new cron jobs by:
11+
* 1. Creating a job file in the jobs/ directory
12+
* 2. Importing it here
13+
* 3. Registering it with the CronManager
14+
*
15+
* @param jobQueueService - Job queue service for creating background jobs
16+
* @param logger - Logger instance
17+
* @returns CronManager instance
18+
*/
19+
export function initializeCronJobs(
20+
jobQueueService: JobQueueService,
21+
logger: FastifyBaseLogger
22+
): CronManager {
23+
const cronManager = new CronManager(logger);
24+
25+
// Example cron job - commented out, uncomment to test
26+
// cronManager.register(createExampleCronJob(jobQueueService));
27+
28+
// Add your cron jobs here
29+
// Example:
30+
// cronManager.register(createDailyBackupJob(jobQueueService));
31+
// cronManager.register(createHourlyCleanupJob(jobQueueService));
32+
33+
return cronManager;
34+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import type { CronJob } from '../cronManager';
2+
import type { JobQueueService } from '../../services/jobQueueService';
3+
4+
/**
5+
* Example Cron Job - Runs every 2 minutes
6+
*
7+
* This job demonstrates how to push work to the job queue system
8+
* from a cron schedule. Every 2 minutes, it creates a new job in
9+
* the queue which will be processed by the ExampleCronWorker.
10+
*
11+
* The worker will log: "hello from queue every 2min"
12+
*/
13+
export function createExampleCronJob(jobQueueService: JobQueueService): CronJob {
14+
return {
15+
name: 'example-every-2min',
16+
schedule: '*/2 * * * *', // Every 2 minutes
17+
18+
task: async () => {
19+
await jobQueueService.createJob('example_cron_job', {
20+
message: 'hello from queue every 2min'
21+
});
22+
}
23+
};
24+
}

services/backend/src/server.ts

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,40 @@ export async function initializeDatabaseDependentServices(
197197
server.log.warn('⚠️ Continuing without Job Queue System due to error');
198198
}
199199

200+
// Initialize and start Cron Job System (after Job Queue System)
201+
try {
202+
server.log.debug('🔄 Initializing Cron Job System...');
203+
const { initializeCronJobs } = await import('./cron');
204+
205+
// Check if jobQueueService exists before initializing cron
206+
if ((server as any).jobQueueService) {
207+
const cronManager = initializeCronJobs((server as any).jobQueueService, server.log);
208+
server.log.debug('✅ Cron jobs registered');
209+
210+
// Start all cron jobs
211+
cronManager.start();
212+
server.log.info('✅ Cron Job System started and jobs scheduled');
213+
214+
// Decorate server with cron manager for graceful shutdown
215+
if (!server.hasDecorator('cronManager')) {
216+
server.decorate('cronManager', cronManager);
217+
} else {
218+
(server as any).cronManager = cronManager;
219+
}
220+
} else {
221+
server.log.warn('⚠️ Job Queue Service not available, skipping Cron Job System initialization');
222+
}
223+
224+
} catch (cronError) {
225+
server.log.error({
226+
error: cronError,
227+
message: cronError instanceof Error ? cronError.message : 'Unknown error',
228+
stack: cronError instanceof Error ? cronError.stack : 'No stack trace'
229+
}, '❌ Cron Job System failed to initialize:');
230+
// Don't throw - continue with startup but log the error
231+
server.log.warn('⚠️ Continuing without Cron Job System due to error');
232+
}
233+
200234
// Start Token Cleanup Service (only after database is ready)
201235
try {
202236
server.log.debug('Starting Token Cleanup Service...');
@@ -601,7 +635,14 @@ export const createServer = async () => {
601635
server.log.info('Authentication routes registered under /api/auth.');
602636

603637
server.addHook('onClose', async () => {
604-
// Stop job processor first to gracefully finish current jobs
638+
// Stop cron jobs first
639+
if ((server as any).cronManager) {
640+
server.log.info('Stopping cron jobs...');
641+
(server as any).cronManager.stop();
642+
server.log.info('Cron jobs stopped.');
643+
}
644+
645+
// Stop job processor to gracefully finish current jobs
605646
if ((server as any).jobProcessorService) {
606647
server.log.info('Stopping job processor...');
607648
await (server as any).jobProcessorService.stop();

services/backend/src/types/fastify.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { type AnyDatabase } from '../db'
55
import type SqliteDriver from 'better-sqlite3'
66
import { type PluginManager } from '../plugin-system'
77
import { type DeployStackEventBus } from '../events'
8+
import { type CronManager } from '../cron/cronManager'
89

910
declare module 'fastify' {
1011
interface FastifyInstance {
@@ -27,6 +28,9 @@ declare module 'fastify' {
2728
// Methods for re-initializing database services after setup
2829
reinitializeDatabaseServices: () => Promise<boolean>
2930
reinitializePluginsWithDatabase: () => Promise<void>
31+
32+
// Cron manager for scheduled jobs
33+
cronManager?: CronManager
3034
}
3135

3236
interface FastifyReply {

0 commit comments

Comments
 (0)