From 30e970b71231000b3fa52d318ce2256e14d4b15c Mon Sep 17 00:00:00 2001 From: Xetera Date: Wed, 20 May 2026 22:07:24 +0300 Subject: [PATCH 1/2] feat: allow pg_stat_statements as query source in ci --- src/main.ts | 25 ++++----- src/runner.ts | 105 ++++++----------------------------- src/sql/pg_log.ts | 84 ---------------------------- src/sql/pgbadger.ts | 100 +++++++++++++++++++++++++++++++++ src/sql/recent-query.test.ts | 1 + src/sql/recent-query.ts | 22 ++------ src/sync/pg-connector.ts | 3 +- 7 files changed, 137 insertions(+), 203 deletions(-) delete mode 100644 src/sql/pg_log.ts create mode 100644 src/sql/pgbadger.ts diff --git a/src/main.ts b/src/main.ts index 5abb6f62..9fc89131 100644 --- a/src/main.ts +++ b/src/main.ts @@ -12,19 +12,17 @@ import { postToSiteApi, } from "./reporters/site-api.ts"; import { formatCost, queryPreview } from "./reporters/github/github.ts"; -import { DEFAULT_CONFIG, type AnalyzerConfig } from "./config.ts"; -import { ApiClient, hookUpApiReporter } from "./remote/api-client.ts"; +import { DEFAULT_CONFIG } from "./config.ts"; +import { ApiClient } from "./remote/api-client.ts"; import { Remote } from "./remote/remote.ts"; import { ConnectionManager } from "./sync/connection-manager.ts"; -import type { RpcStub } from "capnweb"; -import type { ServerApi } from "@query-doctor/core"; - -const INVALID_TOKEN_ERROR = "Unauthorized" +import { PgbadgerSource } from "./sql/pgbadger.ts"; +import type { RecentQuerySource } from "./sql/recent-query.ts"; async function runInCI( targetPostgresUrl: Connectable, sourcePostgresUrl: Connectable, - logPath: string, + logPath: string | undefined, maxCost?: number, ) { const siteApiEndpoint = env.SITE_API_ENDPOINT; @@ -32,10 +30,11 @@ async function runInCI( const branch = process.env.GITHUB_HEAD_REF || process.env.GITHUB_REF_NAME || ""; + const remoteDbManager = ConnectionManager.forRemoteDatabase() const remote = new Remote( targetPostgresUrl, ConnectionManager.forLocalDatabase(), - ConnectionManager.forRemoteDatabase(), + remoteDbManager, { disableQueryLoader: true }, ); @@ -55,10 +54,14 @@ async function runInCI( ) : DEFAULT_CONFIG; + const source: RecentQuerySource = logPath + ? new PgbadgerSource(logPath) + : remoteDbManager.getConnectorFor(sourcePostgresUrl); + const runner = await Runner.build({ targetPostgresUrl, sourcePostgresUrl, - logPath, + source, maxCost, ignoredQueryHashes: config.ignoredQueryHashes, remote, @@ -202,10 +205,6 @@ async function main() { core.setFailed("POSTGRES_URL environment variable is not set"); process.exit(1); } - if (!env.LOG_PATH) { - core.setFailed("LOG_PATH environment variable is not set"); - process.exit(1); - } await runInCI( Connectable.fromString(env.POSTGRES_URL), Connectable.fromString(env.SOURCE_DATABASE_URL), diff --git a/src/runner.ts b/src/runner.ts index 000e68cb..e3496da8 100644 --- a/src/runner.ts +++ b/src/runner.ts @@ -1,10 +1,6 @@ import * as core from "@actions/core"; -import csv from "fast-csv"; -import { statSync } from "node:fs"; -import { spawn } from "node:child_process"; -import { fingerprint } from "@libpg-query/parser"; -import { preprocessEncodedJson } from "./sql/json.ts"; -import { ExplainedLog } from "./sql/pg_log.ts"; +import { PgbadgerSource } from "./sql/pgbadger.ts"; +import type { RecentQuerySource } from "./sql/recent-query.ts"; import { GithubReporter } from "./reporters/github/github.ts"; import { deriveIndexStatistics, @@ -17,8 +13,6 @@ import { env } from "./env.ts"; import { Connectable } from "./sync/connectable.ts"; import { Remote, StatisticsStrategy } from "./remote/remote.ts"; import { ConnectionManager } from "./sync/connection-manager.ts"; -import { RecentQuery } from "./sql/recent-query.ts"; -import { QueryHash } from "./sql/recent-query.ts"; import type { OptimizedQuery } from "./sql/recent-query.ts"; import { ExportedStats } from "@query-doctor/core"; import { readFile } from "node:fs/promises"; @@ -27,7 +21,7 @@ import { buildQueries } from "./reporters/site-api.ts"; export class Runner { constructor( private readonly remote: Remote, - private readonly logPath: string, + private readonly source: RecentQuerySource, private readonly maxCost?: number, private readonly ignoredQueryHashes: Set = new Set(), ) { } @@ -37,7 +31,7 @@ export class Runner { sourcePostgresUrl: Connectable; statisticsPath?: string; maxCost?: number; - logPath: string; + source: RecentQuerySource; ignoredQueryHashes?: string[]; remote?: Remote; }) { @@ -45,7 +39,6 @@ export class Runner { options.targetPostgresUrl, ConnectionManager.forLocalDatabase(), ConnectionManager.forRemoteDatabase(), - // queries are already sourced from logs { disableQueryLoader: true } ); await remote.syncFrom(options.sourcePostgresUrl, @@ -54,7 +47,7 @@ export class Runner { await remote.optimizer.finish; return new Runner( remote, - options.logPath, + options.source, options.maxCost, new Set(options.ignoredQueryHashes ?? []), ); @@ -92,82 +85,15 @@ export class Runner { async run(config: AnalyzerConfig = DEFAULT_CONFIG) { const startDate = new Date(); - const logSize = statSync(this.logPath).size; - console.log(`logPath=${this.logPath},fileSize=${logSize}`); - const args = [ - "--dump-raw-csv", - "--no-progressbar", - "-f", - "stderr", - this.logPath, - ]; - console.log(`pgbadger ${args.join(" ")}`); - const child = spawn("pgbadger", args, { - stdio: ["ignore", "pipe", "pipe"], - }); - child.stderr!.pipe(process.stderr); - let error: Error | undefined; - const stream = csv - .parseStream(child.stdout!, { - headers: false, - }) - .on("error", (err) => { - console.error("Got a pgbadger error", err); - error = err; - }); console.time("total"); - const recentQueries: RecentQuery[] = []; - for await (const chunk of stream) { - const [ - _timestamp, - _username, - _dbname, - _pid, - _client, - _sessionid, - loglevel, - _sqlstate, - _duration, - queryString, - _parameters, - _appname, - _backendtype, - _queryid, - ] = chunk as string[]; - if (loglevel !== "LOG" || !queryString.startsWith("plan:")) { - continue; - } - const planString: string = queryString.split("plan:")[1].trim(); - const json = preprocessEncodedJson(planString); - if (!json) { - console.log("Skipping LOG that is not JSON", queryString); - continue; - } - let parsed: ExplainedLog; - try { - parsed = ExplainedLog.fromLog(json); - } catch (e) { - console.log(e); - console.log( - "Log line that looked like valid auto_explain was not valid json?", - ); - continue; - } - - const query = parsed.query; - const hash = QueryHash.parse(await fingerprint(query)); - if (this.ignoredQueryHashes.has(hash)) { - continue; - } - if (parsed.isIntrospection) { - continue; - } - - const recentQuery = await RecentQuery.fromLogEntry(query, hash); - recentQueries.push(recentQuery) - } - console.log("Finished pgbadger stream"); + const recentQueries = await this.source.getRecentQueries(); + const error = this.source instanceof PgbadgerSource + ? this.source.streamError + : undefined; + const totalRows = this.source instanceof PgbadgerSource + ? this.source.totalRows + : recentQueries.length; await this.remote.optimizer.addQueries(recentQueries); await this.remote.optimizer.finish; @@ -184,7 +110,7 @@ export class Runner { }); console.log( - `Matched ${this.remote.optimizer.validQueriesProcessed} unique queries out of ${recentQueries.length} log entries`, + `Matched ${this.remote.optimizer.validQueriesProcessed} unique queries out of ${totalRows} entries`, ); const recommendations: ReportIndexRecommendation[] = []; @@ -266,7 +192,10 @@ export class Runner { }), statistics, error, - metadata: { logSize, timeElapsed }, + metadata: { + logSize: this.source instanceof PgbadgerSource ? this.source.logSize : -1, + timeElapsed, + }, }; console.timeEnd("total"); return { reportContext, allResults }; diff --git a/src/sql/pg_log.ts b/src/sql/pg_log.ts deleted file mode 100644 index db48cac7..00000000 --- a/src/sql/pg_log.ts +++ /dev/null @@ -1,84 +0,0 @@ -export class ExplainedLog { - private static readonly paramPattern = - /\$(\d+)\s*=\s*(?:'([^']*)'|([^,\s]+))/g; - constructor(private readonly json: object) {} - - static fromLog(stringifiedJson: string) { - const json = JSON.parse(stringifiedJson); - return new ExplainedLog(json); - } - - get query(): string { - if ( - !("Query Text" in this.json) || - typeof this.json["Query Text"] !== "string" - ) { - console.error(this.json); - throw new Error("Query Text not found"); - } - return this.json["Query Text"]; - } - - get plan(): Plan { - if (!("Plan" in this.json)) { - console.error(this.json); - throw new Error("Plan not found"); - } - return new Plan(this.json["Plan"] as object); - } - - get parameters(): string[] { - if (!("Query Parameters" in this.json)) { - return []; - } - if (typeof this.json["Query Parameters"] !== "string") { - console.error(this.json); - throw new Error("Query Parameters not found"); - } - return this.extractParams(this.json["Query Parameters"]); - } - - private extractParams(logLine: string) { - const paramsArray: string[] = []; - let match: RegExpExecArray | null; - - while ((match = ExplainedLog.paramPattern.exec(logLine)) !== null) { - const paramValue = match[2] !== undefined ? match[2] : match[3]; - // Push the value directly into the array. - // The order is determined by the $1, $2, etc. in the log line. - paramsArray[parseInt(match[1]) - 1] = paramValue; - } - - return paramsArray.filter((value) => value !== undefined); - } - - /** - * Whether this query was run by our tool. - * Want to skip this to prevent analyzing our own queries. - */ - get isIntrospection(): boolean { - return this.query.includes("@qd_introspection"); - } -} - -class Plan { - constructor(public readonly json: object) {} - - get nodeType(): string { - if ( - !("Node Type" in this.json) || - typeof this.json["Node Type"] !== "string" - ) { - console.error(this.json); - throw new Error("Node Type not found"); - } - return this.json["Node Type"]; - } - - get cost(): number { - if (!("Total Cost" in this.json)) { - return -1; - } - return Number(this.json["Total Cost"]); - } -} diff --git a/src/sql/pgbadger.ts b/src/sql/pgbadger.ts new file mode 100644 index 00000000..9a88e868 --- /dev/null +++ b/src/sql/pgbadger.ts @@ -0,0 +1,100 @@ +import { spawn } from "node:child_process"; +import { statSync } from "node:fs"; +import csv from "fast-csv"; +import type { RawRecentQuery, RecentQuery } from "./recent-query.ts"; +import { preprocessEncodedJson } from "./json.ts"; +import { QueryCache } from "../sync/seen-cache.ts"; +import type { RecentQuerySource } from "./recent-query.ts"; + +const INTROSPECTION_MARKER = "@qd_introspection"; +const PLAN_PREFIX = "plan:"; + +export function rawQueryFromPgbadgerRow( + chunk: readonly string[], +): RawRecentQuery | null { + const loglevel = chunk[6]; + const queryString = chunk[9]; + if (loglevel !== "LOG" || !queryString || !queryString.startsWith(PLAN_PREFIX)) { + return null; + } + + const planString = queryString.slice(PLAN_PREFIX.length).trim(); + const json = preprocessEncodedJson(planString); + if (!json) { + return null; + } + + let parsed: unknown; + try { + parsed = JSON.parse(json); + } catch { + return null; + } + if (!parsed || typeof parsed !== "object") { + return null; + } + + const query = (parsed as Record)["Query Text"]; + if (typeof query !== "string") { + return null; + } + if (query.includes(INTROSPECTION_MARKER)) { + return null; + } + + return { + username: "", + query, + formattedQuery: query, + meanTime: 0, + calls: "1", + rows: "0", + topLevel: true, + }; +} + +export class PgbadgerSource implements RecentQuerySource { + totalRows = 0; + streamError?: Error; + readonly logSize: number; + + constructor( + private readonly logPath: string, + private readonly cache: QueryCache = new QueryCache(), + ) { + this.logSize = statSync(this.logPath).size; + console.log(`logPath=${this.logPath},fileSize=${this.logSize}`); + } + + async getRecentQueries(): Promise { + const args = [ + "--dump-raw-csv", + "--no-progressbar", + "-f", + "stderr", + this.logPath, + ]; + console.log(`pgbadger ${args.join(" ")}`); + const child = spawn("pgbadger", args, { + stdio: ["ignore", "pipe", "pipe"], + }); + child.stderr!.pipe(process.stderr); + + const stream = csv + .parseStream(child.stdout!, { headers: false }) + .on("error", (err) => { + console.error("Got a pgbadger error", err); + this.streamError = err; + }); + + const rawQueries: RawRecentQuery[] = []; + for await (const chunk of stream) { + const raw = rawQueryFromPgbadgerRow(chunk as string[]); + if (!raw) continue; + rawQueries.push(raw); + this.totalRows++; + } + console.log("Finished pgbadger stream"); + return this.cache.sync(rawQueries); + } +} diff --git a/src/sql/recent-query.test.ts b/src/sql/recent-query.test.ts index caf5f6fc..2c87787f 100644 --- a/src/sql/recent-query.test.ts +++ b/src/sql/recent-query.test.ts @@ -300,6 +300,7 @@ test("analyze leaves displayQuery undefined for UNION", async () => { expect(rq.displayQuery).toBeUndefined(); }); + test("analyze strips sqlcommenter tags from formattedQuery", async () => { const data = makeRawQuery({ query: "select 1 /*a='1',b='2'*/" }); const rq = await RecentQuery.analyze(data, testHash, 1000); diff --git a/src/sql/recent-query.ts b/src/sql/recent-query.ts index 3e8d5bc2..48e1be6a 100644 --- a/src/sql/recent-query.ts +++ b/src/sql/recent-query.ts @@ -111,22 +111,6 @@ export class RecentQuery { */ private static readonly MAX_ANALYZABLE_QUERY_SIZE = 50_000; - static fromLogEntry(query: string, hash: QueryHash, seenAt: number = Date.now()) { - return RecentQuery.analyze( - { - query, - formattedQuery: query, - username: "", - meanTime: 0, - calls: "1", - rows: "0", - topLevel: true, - }, - hash, - seenAt, - ); - } - static async analyze( data: RawRecentQuery, hash: QueryHash, @@ -151,7 +135,7 @@ export class RecentQuery { ); const analysis = await analyzer.analyze(formattedQuery); const query = this.rewriteQuery(analysis.queryWithoutTags); - const strippedFormattedQuery = analysis.queryWithoutTags; + const strippedFormattedQuery = await RecentQuery.formatQuery(analysis.queryWithoutTags); const displayQuery = await RecentQuery.computeDisplayQuery(query); return new RecentQuery( { ...data, query, formattedQuery: strippedFormattedQuery, displayQuery }, @@ -241,3 +225,7 @@ export type OptimizedQuery = RecentQuery & { export const QueryHash = z.string().brand<"QueryHash">(); export type QueryHash = z.infer; + +export interface RecentQuerySource { + getRecentQueries(): Promise; +} diff --git a/src/sync/pg-connector.ts b/src/sync/pg-connector.ts index 3a95c998..a0e7986c 100644 --- a/src/sync/pg-connector.ts +++ b/src/sync/pg-connector.ts @@ -23,6 +23,7 @@ import { SegmentedQueryCache } from "./seen-cache.ts"; import { FullSchema, FullSchemaColumn } from "./schema_differ.ts"; import { ExtensionNotInstalledError, PostgresError } from "./errors.ts"; import { RawRecentQuery, RecentQuery } from "../sql/recent-query.ts"; +import type { RecentQuerySource } from "../sql/recent-query.ts"; const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); @@ -86,7 +87,7 @@ export type ResetPgStatStatementsResult = /** * Use {@link ConnectionManager.getConnectorFor} to grab an instance of this class */ -export class PostgresConnector implements DatabaseConnector { +export class PostgresConnector implements DatabaseConnector, RecentQuerySource { private static readonly QUERY_DOCTOR_USER = "query_doctor_db_link"; private readonly tupleEstimates = new Map(); private querySource: QuerySourceExtension | null = null; From 145f6f41d0a68fda2e47eb8333a490c264526779 Mon Sep 17 00:00:00 2001 From: Xetera Date: Wed, 20 May 2026 22:45:48 +0300 Subject: [PATCH 2/2] chore: tests for CI --- src/runner.ci.test.ts | 67 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 src/runner.ci.test.ts diff --git a/src/runner.ci.test.ts b/src/runner.ci.test.ts new file mode 100644 index 00000000..a0b27888 --- /dev/null +++ b/src/runner.ci.test.ts @@ -0,0 +1,67 @@ +import { test, expect } from "vitest"; +import { PostgreSqlContainer } from "@testcontainers/postgresql"; +import { testSpawnTarget } from "./remote/remote.test.ts"; +import { Connectable } from "./sync/connectable.ts"; +import { ConnectionManager } from "./sync/connection-manager.ts"; +import { Remote } from "./remote/remote.ts"; +import { Runner } from "./runner.ts"; +import { DEFAULT_CONFIG } from "./config.ts"; + +test("CI mode runs end-to-end against a source db with pg_stat_statements", async () => { + const [sourceDb, targetDb] = await Promise.all([ + new PostgreSqlContainer("postgres:17") + .withCopyContentToContainer([ + { + content: ` + create extension pg_stat_statements; + create table testing(a int, b text); + insert into testing (a, b) values (1, 'hello'); + create index testing_b_idx on testing(b); + select * from testing where a = 10; + select * from testing where b = 'c'; + `, + target: "/docker-entrypoint-initdb.d/init.sql", + }, + ]) + .withCommand(["-c", "shared_preload_libraries=pg_stat_statements"]) + .start(), + testSpawnTarget(), + ]); + + try { + const sourcePostgresUrl = Connectable.fromString(sourceDb.getConnectionUri()); + const targetPostgresUrl = Connectable.fromString(targetDb.getConnectionUri()); + + const remote = new Remote( + targetPostgresUrl, + ConnectionManager.forLocalDatabase(), + ConnectionManager.forRemoteDatabase(), + { disableQueryLoader: true }, + ); + + const sourceManager = ConnectionManager.forRemoteDatabase(); + const source = sourceManager.getConnectorFor(sourcePostgresUrl); + + const runner = await Runner.build({ + targetPostgresUrl, + sourcePostgresUrl, + source, + remote, + }); + + try { + const { reportContext, allResults } = await runner.run(DEFAULT_CONFIG); + + expect(reportContext.queryStats.matched).toBeGreaterThan(0); + expect(allResults.some((q) => q.query.toLowerCase().includes("testing"))) + .toBe(true); + expect(reportContext.metadata.logSize).toBe(-1); + expect(reportContext.error).toBeUndefined(); + } finally { + await runner.close(); + await sourceManager.closeAll(); + } + } finally { + await Promise.all([sourceDb.stop(), targetDb.stop()]); + } +});