Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions backend/__tests__/unit/routes/health.db.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Unit test for the /api/health/db pool-status probe (#454 follow-up).
// Verifies the saturation signal (waiting > 0 AND idle === 0 → 503)
// and the OK shape (200 with pool stats payload).

const request = require('supertest');
const express = require('express');

// Hand-rolled pg pool stub so the test controls totalCount/idleCount/
// waitingCount and we can assert the response logic deterministically.
const mockPool = {
options: { max: 50, connectionTimeoutMillis: 5000, idleTimeoutMillis: 10000 },
totalCount: 0,
idleCount: 0,
waitingCount: 0,
query: jest.fn().mockResolvedValue({ rows: [{ ok: 1 }] }),
on: jest.fn(),
};

jest.mock('../../../config/db-pg', () => ({
pool: mockPool,
connectPG: jest.fn(),
}));

// process.env.PG_HOST needs to be truthy or the route returns
// not_configured. Set BEFORE the route module loads.
process.env.PG_HOST = process.env.PG_HOST || 'localhost-test';

const healthRoutes = require('../../../routes/health');

const buildApp = () => {
const app = express();
app.use(express.json());
app.use('/api/health', healthRoutes);
return app;
};

describe('GET /api/health/db', () => {
beforeEach(() => {
mockPool.totalCount = 0;
mockPool.idleCount = 0;
mockPool.waitingCount = 0;
});

it('returns 200 with pg stats when pool is idle (no waiters)', async () => {
mockPool.totalCount = 3;
mockPool.idleCount = 2;
mockPool.waitingCount = 0;

const res = await request(buildApp()).get('/api/health/db').expect(200);
expect(res.body.pg).toEqual(expect.objectContaining({
status: 'ok',
max: 50,
total: 3,
idle: 2,
waiting: 0,
connectionTimeoutMillis: 5000,
}));
});

it('returns 200 (not saturated) when waiting > 0 but idle > 0 (transient burst)', async () => {
mockPool.totalCount = 10;
mockPool.idleCount = 1;
mockPool.waitingCount = 3;

const res = await request(buildApp()).get('/api/health/db').expect(200);
expect(res.body.pg.status).toBe('ok');
expect(res.body.pg.waiting).toBe(3);
});

it('returns 503 (saturated) when waiting > 0 AND idle === 0', async () => {
mockPool.totalCount = 50;
mockPool.idleCount = 0;
mockPool.waitingCount = 5;

const res = await request(buildApp()).get('/api/health/db').expect(503);
expect(res.body.pg).toEqual(expect.objectContaining({
status: 'saturated',
idle: 0,
waiting: 5,
}));
});

it('reports mongo state', async () => {
const res = await request(buildApp()).get('/api/health/db').expect(200);
expect(res.body.mongo).toEqual(expect.objectContaining({
state: expect.any(String),
}));
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Unit test for SchedulerService.dispatchPodSummaryRequests chunking
// (#454 follow-up). The previous bare Promise.all fanned out all
// installations in a single tick — see project-2026-05-26-pg-pool-
// exhaustion-incident memory for the live incident this prevents.

jest.mock('../../../services/agentEventService', () => ({
enqueue: jest.fn().mockResolvedValue({ _id: 'evt' }),
}));

jest.mock('../../../models/AgentRegistry', () => ({
AgentInstallation: {
find: jest.fn(),
},
}));

const AgentEventService = require('../../../services/agentEventService');
const { AgentInstallation } = require('../../../models/AgentRegistry');
// schedulerService is exported as `new SchedulerService()` instance via
// CJS compat (`module.exports = exports["default"]`). The class lives
// on `.constructor` of that instance — same pattern routes/summaries.ts
// uses to access SchedulerService.runSummarizer.
const schedulerServiceInstance = require('../../../services/schedulerService');
const SchedulerService = schedulerServiceInstance.constructor;

const makeFindChain = (installations) => {
const lean = jest.fn().mockResolvedValue(installations);
const select = jest.fn().mockReturnValue({ lean });
AgentInstallation.find.mockReturnValue({ select });
};

describe('SchedulerService.dispatchPodSummaryRequests chunking', () => {
beforeEach(() => {
jest.clearAllMocks();
// Force tiny pause so the test runs fast.
process.env.SUMMARIZER_FANOUT_BATCH_SIZE = '10';
process.env.SUMMARIZER_FANOUT_BATCH_PAUSE_MS = '0';
});

afterAll(() => {
delete process.env.SUMMARIZER_FANOUT_BATCH_SIZE;
delete process.env.SUMMARIZER_FANOUT_BATCH_PAUSE_MS;
});

it('returns 0 enqueued when no installations exist', async () => {
makeFindChain([]);
const result = await SchedulerService.dispatchPodSummaryRequests();
expect(result).toEqual({ enqueued: 0 });
expect(AgentEventService.enqueue).not.toHaveBeenCalled();
});

it('enqueues every installation when count <= batch size', async () => {
const installs = Array.from({ length: 5 }, (_, i) => ({ podId: `pod-${i}`, instanceId: 'default' }));
makeFindChain(installs);
const result = await SchedulerService.dispatchPodSummaryRequests();
expect(result).toEqual({ enqueued: 5 });
expect(AgentEventService.enqueue).toHaveBeenCalledTimes(5);
});

it('chunks the fanout when count exceeds batch size (no all-at-once burst)', async () => {
const installs = Array.from({ length: 60 }, (_, i) => ({ podId: `pod-${i}`, instanceId: 'default' }));
makeFindChain(installs);

// Snapshot the order of calls so we can verify they came in
// batches, not one big Promise.all over 60 items. With
// BATCH_PAUSE_MS=0 each batch still awaits its Promise.all before
// the next batch starts — verified by call ordering.
const callOrder = [];
AgentEventService.enqueue.mockImplementation(async ({ podId }) => {
callOrder.push(String(podId));
return { _id: 'evt' };
});

const result = await SchedulerService.dispatchPodSummaryRequests();
expect(result).toEqual({ enqueued: 60 });
expect(callOrder).toHaveLength(60);
// First 10 calls should be the first batch (pod-0 through pod-9),
// demonstrating sequential batching. With a single Promise.all over
// all 60 the order is non-deterministic.
expect(callOrder.slice(0, 10)).toEqual(
Array.from({ length: 10 }, (_, i) => `pod-${i}`),
);
expect(callOrder.slice(10, 20)).toEqual(
Array.from({ length: 10 }, (_, i) => `pod-${10 + i}`),
);
});

it('forwards trigger + windowMinutes options into the payload', async () => {
makeFindChain([{ podId: 'pod-x', instanceId: 'default' }]);
await SchedulerService.dispatchPodSummaryRequests({ trigger: 'manual-test', windowMinutes: 15 });
expect(AgentEventService.enqueue).toHaveBeenCalledWith(expect.objectContaining({
agentName: 'commonly-bot',
podId: 'pod-x',
type: 'summary.request',
payload: expect.objectContaining({
trigger: 'manual-test',
windowMinutes: 15,
}),
}));
});
});
52 changes: 52 additions & 0 deletions backend/routes/health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,58 @@ router.get('/live', (_req: unknown, res: Res) => {
res.status(200).json({ status: 'alive', timestamp: new Date().toISOString() });
});

// #454 follow-up: dedicated DB-pool probe. The main /api/health does a
// `SELECT 1` round-trip but doesn't surface pool stats. Alerting on
// pool.waitingCount > N for sustained windows would have caught the
// 2026-05-26 incident before user impact. Lightweight (no PG query) so
// safe to scrape frequently (e.g. every 10s from Prometheus).
//
// Returns 503 when waiting > 0 AND idle === 0 — the only signal that
// surely indicates saturation (waiting>0 alone could be a brief blip
// during normal bursts). Operators tune via env.
router.get('/db', (_req: unknown, res: Res) => {
const stats: Record<string, unknown> = {
timestamp: new Date().toISOString(),
mongo: {
state: ['disconnected', 'connected', 'connecting', 'disconnecting'][mongoose.connection.readyState] || 'unknown',
},
};

if (!process.env.PG_HOST || !pgPool) {
stats.pg = { status: 'not_configured' };
return res.status(200).json(stats);
}

const p = pgPool as {
options?: { max?: number; connectionTimeoutMillis?: number; idleTimeoutMillis?: number };
totalCount?: number;
idleCount?: number;
waitingCount?: number;
};
const idle = p.idleCount ?? 0;
const waiting = p.waitingCount ?? 0;
const total = p.totalCount ?? 0;
const max = p.options?.max ?? 0;

// Saturation signal: waiting callers AND zero idle connections.
// waiting > 0 with idle > 0 just means clients ask in bursts faster
// than they release — pool will catch up. Both being non-zero is
// when actual queueing happens and latency stacks up.
const saturated = waiting > 0 && idle === 0;

stats.pg = {
status: saturated ? 'saturated' : 'ok',
max,
total,
idle,
waiting,
connectionTimeoutMillis: p.options?.connectionTimeoutMillis ?? 0,
idleTimeoutMillis: p.options?.idleTimeoutMillis ?? 0,
};

return res.status(saturated ? 503 : 200).json(stats);
});

router.get('/ready', async (_req: unknown, res: Res) => {
try {
if (mongoose.connection.readyState !== 1) {
Expand Down
56 changes: 39 additions & 17 deletions backend/services/schedulerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -675,23 +675,45 @@ class SchedulerService {
return { enqueued: 0 };
}

await Promise.all(
installations.map((installation) => (
AgentEventService.enqueue({
agentName: 'commonly-bot',
instanceId: installation.instanceId || 'default',
podId: installation.podId,
type: 'summary.request',
payload: {
source: 'pod',
trigger,
windowMinutes,
includeDigest: true,
silent: true,
},
})
)),
);
// #454 follow-up: chunk the fanout. A bare Promise.all over all
// installations means 60+ events become ready at the exact same
// instant; the agent runtime then races to process them, and each
// downstream summary handler queries PG for messages — which under
// pg.Pool max=10 (pre-#455) saturated the pool and hung user-facing
// /api/pods. PR #455 raised the pool ceiling, but the burst itself
// is still wasteful: better to spread enqueue (and therefore the
// downstream processing window) over a few seconds. Batches of 10
// with a 500ms gap caps peak concurrency on the consumer side while
// keeping total wall time well under the next hourly tick.
const BATCH_SIZE = parseInt(process.env.SUMMARIZER_FANOUT_BATCH_SIZE || '10', 10) || 10;
const BATCH_PAUSE_MS = parseInt(process.env.SUMMARIZER_FANOUT_BATCH_PAUSE_MS || '500', 10) || 500;

for (let i = 0; i < installations.length; i += BATCH_SIZE) {
const batch = installations.slice(i, i + BATCH_SIZE);
// eslint-disable-next-line no-await-in-loop
await Promise.all(
batch.map((installation) => (
AgentEventService.enqueue({
agentName: 'commonly-bot',
instanceId: installation.instanceId || 'default',
podId: installation.podId,
type: 'summary.request',
payload: {
source: 'pod',
trigger,
windowMinutes,
includeDigest: true,
silent: true,
},
})
)),
);
// Sleep between batches; skip the final pause.
if (i + BATCH_SIZE < installations.length && BATCH_PAUSE_MS > 0) {
// eslint-disable-next-line no-await-in-loop, no-promise-executor-return
await new Promise((resolve) => setTimeout(resolve, BATCH_PAUSE_MS));
}
}

return { enqueued: installations.length };
}
Expand Down
Loading