From 85e1e092342652b119cd77385407ffba6f54ac9f Mon Sep 17 00:00:00 2001
From: Sam Xu
Date: Sun, 31 May 2026 13:05:00 +0800
Subject: [PATCH] fix(#454): chunk summarizer fanout + add /api/health/db pool probe
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Two follow-ups to PR #455. PR #455 raised the pool ceiling (max 10→50,
connectionTimeoutMillis 0→5000ms) so a saturated pool fails fast instead
of hanging forever. These follow-ups address the SHAPE of the burst that
caused the saturation in the first place:

(A) backend/services/schedulerService.ts — Chunk
    dispatchPodSummaryRequests. Previously a bare Promise.all over every
    installation: all N events became ready in a single tick, and the
    agent runtime then raced to process them — each downstream summary
    handler queries PG for messages. Now: batches of 10 (configurable
    via SUMMARIZER_FANOUT_BATCH_SIZE) with a 500ms gap between batches
    (SUMMARIZER_FANOUT_BATCH_PAUSE_MS). For 60 pods that spreads enqueue
    across ~3 seconds. Caps peak consumer concurrency without extending
    total wall time meaningfully (next hourly tick is still an hour
    away).

(B) backend/routes/health.ts — New GET /api/health/db endpoint. Reports
    pool stats (max, total, idle, waiting, connectionTimeoutMillis)
    without doing a SELECT round-trip — safe to scrape every 10s from
    Prometheus or an uptime check. Returns 503 when (waiting > 0 AND
    idle === 0), the only signal that surely indicates real queueing.
    Bare waiting > 0 with idle > 0 is a transient burst the pool will
    catch up on; alerting there would be noisy.

8 new unit tests:
- 4 in schedulerService.dispatchPodSummary.test.js (empty list, small
  list ≤ batch, large list chunked + verified call ordering, options
  forwarded into payload).
- 4 in health.db.test.js (200/ok shape, 200 on transient waiting, 503 on
  saturation, mongo state surfaced).

Refs #454.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 .../__tests__/unit/routes/health.db.test.js   |  89 ++++++++++++++++
 ...chedulerService.dispatchPodSummary.test.js | 100 ++++++++++++++++++
 backend/routes/health.ts                      |  52 +++++++++
 backend/services/schedulerService.ts          |  56 +++++++---
 4 files changed, 280 insertions(+), 17 deletions(-)
 create mode 100644 backend/__tests__/unit/routes/health.db.test.js
 create mode 100644 backend/__tests__/unit/services/schedulerService.dispatchPodSummary.test.js

