Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 45 additions & 68 deletions apps/api/src/controllers/healthcheck.controller.ts
Original file line number Diff line number Diff line change
@@ -1,83 +1,60 @@
import { round } from '@openpanel/common';
import { TABLE_NAMES, chQuery, db } from '@openpanel/db';
import { eventsQueue } from '@openpanel/queue';
import { isShuttingDown } from '@/utils/graceful-shutdown';
import { chQuery, db } from '@openpanel/db';
import { getRedisCache } from '@openpanel/redis';
import type { FastifyReply, FastifyRequest } from 'fastify';

async function withTimings<T>(promise: Promise<T>) {
const time = performance.now();
try {
const data = await promise;
return {
time: round(performance.now() - time, 2),
data,
} as const;
} catch (e) {
return null;
}
}

// For docker compose healthcheck
export async function healthcheck(
request: FastifyRequest,
reply: FastifyReply,
) {
if (process.env.DISABLE_HEALTHCHECK) {
return reply.status(200).send({
ok: true,
try {
const redisRes = await getRedisCache().ping();
const dbRes = await db.project.findFirst();
const chRes = await chQuery('SELECT 1');
const status = redisRes && dbRes && chRes ? 200 : 503;

reply.status(status).send({
ready: status === 200,
redis: redisRes === 'PONG',
db: !!dbRes,
ch: chRes && chRes.length > 0,
});
} catch (error) {
return reply.status(503).send({
ready: false,
reason: 'dependencies not ready',
});
}
const redisRes = await withTimings(getRedisCache().ping());
const dbRes = await withTimings(db.project.findFirst());
const queueRes = await withTimings(eventsQueue.getCompleted());
const chRes = await withTimings(
chQuery(
`SELECT * FROM ${TABLE_NAMES.events} WHERE created_at > now() - INTERVAL 10 MINUTE LIMIT 1`,
),
);
const status = redisRes && dbRes && queueRes && chRes ? 200 : 500;
}

reply.status(status).send({
redis: redisRes
? {
ok: redisRes.data === 'PONG',
time: `${redisRes.time}ms`,
}
: null,
db: dbRes
? {
ok: !!dbRes.data,
time: `${dbRes.time}ms`,
}
: null,
queue: queueRes
? {
ok: !!queueRes.data,
time: `${queueRes.time}ms`,
}
: null,
ch: chRes
? {
ok: !!chRes.data,
time: `${chRes.time}ms`,
}
: null,
});
// Kubernetes - Liveness probe - returns 200 if process is alive
export async function liveness(request: FastifyRequest, reply: FastifyReply) {
return reply.status(200).send({ live: true });
}

export async function healthcheckQueue(
request: FastifyRequest,
reply: FastifyReply,
) {
const count = await eventsQueue.getWaitingCount();
if (count > 40) {
reply.status(500).send({
ok: false,
count,
});
} else {
reply.status(200).send({
ok: true,
count,
// Kubernetes - Readiness probe - returns 200 only when accepting requests, 503 during shutdown
export async function readiness(request: FastifyRequest, reply: FastifyReply) {
if (isShuttingDown()) {
return reply.status(503).send({ ready: false, reason: 'shutting down' });
}

// Perform lightweight dependency checks for readiness
const redisRes = await getRedisCache().ping();
const dbRes = await db.project.findFirst();
const chRes = await chQuery('SELECT 1');

const isReady = redisRes && dbRes && chRes;

if (!isReady) {
return reply.status(503).send({
ready: false,
reason: 'dependencies not ready',
redis: redisRes === 'PONG',
db: !!dbRes,
ch: chRes && chRes.length > 0,
});
}

return reply.status(200).send({ ready: true });
}
2 changes: 1 addition & 1 deletion apps/api/src/hooks/request-logging.hook.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { FastifyReply, FastifyRequest } from 'fastify';
import { path, pick } from 'ramda';

const ignoreLog = ['/healthcheck', '/metrics', '/misc'];
const ignoreLog = ['/healthcheck', '/healthz', '/metrics', '/misc'];
const ignoreMethods = ['OPTIONS'];

const getTrpcInput = (
Expand Down
28 changes: 18 additions & 10 deletions apps/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import {
import sourceMapSupport from 'source-map-support';
import {
healthcheck,
healthcheckQueue,
liveness,
readiness,
} from './controllers/healthcheck.controller';
import { fixHook } from './hooks/fix.hook';
import { ipHook } from './hooks/ip.hook';
Expand All @@ -40,6 +41,7 @@ import profileRouter from './routes/profile.router';
import trackRouter from './routes/track.router';
import webhookRouter from './routes/webhook.router';
import { HttpError } from './utils/errors';
import { shutdown } from './utils/graceful-shutdown';
import { logger } from './utils/logger';

sourceMapSupport.install();
Expand Down Expand Up @@ -172,8 +174,11 @@ const startServer = async () => {
instance.register(importRouter, { prefix: '/import' });
instance.register(insightsRouter, { prefix: '/insights' });
instance.register(trackRouter, { prefix: '/track' });
// Keep existing endpoints for backward compatibility
instance.get('/healthcheck', healthcheck);
instance.get('/healthcheck/queue', healthcheckQueue);
// New Kubernetes-style health endpoints
instance.get('/healthz/live', liveness);
instance.get('/healthz/ready', readiness);
instance.get('/', (_request, reply) =>
reply.send({ name: 'openpanel sdk api' }),
);
Expand Down Expand Up @@ -211,14 +216,17 @@ const startServer = async () => {
});

if (process.env.NODE_ENV === 'production') {
for (const signal of ['SIGINT', 'SIGTERM']) {
process.on(signal, (error) => {
logger.error(`uncaught exception detected ${signal}`, error);
fastify.close().then((error) => {
process.exit(error ? 1 : 0);
});
});
}
logger.info('Registering graceful shutdown handlers');
process.on('SIGTERM', async () => await shutdown(fastify, 'SIGTERM', 0));
process.on('SIGINT', async () => await shutdown(fastify, 'SIGINT', 0));
process.on('uncaughtException', async (error) => {
logger.error('Uncaught exception', error);
await shutdown(fastify, 'uncaughtException', 1);
});
process.on('unhandledRejection', async (reason, promise) => {
logger.error('Unhandled rejection', { reason, promise });
await shutdown(fastify, 'unhandledRejection', 1);
});
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

await fastify.listen({
Expand Down
108 changes: 108 additions & 0 deletions apps/api/src/utils/graceful-shutdown.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { ch, db } from '@openpanel/db';
import {
cronQueue,
eventsQueue,
miscQueue,
notificationQueue,
sessionsQueue,
} from '@openpanel/queue';
import {
getRedisCache,
getRedisPub,
getRedisQueue,
getRedisSub,
} from '@openpanel/redis';
import type { FastifyInstance } from 'fastify';
import { logger } from './logger';

let shuttingDown = false;

export function setShuttingDown(value: boolean) {
shuttingDown = value;
}

export function isShuttingDown() {
return shuttingDown;
}

// Graceful shutdown handler
export async function shutdown(
fastify: FastifyInstance,
signal: string,
exitCode = 0,
) {
if (isShuttingDown()) {
logger.warn('Shutdown already in progress, ignoring signal', { signal });
return;
}

logger.info('Starting graceful shutdown', { signal });

setShuttingDown(true);

// Step 2: Wait for load balancer to stop sending traffic (matches preStop sleep)
const gracePeriod = Number(process.env.SHUTDOWN_GRACE_PERIOD_MS || '5000');
await new Promise((resolve) => setTimeout(resolve, gracePeriod));

// Step 3: Close Fastify to drain in-flight requests
try {
await fastify.close();
logger.info('Fastify server closed');
} catch (error) {
logger.error('Error closing Fastify server', error);
}

// Step 4: Close database connections
try {
await db.$disconnect();
logger.info('Database connection closed');
} catch (error) {
logger.error('Error closing database connection', error);
}

// Step 5: Close ClickHouse connections
try {
await ch.close();
logger.info('ClickHouse connections closed');
} catch (error) {
logger.error('Error closing ClickHouse connections', error);
}

// Step 6: Close Bull queues (graceful shutdown of queue state)
try {
await Promise.all([
eventsQueue.close(),
sessionsQueue.close(),
cronQueue.close(),
miscQueue.close(),
notificationQueue.close(),
]);
logger.info('Queue state closed');
} catch (error) {
logger.error('Error closing queue state', error);
}

// Step 7: Close Redis connections
try {
const redisConnections = [
getRedisCache(),
getRedisPub(),
getRedisSub(),
getRedisQueue(),
];

await Promise.all(
redisConnections.map(async (redis) => {
if (redis.status === 'ready') {
await redis.quit();
}
}),
);
logger.info('Redis connections closed');
} catch (error) {
logger.error('Error closing Redis connections', error);
}

logger.info('Graceful shutdown completed');
process.exit(exitCode);
}
Binary file removed packages/GeoLite2-City.mmdb
Binary file not shown.
Loading