From 5a88aad8efd9f759fb63f9c2de10473f968888ed Mon Sep 17 00:00:00 2001 From: Jean-Philippe Sirois Date: Fri, 15 May 2026 18:04:28 +0400 Subject: [PATCH] fix: dispose RPC session and ping timer so CI exits after reporting ApiClient.connect now returns { api, dispose }; runInCI calls dispose() in finally so the WebSocket stub and ping setInterval no longer keep the Node event loop alive after the report is generated. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/main.ts | 198 ++++++++++++++++++++------------------- src/remote/api-client.ts | 25 ++++- 2 files changed, 121 insertions(+), 102 deletions(-) diff --git a/src/main.ts b/src/main.ts index 61c5733..5abb6f6 100644 --- a/src/main.ts +++ b/src/main.ts @@ -43,113 +43,117 @@ async function runInCI( throw new Error("CI mode cannot be run without a TOKEN variable provided") } - let api = await ApiClient.connect(siteApiEndpoint, env.TOKEN, { kind: "ci", branch, sha: "" }, remote); - - const config = repo - ? await api.getRepoConfig(repo, branch).catch( - (err) => { - log.warn(`Failed to fetch repo config via RPC: ${err}. Using defaults`, "main"); - return DEFAULT_CONFIG; - }, - ) - : DEFAULT_CONFIG; - - const runner = await Runner.build({ - targetPostgresUrl, - sourcePostgresUrl, - logPath, - maxCost, - ignoredQueryHashes: config.ignoredQueryHashes, - remote, - }); - let allResults: QueryProcessResult[]; - let reportContext; + const { api, dispose: disposeApi } = await ApiClient.connect(siteApiEndpoint, env.TOKEN, { kind: "ci", branch, sha: "" }, remote); try { - log.info("main", "Running in CI mode. Skipping server creation"); - const results = await runner.run(config); - allResults = results.allResults; - reportContext = results.reportContext; - } finally { - await runner.close(); - } + const config = repo + ? await api.getRepoConfig(repo, branch).catch( + (err) => { + log.warn(`Failed to fetch repo config via RPC: ${err}. Using defaults`, "main"); + return DEFAULT_CONFIG; + }, + ) + : DEFAULT_CONFIG; + + const runner = await Runner.build({ + targetPostgresUrl, + sourcePostgresUrl, + logPath, + maxCost, + ignoredQueryHashes: config.ignoredQueryHashes, + remote, + }); + let allResults: QueryProcessResult[]; + let reportContext; + + try { + log.info("main", "Running in CI mode. Skipping server creation"); + const results = await runner.run(config); + allResults = results.allResults; + reportContext = results.reportContext; + } finally { + await runner.close(); + } - const queries = buildQueries(allResults, config); + const queries = buildQueries(allResults, config); - // POST to Site API first so we get the run ID for the PR comment link - let runId: string | null = null; - if (siteApiEndpoint) { - runId = await postToSiteApi(siteApiEndpoint, queries, reportContext.statisticsMode, reportContext.computedStats); - } + // POST to Site API first so we get the run ID for the PR comment link + let runId: string | null = null; + if (siteApiEndpoint) { + runId = await postToSiteApi(siteApiEndpoint, queries, reportContext.statisticsMode, reportContext.computedStats); + } - // Build the run URL and query base URL for the PR comment - if (siteApiEndpoint && runId) { - // SITE_API_ENDPOINT is e.g. https://api.querydoctor.com - // The app lives at https://app.querydoctor.com — derive from the API URL - const appUrl = - process.env.SITE_APP_URL ?? - siteApiEndpoint.replace(/\/api\/?$/, "").replace("api.", "app."); - const baseUrl = appUrl.replace(/\/$/, ""); - reportContext.runUrl = `${baseUrl}/ixr/ci/${runId}`; - reportContext.queryBaseUrl = baseUrl; - } + // Build the run URL and query base URL for the PR comment + if (siteApiEndpoint && runId) { + // SITE_API_ENDPOINT is e.g. https://api.querydoctor.com + // The app lives at https://app.querydoctor.com — derive from the API URL + const appUrl = + process.env.SITE_APP_URL ?? + siteApiEndpoint.replace(/\/api\/?$/, "").replace("api.", "app."); + const baseUrl = appUrl.replace(/\/$/, ""); + reportContext.runUrl = `${baseUrl}/ixr/ci/${runId}`; + reportContext.queryBaseUrl = baseUrl; + } - // Fetch previous run for comparison - let previousRun = null; - if (siteApiEndpoint && repo) { - const comparisonBranch = - config.comparisonBranch ?? process.env.GITHUB_BASE_REF ?? branch; - const result = await fetchPreviousRun( - siteApiEndpoint, - repo, - comparisonBranch, - runId ?? undefined, - ); - reportContext.comparisonBranch = comparisonBranch; - if (result.kind === "found") { - previousRun = result.run; - } else if (result.kind === "not-found") { - log.info( - "main", - `No baseline found on branch "${comparisonBranch}". Comparison will be skipped. ` + - `To establish a baseline, run the analyzer on pushes to "${comparisonBranch}" ` + - `(add "push: branches: [${comparisonBranch}]" to your workflow trigger).`, + // Fetch previous run for comparison + let previousRun = null; + if (siteApiEndpoint && repo) { + const comparisonBranch = + config.comparisonBranch ?? process.env.GITHUB_BASE_REF ?? branch; + const result = await fetchPreviousRun( + siteApiEndpoint, + repo, + comparisonBranch, + runId ?? undefined, ); - } else { - log.warn( - "main", - `Failed to fetch baseline for branch "${comparisonBranch}" (${result.reason}). ` + - `Comparison will be skipped. This is likely a transient Site API issue — re-run the check to retry.`, + reportContext.comparisonBranch = comparisonBranch; + if (result.kind === "found") { + previousRun = result.run; + } else if (result.kind === "not-found") { + log.info( + "main", + `No baseline found on branch "${comparisonBranch}". Comparison will be skipped. ` + + `To establish a baseline, run the analyzer on pushes to "${comparisonBranch}" ` + + `(add "push: branches: [${comparisonBranch}]" to your workflow trigger).`, + ); + } else { + log.warn( + "main", + `Failed to fetch baseline for branch "${comparisonBranch}" (${result.reason}). ` + + `Comparison will be skipped. This is likely a transient Site API issue — re-run the check to retry.`, + ); + } + } + if (previousRun) { + reportContext.comparison = compareRuns( + queries, + previousRun, + config.regressionThreshold, + config.minimumCost, + config.acknowledgedQueryHashes, ); } - } - if (previousRun) { - reportContext.comparison = compareRuns( - queries, - previousRun, - config.regressionThreshold, - config.minimumCost, - config.acknowledgedQueryHashes, - ); - } - console.log("Creating report...") - // Generate PR comment with comparison data - await runner.report(reportContext); - - // Block PR if regressions exceed thresholds - if (reportContext.comparison && reportContext.comparison.regressed.length > 0) { - const messages = reportContext.comparison.regressed.map((q) => { - const preview = queryPreview(q.formattedQuery); - const cost = `cost ${formatCost(q.previousCost)} → ${formatCost(q.currentCost)} (+${q.regressionPercentage.toFixed(1)}%)`; - const link = reportContext.runUrl - ? `\n ${reportContext.runUrl}/${q.hash}` - : ""; - return ` - ${preview}: ${cost}${link}`; - }); - core.setFailed( - `${reportContext.comparison.regressed.length} untriaged regression(s) beyond threshold:\n${messages.join("\n")}`, - ); + console.log("Creating report...") + // Generate PR comment with comparison data + await runner.report(reportContext); + + // Block PR if regressions exceed thresholds + if (reportContext.comparison && reportContext.comparison.regressed.length > 0) { + const messages = reportContext.comparison.regressed.map((q) => { + const preview = queryPreview(q.formattedQuery); + const cost = `cost ${formatCost(q.previousCost)} → ${formatCost(q.currentCost)} (+${q.regressionPercentage.toFixed(1)}%)`; + const link = reportContext.runUrl + ? `\n ${reportContext.runUrl}/${q.hash}` + : ""; + return ` - ${preview}: ${cost}${link}`; + }); + core.setFailed( + `${reportContext.comparison.regressed.length} untriaged regression(s) beyond threshold:\n${messages.join("\n")}`, + ); + } + } finally { + disposeApi(); } } diff --git a/src/remote/api-client.ts b/src/remote/api-client.ts index 95e173b..3cf5508 100644 --- a/src/remote/api-client.ts +++ b/src/remote/api-client.ts @@ -70,6 +70,11 @@ export function hookUpApiReporter(api: RpcStub, remote: Remote): () = }; } +export interface ApiConnection { + api: RpcStub; + dispose: () => void; +} + export class ApiClient extends RpcTarget implements ClientApi { static #name = "ApiClient" static #PING_INTERVAL_MS = 30_000; @@ -80,19 +85,27 @@ export class ApiClient extends RpcTarget implements ClientApi { } - static async connect(endpoint: string, token: string, mode: ConnectionMode, remote: Remote): Promise> { + static async connect(endpoint: string, token: string, mode: ConnectionMode, remote: Remote): Promise { const wsEndpoint = `${endpoint}/relay`.replace(/^http/, "ws"); const unauthenticated = newWebSocketRpcSession(wsEndpoint); const api = await unauthenticated.authenticate(token, new this(remote), mode) as unknown as RpcStub; - this.schedulePingTimer(api); - return api; + const stopPing = this.schedulePingTimer(api); + let disposed = false; + const dispose = () => { + if (disposed) return; + disposed = true; + stopPing(); + try { api[Symbol.dispose](); } catch { /* already gone */ } + try { (unauthenticated as unknown as Disposable)[Symbol.dispose]?.(); } catch { /* already gone */ } + }; + return { api, dispose }; } static connectWithReconnect(endpoint: string, token: string, mode: ConnectionMode, remote: Remote): void { let cleanup: (() => void) | undefined; const attempt = async (failCount: number) => { try { - const api = await this.connect(endpoint, token, mode, remote); + const { api, dispose } = await this.connect(endpoint, token, mode, remote); log.info(`Connected to the api`, this.#name); cleanup = hookUpApiReporter(api, remote); api.onRpcBroken((err) => { @@ -100,6 +113,7 @@ export class ApiClient extends RpcTarget implements ClientApi { log.error(`Connection broken: ${err}, reconnecting in ${delay}ms`, this.#name); cleanup?.(); cleanup = undefined; + dispose(); setTimeout(() => attempt(failCount + 1), delay); }); } catch (err) { @@ -115,7 +129,7 @@ export class ApiClient extends RpcTarget implements ClientApi { attempt(0); } - static schedulePingTimer(api: RpcStub) { + static schedulePingTimer(api: RpcStub): () => void { const timer = setInterval(() => { api.ping().catch(err => { console.error(err) @@ -123,6 +137,7 @@ export class ApiClient extends RpcTarget implements ClientApi { clearInterval(timer); }); }, this.#PING_INTERVAL_MS); + return () => clearInterval(timer); } async repull(): Promise {