diff --git a/backend/__tests__/unit/routes/health.db.test.js b/backend/__tests__/unit/routes/health.db.test.js
new file mode 100644
index 000000000..288594c52
--- /dev/null
+++ b/backend/__tests__/unit/routes/health.db.test.js
@@ -0,0 +1,89 @@
+// Unit test for the /api/health/db pool-status probe (#454 follow-up).
+// Covers the saturation signal (waiting > 0 AND idle === 0 → 503), the
+// OK shape (200 with pool stats payload) and the mongo state field.
+
+const request = require('supertest');
+const express = require('express');
+
+// Hand-rolled pg pool stub: tests set totalCount/idleCount/waitingCount
+// directly. Named `mock*` so the jest.mock factory below may reference it.
+const mockPool = {
+  options: { max: 50, connectionTimeoutMillis: 5000, idleTimeoutMillis: 10000 },
+  totalCount: 0,
+  idleCount: 0,
+  waitingCount: 0,
+  query: jest.fn().mockResolvedValue({ rows: [{ ok: 1 }] }),
+  on: jest.fn(),
+};
+
+jest.mock('../../../config/db-pg', () => ({
+  pool: mockPool,
+  connectPG: jest.fn(),
+}));
+
+// The /db route reports pg as not_configured unless process.env.PG_HOST
+// is truthy, so set it up front, BEFORE the route module is required.
+process.env.PG_HOST = process.env.PG_HOST || 'localhost-test';
+
+const healthRoutes = require('../../../routes/health');
+
+const buildApp = () => {
+  const app = express();
+  app.use(express.json());
+  app.use('/api/health', healthRoutes);
+  return app;
+};
+
+describe('GET /api/health/db', () => {
+  beforeEach(() => { // reset the shared stub's counters between tests
+    mockPool.totalCount = 0;
+    mockPool.idleCount = 0;
+    mockPool.waitingCount = 0;
+  });
+
+  it('returns 200 with pg stats when pool is idle (no waiters)', async () => {
+    mockPool.totalCount = 3;
+    mockPool.idleCount = 2;
+    mockPool.waitingCount = 0;
+
+    const res = await request(buildApp()).get('/api/health/db').expect(200);
+    expect(res.body.pg).toEqual(expect.objectContaining({
+      status: 'ok',
+      max: 50,
+      total: 3,
+      idle: 2,
+      waiting: 0,
+      connectionTimeoutMillis: 5000,
+    }));
+  });
+
+  it('returns 200 (not saturated) when waiting > 0 but idle > 0 (transient burst)', async () => {
+    mockPool.totalCount = 10;
+    mockPool.idleCount = 1; // one idle connection → not saturated
+    mockPool.waitingCount = 3;
+
+    const res = await request(buildApp()).get('/api/health/db').expect(200);
+    expect(res.body.pg.status).toBe('ok');
+    expect(res.body.pg.waiting).toBe(3);
+  });
+
+  it('returns 503 (saturated) when waiting > 0 AND idle === 0', async () => {
+    mockPool.totalCount = 50;
+    mockPool.idleCount = 0; // nothing idle + waiters → saturated
+    mockPool.waitingCount = 5;
+
+    const res = await request(buildApp()).get('/api/health/db').expect(503);
+    expect(res.body.pg).toEqual(expect.objectContaining({
+      status: 'saturated',
+      idle: 0,
+      waiting: 5,
+    }));
+  });
+
+  it('reports mongo state', async () => {
+    const res = await request(buildApp()).get('/api/health/db').expect(200);
+    expect(res.body.mongo).toEqual(expect.objectContaining({
+      state: expect.any(String),
+    }));
+  });
+});
diff --git a/backend/__tests__/unit/services/schedulerService.dispatchPodSummary.test.js b/backend/__tests__/unit/services/schedulerService.dispatchPodSummary.test.js
new file mode 100644
index 000000000..ff2434a36
--- /dev/null
+++
b/backend/__tests__/unit/services/schedulerService.dispatchPodSummary.test.js
@@ -0,0 +1,100 @@
+// Unit test for SchedulerService.dispatchPodSummaryRequests chunking
+// (#454 follow-up). The previous bare Promise.all fanned out all
+// installations in a single tick — see project-2026-05-26-pg-pool-
+// exhaustion-incident memory for the live incident this prevents.
+
+jest.mock('../../../services/agentEventService', () => ({
+  enqueue: jest.fn().mockResolvedValue({ _id: 'evt' }),
+}));
+
+jest.mock('../../../models/AgentRegistry', () => ({
+  AgentInstallation: {
+    find: jest.fn(),
+  },
+}));
+
+const AgentEventService = require('../../../services/agentEventService');
+const { AgentInstallation } = require('../../../models/AgentRegistry');
+// schedulerService is exported as `new SchedulerService()` instance via
+// CJS compat (`module.exports = exports["default"]`). The class lives
+// on `.constructor` of that instance — same pattern routes/summaries.ts
+// uses to access SchedulerService.runSummarizer.
+const schedulerServiceInstance = require('../../../services/schedulerService');
+const SchedulerService = schedulerServiceInstance.constructor;
+
+const makeFindChain = (installations) => {
+  const lean = jest.fn().mockResolvedValue(installations);
+  const select = jest.fn().mockReturnValue({ lean });
+  AgentInstallation.find.mockReturnValue({ select });
+};
+
+describe('SchedulerService.dispatchPodSummaryRequests chunking', () => {
+  beforeEach(() => {
+    jest.clearAllMocks();
+    // Keep the inter-batch pause at 1ms so the suite stays fast.
+    process.env.SUMMARIZER_FANOUT_BATCH_SIZE = '10';
+    process.env.SUMMARIZER_FANOUT_BATCH_PAUSE_MS = '1';
+  });
+
+  afterAll(() => {
+    delete process.env.SUMMARIZER_FANOUT_BATCH_SIZE;
+    delete process.env.SUMMARIZER_FANOUT_BATCH_PAUSE_MS;
+  });
+
+  it('returns 0 enqueued when no installations exist', async () => {
+    makeFindChain([]);
+    const result = await SchedulerService.dispatchPodSummaryRequests();
+    expect(result).toEqual({ enqueued: 0 });
+    expect(AgentEventService.enqueue).not.toHaveBeenCalled();
+  });
+
+  it('enqueues every installation when count <= batch size', async () => {
+    const installs = Array.from({ length: 5 }, (_, i) => ({ podId: `pod-${i}`, instanceId: 'default' }));
+    makeFindChain(installs);
+    const result = await SchedulerService.dispatchPodSummaryRequests();
+    expect(result).toEqual({ enqueued: 5 });
+    expect(AgentEventService.enqueue).toHaveBeenCalledTimes(5);
+  });
+
+  it('chunks the fanout when count exceeds batch size (no all-at-once burst)', async () => {
+    const installs = Array.from({ length: 60 }, (_, i) => ({ podId: `pod-${i}`, instanceId: 'default' }));
+    makeFindChain(installs);
+
+    // Call order alone cannot prove chunking: `map` invokes enqueue in
+    // array order even under one big Promise.all over 60 items. Track
+    // how many calls are in flight at once instead — batching must
+    // never have more than one batch (10) outstanding.
+    const callOrder = [];
+    let inFlight = 0;
+    let peakInFlight = 0;
+    AgentEventService.enqueue.mockImplementation(async ({ podId }) => {
+      callOrder.push(String(podId));
+      inFlight += 1;
+      peakInFlight = Math.max(peakInFlight, inFlight);
+      await Promise.resolve(); // stay pending until the whole batch is issued
+      inFlight -= 1;
+      return { _id: 'evt' };
+    });
+
+    const result = await SchedulerService.dispatchPodSummaryRequests();
+    expect(result).toEqual({ enqueued: 60 });
+    // A bare Promise.all over all 60 would report a peak of 60 here.
+    expect(peakInFlight).toBe(10);
+    // Every installation is enqueued exactly once, in array order.
+    expect(callOrder).toEqual(installs.map(({ podId }) => podId));
+  });
+
+  it('forwards trigger + windowMinutes options into the payload', async () => {
+    makeFindChain([{ podId: 'pod-x', instanceId: 'default' }]);
+    await SchedulerService.dispatchPodSummaryRequests({ trigger: 'manual-test', windowMinutes: 15 });
+    expect(AgentEventService.enqueue).toHaveBeenCalledWith(expect.objectContaining({
+      agentName: 'commonly-bot',
+      podId: 'pod-x',
+      type: 'summary.request',
+      payload: expect.objectContaining({
+        trigger: 'manual-test',
+        windowMinutes: 15,
+      }),
+    }));
+  });
+});
diff --git a/backend/routes/health.ts b/backend/routes/health.ts
index e363f0d85..9e0cdc129 100644
--- a/backend/routes/health.ts
+++ b/backend/routes/health.ts
@@ -109,6 +109,58 @@ router.get('/live', (_req: unknown, res: Res) => {
   res.status(200).json({ status: 'alive', timestamp: new Date().toISOString() });
 });
 
+// #454 follow-up: dedicated DB-pool probe. The main /api/health does a
+// `SELECT 1` round-trip but doesn't surface pool stats. Alerting on
+// pool.waitingCount > N for sustained windows would have caught the
+// 2026-05-26 incident before user impact. Lightweight (no PG query) so
+// safe to scrape frequently (e.g. every 10s from Prometheus).
+//
+// Returns 503 when waiting > 0 AND idle === 0 — the only signal that
+// surely indicates saturation (waiting>0 alone could be a brief blip
+// during normal bursts). The threshold is fixed; there is no env knob.
+router.get('/db', (_req: unknown, res: Res) => {
+  const stats: Record<string, unknown> = {
+    timestamp: new Date().toISOString(),
+    mongo: {
+      state: ['disconnected', 'connected', 'connecting', 'disconnecting'][mongoose.connection.readyState] || 'unknown',
+    },
+  };
+
+  if (!process.env.PG_HOST || !pgPool) {
+    stats.pg = { status: 'not_configured' };
+    return res.status(200).json(stats);
+  }
+  // Narrow structural view of the pool: only the options/counters reported below.
+  const p = pgPool as {
+    options?: { max?: number; connectionTimeoutMillis?: number; idleTimeoutMillis?: number };
+    totalCount?: number;
+    idleCount?: number;
+    waitingCount?: number;
+  };
+  const idle = p.idleCount ?? 0;
+  const waiting = p.waitingCount ?? 0;
+  const total = p.totalCount ?? 0;
+  const max = p.options?.max ?? 0;
+
+  // Saturation signal: waiting callers AND zero idle connections.
+  // waiting > 0 with idle > 0 just means clients ask in bursts faster
+  // than they release — pool will catch up. Waiters with nothing idle
+  // is when actual queueing happens and latency stacks up.
+  const saturated = waiting > 0 && idle === 0;
+
+  stats.pg = {
+    status: saturated ? 'saturated' : 'ok',
+    max,
+    total,
+    idle,
+    waiting,
+    connectionTimeoutMillis: p.options?.connectionTimeoutMillis ?? 0,
+    idleTimeoutMillis: p.options?.idleTimeoutMillis ?? 0,
+  };
+
+  return res.status(saturated ? 503 : 200).json(stats);
+});
+
 router.get('/ready', async (_req: unknown, res: Res) => {
   try {
     if (mongoose.connection.readyState !== 1) {
diff --git a/backend/services/schedulerService.ts b/backend/services/schedulerService.ts
index c16c694b9..5da15b451 100644
--- a/backend/services/schedulerService.ts
+++ b/backend/services/schedulerService.ts
@@ -675,23 +675,45 @@ class SchedulerService {
       return { enqueued: 0 };
     }
 
-    await Promise.all(
-      installations.map((installation) => (
-        AgentEventService.enqueue({
-          agentName: 'commonly-bot',
-          instanceId: installation.instanceId || 'default',
-          podId: installation.podId,
-          type: 'summary.request',
-          payload: {
-            source: 'pod',
-            trigger,
-            windowMinutes,
-            includeDigest: true,
-            silent: true,
-          },
-        })
-      )),
-    );
+    // #454 follow-up: chunk the fanout. A bare Promise.all over all
+    // installations means 60+ events become ready at the exact same
+    // instant; the agent runtime then races to process them, and each
+    // downstream summary handler queries PG for messages — which under
+    // pg.Pool max=10 (pre-#455) saturated the pool and hung user-facing
+    // /api/pods. PR #455 raised the pool ceiling, but the burst itself
+    // is still wasteful: better to spread enqueue (and therefore the
+    // downstream processing window) over a few seconds. Batches of 10
+    // with a 500ms gap caps peak concurrency on the consumer side while
+    // keeping total wall time well under the next hourly tick.
+    // Env overrides. `0` is a valid pause (it disables the sleep), so the
+    // pause must not be defaulted with `||`; a batch size below 1 would
+    // keep the loop index from advancing. NaN (unset/garbage) → defaults.
+    const sizeFromEnv = Number.parseInt(process.env.SUMMARIZER_FANOUT_BATCH_SIZE ?? '', 10);
+    const pauseFromEnv = Number.parseInt(process.env.SUMMARIZER_FANOUT_BATCH_PAUSE_MS ?? '', 10);
+    const BATCH_SIZE = sizeFromEnv >= 1 ? sizeFromEnv : 10;
+    const BATCH_PAUSE_MS = Number.isNaN(pauseFromEnv) ? 500 : Math.max(0, pauseFromEnv);
+
+    for (let i = 0; i < installations.length; i += BATCH_SIZE) {
+      // Enqueue one batch concurrently; the next batch waits for all of these.
+      const batch = installations.slice(i, i + BATCH_SIZE);
+      // eslint-disable-next-line no-await-in-loop
+      await Promise.all(
+        batch.map((installation) => (
+          AgentEventService.enqueue({
+            agentName: 'commonly-bot',
+            instanceId: installation.instanceId || 'default',
+            podId: installation.podId,
+            type: 'summary.request',
+            payload: { source: 'pod', trigger, windowMinutes, includeDigest: true, silent: true },
+          })
+        )),
+      );
+      // Sleep between batches; skip the final pause.
+      if (i + BATCH_SIZE < installations.length && BATCH_PAUSE_MS > 0) {
+        // eslint-disable-next-line no-await-in-loop, no-promise-executor-return
+        await new Promise((resolve) => setTimeout(resolve, BATCH_PAUSE_MS));
+      }
+    }
 
     return { enqueued: installations.length };
   }