diff --git a/packages/appkit/src/connectors/index.ts b/packages/appkit/src/connectors/index.ts index cff0d152d..5fad31d99 100644 --- a/packages/appkit/src/connectors/index.ts +++ b/packages/appkit/src/connectors/index.ts @@ -2,7 +2,6 @@ export * from "./files"; export * from "./genie"; export * from "./jobs"; export * from "./lakebase"; -export * from "./lakebase-v1"; export * from "./mcp"; export * from "./sql-warehouse"; export * from "./vector-search"; diff --git a/packages/appkit/src/connectors/lakebase-v1/client.ts b/packages/appkit/src/connectors/lakebase-v1/client.ts deleted file mode 100644 index 34d193c63..000000000 --- a/packages/appkit/src/connectors/lakebase-v1/client.ts +++ /dev/null @@ -1,585 +0,0 @@ -import { randomUUID } from "node:crypto"; -import type { WorkspaceClient } from "@databricks/sdk-experimental"; -import { ApiClient, Config } from "@databricks/sdk-experimental"; -import pg from "pg"; -import { - AppKitError, - AuthenticationError, - ConfigurationError, - ConnectionError, - ValidationError, -} from "../../errors"; -import { createLogger } from "../../logging/logger"; -import { - type Counter, - type Histogram, - SpanStatusCode, - TelemetryManager, - type TelemetryProvider, -} from "../../telemetry"; -import { deepMerge } from "../../utils"; -import { lakebaseV1Defaults } from "./defaults"; -import type { - LakebaseV1Config, - LakebaseV1ConnectionConfig, - LakebaseV1Credentials, -} from "./types"; - -const logger = createLogger("connectors:lakebase-v1"); - -/** - * Enterprise-grade connector for Databricks Lakebase Provisioned - * - * @deprecated This connector is for Lakebase Provisioned only. - * For new projects, use Lakebase Autoscaling instead: https://docs.databricks.com/aws/en/oltp/projects/ - * - * This connector is compatible with Lakebase Provisioned: https://docs.databricks.com/aws/en/oltp/instances/ - * - * Lakebase Autoscaling offers: - * - Automatic compute scaling - * - Scale-to-zero for cost optimization - * - Database branching for development - * - Instant restore capabilities - * - * Use the new LakebaseConnector (coming in a future release) for Lakebase Autoscaling support. - * - * @example Simplest - everything from env/context - * ```typescript - * const connector = new LakebaseV1Connector(); - * await connector.query('SELECT * FROM users'); - * ``` - * - * @example With explicit connection string - * ```typescript - * const connector = new LakebaseV1Connector({ - * connectionString: 'postgresql://...' - * }); - * ``` - */ -export class LakebaseV1Connector { - private readonly name: string = "lakebase-v1"; - private readonly CACHE_BUFFER_MS = 2 * 60 * 1000; - private readonly config: LakebaseV1Config; - private readonly connectionConfig: LakebaseV1ConnectionConfig; - private pool: pg.Pool | null = null; - private credentials: LakebaseV1Credentials | null = null; - - // telemetry - private readonly telemetry: TelemetryProvider; - private readonly telemetryMetrics: { - queryCount: Counter; - queryDuration: Histogram; - }; - - constructor(userConfig?: Partial) { - this.config = deepMerge(lakebaseV1Defaults, userConfig); - this.connectionConfig = this.parseConnectionConfig(); - - this.telemetry = TelemetryManager.getProvider( - this.name, - this.config.telemetry, - ); - this.telemetryMetrics = { - queryCount: this.telemetry - .getMeter() - .createCounter("lakebase.v1.query.count", { - description: "Total number of queries executed", - unit: "1", - }), - queryDuration: this.telemetry - .getMeter() - .createHistogram("lakebase.v1.query.duration", { - description: "Duration of queries executed", - unit: "ms", - }), - }; - - // validate configuration - if (this.config.maxPoolSize < 1) { - throw ValidationError.invalidValue( - "maxPoolSize", - this.config.maxPoolSize, - "at least 1", - ); - } - } - - /** - * Execute a SQL query - * - * @example - * ```typescript - * const users = await connector.query('SELECT * FROM users'); - * const user = await connector.query('SELECT * FROM users WHERE id = $1', [123]); - * ``` - */ - async query( - sql: string, - params?: any[], - retryCount: number = 0, - ): Promise> { - const startTime = Date.now(); - - return this.telemetry.startActiveSpan( - "lakebase.v1.query", - { - attributes: { - "db.system": "lakebase-v1", - "db.statement": sql.substring(0, 500), - "db.retry_count": retryCount, - }, - }, - async (span) => { - try { - const pool = await this.getPool(); - const result = await pool.query(sql, params); - span.setAttribute("db.rows_affected", result.rowCount ?? 0); - span.setStatus({ code: SpanStatusCode.OK }); - return result; - } catch (error) { - // retry on auth failure - if (this.isAuthError(error)) { - span.addEvent("auth_error_retry"); - await this.rotateCredentials(); - const newPool = await this.getPool(); - const result = await newPool.query(sql, params); - span.setAttribute("db.rows_affected", result.rowCount ?? 0); - span.setStatus({ code: SpanStatusCode.OK }); - return result; - } - - // retry on transient errors, but only once - if (this.isTransientError(error) && retryCount < 1) { - span.addEvent("transient_error_retry"); - await new Promise((resolve) => setTimeout(resolve, 100)); - return await this.query(sql, params, retryCount + 1); - } - - span.recordException(error as Error); - span.setStatus({ code: SpanStatusCode.ERROR }); - - logger.error( - "Query execution failed: %s (code=%s)", - error instanceof Error ? error.message : String(error), - (error as any)?.code, - ); - - if (error instanceof AppKitError) { - throw error; - } - throw ConnectionError.queryFailed(error as Error); - } finally { - const duration = Date.now() - startTime; - this.telemetryMetrics.queryCount.add(1); - this.telemetryMetrics.queryDuration.record(duration); - span.end(); - } - }, - ); - } - - /** - * Execute a transaction - * - * COMMIT and ROLLBACK are automatically managed by the transaction function. - * - * @param callback - Callback function to execute within the transaction context - * @example - * ```typescript - * await connector.transaction(async (client) => { - * await client.query('INSERT INTO accounts (name) VALUES ($1)', ['Alice']); - * await client.query('INSERT INTO logs (action) VALUES ($1)', ['Created Alice']); - * }); - * ``` - */ - async transaction( - callback: (client: pg.PoolClient) => Promise, - retryCount: number = 0, - ): Promise { - const startTime = Date.now(); - return this.telemetry.startActiveSpan( - "lakebase.v1.transaction", - { - attributes: { - "db.system": "lakebase-v1", - "db.retry_count": retryCount, - }, - }, - async (span) => { - const pool = await this.getPool(); - const client = await pool.connect(); - try { - await client.query("BEGIN"); - const result = await callback(client); - await client.query("COMMIT"); - span.setStatus({ code: SpanStatusCode.OK }); - return result; - } catch (error) { - try { - await client.query("ROLLBACK"); - } catch {} - // retry on auth failure - if (this.isAuthError(error)) { - span.addEvent("auth_error_retry"); - client.release(); - await this.rotateCredentials(); - const newPool = await this.getPool(); - const retryClient = await newPool.connect(); - try { - await client.query("BEGIN"); - const result = await callback(retryClient); - await client.query("COMMIT"); - span.setStatus({ code: SpanStatusCode.OK }); - return result; - } catch (retryError) { - try { - await retryClient.query("ROLLBACK"); - } catch {} - throw retryError; - } finally { - retryClient.release(); - } - } - - // retry on transient errors, but only once - if (this.isTransientError(error) && retryCount < 1) { - span.addEvent("transaction_error_retry"); - client.release(); - await new Promise((resolve) => setTimeout(resolve, 100)); - return await this.transaction(callback, retryCount + 1); - } - span.recordException(error as Error); - span.setStatus({ code: SpanStatusCode.ERROR }); - - logger.error( - "Transaction execution failed: %s (code=%s)", - error instanceof Error ? error.message : String(error), - (error as any)?.code, - ); - - if (error instanceof AppKitError) { - throw error; - } - throw ConnectionError.transactionFailed(error as Error); - } finally { - client.release(); - const duration = Date.now() - startTime; - this.telemetryMetrics.queryCount.add(1); - this.telemetryMetrics.queryDuration.record(duration); - span.end(); - } - }, - ); - } - - /** Check if database connection is healthy */ - async healthCheck(): Promise { - return this.telemetry.startActiveSpan( - "lakebase.v1.healthCheck", - {}, - async (span) => { - try { - const result = await this.query<{ result: number }>( - "SELECT 1 as result", - ); - const healthy = result.rows[0]?.result === 1; - span.setAttribute("db.healthy", healthy); - span.setStatus({ code: SpanStatusCode.OK }); - return healthy; - } catch { - span.setAttribute("db.healthy", false); - span.setStatus({ code: SpanStatusCode.ERROR }); - return false; - } finally { - span.end(); - } - }, - ); - } - - /** Close connection pool (call on shutdown) */ - async close(): Promise { - if (this.pool) { - await this.pool.end().catch((error: unknown) => { - logger.error("Error closing connection pool: %O", error); - }); - this.pool = null; - } - this.credentials = null; - } - - /** Setup graceful shutdown to close connection pools */ - shutdown(): void { - process.on("SIGTERM", () => this.close()); - process.on("SIGINT", () => this.close()); - this.close(); - } - - /** Get Databricks workspace client - from config or execution context */ - private getWorkspaceClient(): WorkspaceClient { - if (this.config.workspaceClient) { - return this.config.workspaceClient; - } - - try { - const { getWorkspaceClient: getClient } = require("../../context"); - const client = getClient(); - - // cache it for subsequent calls - this.config.workspaceClient = client; - return client; - } catch (_error) { - throw ConnectionError.clientUnavailable( - "Databricks workspace client", - "Either pass it in config or ensure ServiceContext is initialized", - ); - } - } - - /** Get or create connection pool */ - private async getPool(): Promise { - if (!this.connectionConfig) { - throw ConfigurationError.invalidConnection( - "Lakebase", - "Set PGHOST, PGDATABASE, PGAPPNAME env vars, provide a connectionString, or pass explicit config", - ); - } - - if (!this.pool) { - const creds = await this.getCredentials(); - this.pool = this.createPool(creds); - } - return this.pool; - } - - /** Create PostgreSQL pool */ - private createPool(credentials: { - username: string; - password: string; - }): pg.Pool { - const { host, database, port, sslMode } = this.connectionConfig; - - const pool = new pg.Pool({ - host, - port, - database, - user: credentials.username, - password: credentials.password, - max: this.config.maxPoolSize, - idleTimeoutMillis: this.config.idleTimeoutMs, - connectionTimeoutMillis: this.config.connectionTimeoutMs, - ssl: sslMode === "require" ? { rejectUnauthorized: true } : false, - }); - - pool.on("error", (error: Error & { code?: string }) => { - logger.error( - "Connection pool error: %s (code: %s)", - error.message, - error.code, - ); - }); - - return pool; - } - - /** Get or fetch credentials with caching */ - private async getCredentials(): Promise<{ - username: string; - password: string; - }> { - const now = Date.now(); - - // return cached if still valid - if ( - this.credentials && - now < this.credentials.expiresAt - this.CACHE_BUFFER_MS - ) { - return this.credentials; - } - - // fetch new credentials - const username = await this.fetchUsername(); - const { token, expiresAt } = await this.fetchPassword(); - - this.credentials = { - username, - password: token, - expiresAt, - }; - - return { username, password: token }; - } - - /** Rotate credentials and recreate pool */ - private async rotateCredentials(): Promise { - // clear cached credentials - this.credentials = null; - - if (this.pool) { - const oldPool = this.pool; - this.pool = null; - oldPool.end().catch((error: unknown) => { - logger.error( - "Error closing old connection pool during rotation: %O", - error, - ); - }); - } - } - - /** Fetch username from Databricks */ - private async fetchUsername(): Promise { - const workspaceClient = this.getWorkspaceClient(); - const user = await workspaceClient.currentUser.me(); - if (!user.userName) { - throw AuthenticationError.userLookupFailed(); - } - return user.userName; - } - - /** Fetch password (OAuth token) from Databricks */ - private async fetchPassword(): Promise<{ token: string; expiresAt: number }> { - const workspaceClient = this.getWorkspaceClient(); - const config = new Config({ host: workspaceClient.config.host }); - const apiClient = new ApiClient(config); - - if (!this.connectionConfig.appName) { - throw ConfigurationError.resourceNotFound("Database app name"); - } - - const credentials = await apiClient.request({ - path: `/api/2.0/database/credentials`, - method: "POST", - headers: new Headers(), - raw: false, - payload: { - instance_names: [this.connectionConfig.appName], - request_id: randomUUID(), - }, - }); - - if (!this.validateCredentials(credentials)) { - throw AuthenticationError.credentialsFailed( - this.connectionConfig.appName, - ); - } - - const expiresAt = new Date(credentials.expiration_time).getTime(); - - return { token: credentials.token, expiresAt }; - } - - /** Check if error is auth failure */ - private isAuthError(error: unknown): boolean { - return ( - typeof error === "object" && - error !== null && - "code" in error && - (error as any).code === "28P01" - ); - } - - /** Check if error is transient */ - private isTransientError(error: unknown): boolean { - if (typeof error !== "object" || error === null || !("code" in error)) { - return false; - } - - const code = (error as any).code; - return ( - code === "ECONNRESET" || - code === "ECONNREFUSED" || - code === "ETIMEDOUT" || - code === "57P01" || // admin_shutdown - code === "57P03" || // cannot_connect_now - code === "08006" || // connection_failure - code === "08003" || // connection_does_not_exist - code === "08000" // connection_exception - ); - } - - /** Type guard for credentials */ - private validateCredentials( - value: unknown, - ): value is { token: string; expiration_time: string } { - if (typeof value !== "object" || value === null) { - return false; - } - - const credentials = value as { token: string; expiration_time: string }; - return ( - "token" in credentials && - typeof credentials.token === "string" && - "expiration_time" in credentials && - typeof credentials.expiration_time === "string" && - new Date(credentials.expiration_time).getTime() > Date.now() - ); - } - - /** Parse connection configuration from config or environment */ - private parseConnectionConfig(): LakebaseV1ConnectionConfig { - if (this.config.connectionString) { - return this.parseConnectionString(this.config.connectionString); - } - - // get connection from config - if (this.config.host && this.config.database && this.config.appName) { - return { - host: this.config.host, - database: this.config.database, - port: this.config.port ?? 5432, - sslMode: this.config.sslMode ?? "require", - appName: this.config.appName, - }; - } - - // get connection from environment variables - const pgHost = process.env.PGHOST; - const pgDatabase = process.env.PGDATABASE; - const pgAppName = process.env.PGAPPNAME; - if (!pgHost || !pgDatabase || !pgAppName) { - throw ConfigurationError.invalidConnection( - "Lakebase", - "Required env vars: PGHOST, PGDATABASE, PGAPPNAME. Optional: PGPORT (default: 5432), PGSSLMODE (default: require)", - ); - } - const pgPort = process.env.PGPORT; - const port = pgPort ? parseInt(pgPort, 10) : 5432; - - if (Number.isNaN(port)) { - throw ValidationError.invalidValue("port", pgPort, "a number"); - } - - const pgSSLMode = process.env.PGSSLMODE; - const sslMode = - (pgSSLMode as "require" | "disable" | "prefer") || "require"; - - return { - host: pgHost, - database: pgDatabase, - port, - sslMode, - appName: pgAppName, - }; - } - - private parseConnectionString( - connectionString: string, - ): LakebaseV1ConnectionConfig { - const url = new URL(connectionString); - const appName = url.searchParams.get("appName"); - if (!appName) { - throw ConfigurationError.missingConnectionParam("appName"); - } - - return { - host: url.hostname, - database: url.pathname.slice(1), // remove leading slash - port: url.port ? parseInt(url.port, 10) : 5432, - sslMode: - (url.searchParams.get("sslmode") as "require" | "disable" | "prefer") ?? - "require", - appName: appName, - }; - } -} diff --git a/packages/appkit/src/connectors/lakebase-v1/defaults.ts b/packages/appkit/src/connectors/lakebase-v1/defaults.ts deleted file mode 100644 index 1a8abdf57..000000000 --- a/packages/appkit/src/connectors/lakebase-v1/defaults.ts +++ /dev/null @@ -1,15 +0,0 @@ -import type { LakebaseV1Config } from "./types"; - -/** - * Default configuration for Lakebase V1 connector - * - * @deprecated This connector is for Lakebase Provisioned only. - * For new projects, use Lakebase Autoscaling: https://docs.databricks.com/aws/en/oltp/projects/ - */ -export const lakebaseV1Defaults: LakebaseV1Config = { - port: 5432, - sslMode: "require", - maxPoolSize: 10, - idleTimeoutMs: 30_000, - connectionTimeoutMs: 10_000, -}; diff --git a/packages/appkit/src/connectors/lakebase-v1/index.ts b/packages/appkit/src/connectors/lakebase-v1/index.ts deleted file mode 100644 index 4aa771bd4..000000000 --- a/packages/appkit/src/connectors/lakebase-v1/index.ts +++ /dev/null @@ -1 +0,0 @@ -export { LakebaseV1Connector } from "./client"; diff --git a/packages/appkit/src/connectors/lakebase-v1/types.ts b/packages/appkit/src/connectors/lakebase-v1/types.ts deleted file mode 100644 index fcaddaada..000000000 --- a/packages/appkit/src/connectors/lakebase-v1/types.ts +++ /dev/null @@ -1,90 +0,0 @@ -import type { WorkspaceClient } from "@databricks/sdk-experimental"; -import type { TelemetryOptions } from "shared"; - -/** - * Configuration for LakebaseV1Connector - * - * @deprecated This connector is for Lakebase Provisioned only. - * For new projects, use Lakebase Autoscaling instead: https://docs.databricks.com/aws/en/oltp/projects/ - * - * This connector is compatible with Lakebase Provisioned: https://docs.databricks.com/aws/en/oltp/instances/ - * - * Lakebase Autoscaling offers: - * - Automatic compute scaling - * - Scale-to-zero for cost optimization - * - Database branching for development - * - Instant restore capabilities - * - * Use the new LakebaseConnector (coming in a future release) for Lakebase Autoscaling support. - */ -export interface LakebaseV1Config { - /** Databricks workspace client */ - workspaceClient?: WorkspaceClient; - - /** Connection string */ - connectionString?: string; - - /** Database host (e.g., instance-uuid.database.region.databricks.com) */ - host?: string; - - /** Database name */ - database?: string; - - /** Database port */ - port: number; - - /** App name */ - appName?: string; - - /** SSL mode */ - sslMode: "require" | "disable" | "prefer"; - - /** Maximum number of connections in the pool */ - maxPoolSize: number; - - /** Close idle connections after this time (milliseconds) */ - idleTimeoutMs: number; - - /** Connection timeout (milliseconds) */ - connectionTimeoutMs: number; - - /** Telemetry configuration */ - telemetry?: TelemetryOptions; - - /** Additional configuration options */ - [key: string]: unknown; -} - -/** - * Lakebase V1 credentials for authentication - * - * @deprecated This type is for Lakebase Provisioned only. - * For new projects, use Lakebase Autoscaling: https://docs.databricks.com/aws/en/oltp/projects/ - */ -export interface LakebaseV1Credentials { - /** Username */ - username: string; - /** Password */ - password: string; - /** Expires at */ - expiresAt: number; -} - -/** - * Internal connection configuration for Lakebase V1 - * - * @deprecated This type is for Lakebase Provisioned only. - * For new projects, use Lakebase Autoscaling: https://docs.databricks.com/aws/en/oltp/projects/ - */ -export interface LakebaseV1ConnectionConfig { - /** Database host */ - readonly host: string; - /** Database name */ - readonly database: string; - /** Database port */ - readonly port: number; - /** SSL mode */ - readonly sslMode: "require" | "disable" | "prefer"; - /** App name */ - readonly appName?: string; -} diff --git a/packages/appkit/src/connectors/tests/lakebase-v1.test.ts b/packages/appkit/src/connectors/tests/lakebase-v1.test.ts deleted file mode 100644 index db7779edf..000000000 --- a/packages/appkit/src/connectors/tests/lakebase-v1.test.ts +++ /dev/null @@ -1,609 +0,0 @@ -import { beforeEach, describe, expect, test, vi } from "vitest"; -import { LakebaseV1Connector } from "../lakebase-v1"; - -// Mock pg module -vi.mock("pg", () => { - const mockQuery = vi.fn(); - const mockConnect = vi.fn(); - const mockEnd = vi.fn().mockResolvedValue(undefined); - const mockRelease = vi.fn(); - const mockOn = vi.fn(); - - const MockPool = vi.fn(() => ({ - query: mockQuery, - connect: mockConnect, - end: mockEnd, - on: mockOn, - })); - - return { - default: { Pool: MockPool }, - Pool: MockPool, - __mockQuery: mockQuery, - __mockConnect: mockConnect, - __mockEnd: mockEnd, - __mockRelease: mockRelease, - __mockOn: mockOn, - __MockPool: MockPool, - }; -}); - -// Mock Databricks SDK -vi.mock("@databricks/sdk-experimental", () => { - const mockMe = vi.fn(); - const mockRequest = vi.fn(); - - const MockWorkspaceClient = vi.fn(() => ({ - currentUser: { me: mockMe }, - config: { host: "https://test.databricks.com" }, - })); - - const MockApiClient = vi.fn(() => ({ - request: mockRequest, - })); - - const MockConfig = vi.fn(() => ({})); - - return { - WorkspaceClient: MockWorkspaceClient, - ApiClient: MockApiClient, - Config: MockConfig, - __mockMe: mockMe, - __mockRequest: mockRequest, - __MockWorkspaceClient: MockWorkspaceClient, - __MockApiClient: MockApiClient, - }; -}); - -describe("LakebaseV1Connector", () => { - beforeEach(() => { - vi.clearAllMocks(); - // Set required env vars - process.env.PGHOST = "test-host.databricks.com"; - process.env.PGDATABASE = "test-db"; - process.env.PGAPPNAME = "test-app"; - }); - - describe("configuration", () => { - test("should throw error when maxPoolSize is less than 1", () => { - expect( - () => - new LakebaseV1Connector({ - maxPoolSize: 0, - workspaceClient: {} as any, - }), - ).toThrow("Invalid value for maxPoolSize"); - }); - - test("should create connector with valid config", () => { - const connector = new LakebaseV1Connector({ - workspaceClient: {} as any, - }); - - expect(connector).toBeInstanceOf(LakebaseV1Connector); - }); - - test("should throw when env vars are missing", () => { - delete process.env.PGHOST; - delete process.env.PGDATABASE; - delete process.env.PGAPPNAME; - - expect(() => new LakebaseV1Connector()).toThrow( - "Lakebase connection not configured", - ); - }); - - test("should throw when PGPORT is invalid", () => { - process.env.PGPORT = "invalid"; - - expect(() => new LakebaseV1Connector()).toThrow("Invalid value for port"); - }); - - test("should parse env vars correctly", () => { - process.env.PGPORT = "5433"; - process.env.PGSSLMODE = "disable"; - - const connector = new LakebaseV1Connector(); - - expect(connector).toBeInstanceOf(LakebaseV1Connector); - }); - - test("should use explicit config over env vars", () => { - const connector = new LakebaseV1Connector({ - host: "explicit-host.databricks.com", - database: "explicit-db", - appName: "explicit-app", - port: 5434, - sslMode: "prefer", - workspaceClient: {} as any, - }); - - expect(connector).toBeInstanceOf(LakebaseV1Connector); - }); - }); - - describe("query", () => { - let connector: LakebaseV1Connector; - let mockQuery: ReturnType; - let mockMe: ReturnType; - let mockRequest: ReturnType; - - beforeEach(async () => { - const pg = await import("pg"); - const sdk = await import("@databricks/sdk-experimental"); - - mockQuery = (pg as any).__mockQuery; - mockMe = (sdk as any).__mockMe; - mockRequest = (sdk as any).__mockRequest; - - // Setup default mocks - mockMe.mockResolvedValue({ userName: "test-user@example.com" }); - mockRequest.mockResolvedValue({ - token: "test-oauth-token", - expiration_time: new Date(Date.now() + 3600000).toISOString(), // 1 hour from now - }); - mockQuery.mockResolvedValue({ rows: [{ result: 1 }] }); - - connector = new LakebaseV1Connector({ - workspaceClient: { - currentUser: { me: mockMe }, - config: { host: "https://test.databricks.com" }, - } as any, - }); - }); - - test("should execute query successfully", async () => { - const result = await connector.query("SELECT 1 as result"); - - expect(result.rows).toEqual([{ result: 1 }]); - expect(mockQuery).toHaveBeenCalledWith("SELECT 1 as result", undefined); - }); - - test("should execute query with parameters", async () => { - mockQuery.mockResolvedValue({ rows: [{ id: 1, name: "test" }] }); - - const result = await connector.query( - "SELECT * FROM users WHERE id = $1", - [1], - ); - - expect(result.rows).toEqual([{ id: 1, name: "test" }]); - expect(mockQuery).toHaveBeenCalledWith( - "SELECT * FROM users WHERE id = $1", - [1], - ); - }); - - test("should retry on auth error (28P01)", async () => { - const authError = new Error("auth failed") as any; - authError.code = "28P01"; - - mockQuery - .mockRejectedValueOnce(authError) - .mockResolvedValue({ rows: [{ result: 1 }] }); - - const result = await connector.query("SELECT 1"); - - expect(result.rows).toEqual([{ result: 1 }]); - expect(mockQuery).toHaveBeenCalledTimes(2); - }); - - test("should retry once on transient error", async () => { - const transientError = new Error("connection reset") as any; - transientError.code = "ECONNRESET"; - - mockQuery - .mockRejectedValueOnce(transientError) - .mockResolvedValue({ rows: [{ result: 1 }] }); - - const result = await connector.query("SELECT 1"); - - expect(result.rows).toEqual([{ result: 1 }]); - expect(mockQuery).toHaveBeenCalledTimes(2); - }); - - test("should not retry transient error more than once", async () => { - const transientError = new Error("connection reset") as any; - transientError.code = "ECONNRESET"; - - mockQuery.mockRejectedValue(transientError); - - await expect(connector.query("SELECT 1")).rejects.toThrow("Query failed"); - expect(mockQuery).toHaveBeenCalledTimes(2); - }); - - test("should throw non-retriable errors immediately", async () => { - const syntaxError = new Error("syntax error") as any; - syntaxError.code = "42601"; - - mockQuery.mockRejectedValue(syntaxError); - - await expect(connector.query("SELEC 1")).rejects.toThrow("Query failed"); - expect(mockQuery).toHaveBeenCalledTimes(1); - }); - - test("should not log the SQL query string on error", async () => { - const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); - - const pgError = new Error('relation "users" does not exist') as any; - pgError.code = "42P01"; - pgError.query = "SELECT secret_token FROM users WHERE id = $1"; - pgError.parameters = ["sensitive-user-id-123"]; - - mockQuery.mockRejectedValue(pgError); - - await expect( - connector.query("SELECT secret_token FROM users WHERE id = $1", [ - "sensitive-user-id-123", - ]), - ).rejects.toThrow("Query failed"); - - const loggedOutput = errorSpy.mock.calls - .map((call) => call.join(" ")) - .join(" "); - - // Should log the error message and code (useful for debugging) - expect(loggedOutput).toContain('relation "users" does not exist'); - expect(loggedOutput).toContain("42P01"); - - // Should NOT log the raw query or parameter values - expect(loggedOutput).not.toContain("secret_token"); - expect(loggedOutput).not.toContain("sensitive-user-id-123"); - - errorSpy.mockRestore(); - }); - - // Locks in the contract for pg-shaped errors so a future change to e.g. - // `String(error)` cannot silently re-introduce `error.query` / - // `error.parameters` into the log output. The error code and a stable - // message prefix are kept (debugging signal); the bound parameter values - // and full query string are not. - test("redacts pg-shaped error fields when message also leaks the literal", async () => { - const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); - - const sensitiveLiteral = "9d4f3a72-aaaa-bbbb-cccc-deadbeefcafe"; - const pgError = new Error( - `invalid input syntax for type uuid: "${sensitiveLiteral}"`, - ) as any; - pgError.code = "22P02"; - pgError.query = - "SELECT api_key FROM secrets WHERE owner_id = $1 AND tenant = $2"; - pgError.parameters = [sensitiveLiteral, "tenant-acme"]; - pgError.routine = "string_to_uuid"; - pgError.severity = "ERROR"; - - mockQuery.mockRejectedValue(pgError); - - await expect( - connector.query( - "SELECT api_key FROM secrets WHERE owner_id = $1 AND tenant = $2", - [sensitiveLiteral, "tenant-acme"], - ), - ).rejects.toThrow("Query failed"); - - const loggedOutput = errorSpy.mock.calls - .map((call) => call.join(" ")) - .join(" "); - - // Code is logged (stable class label, safe for monitoring) - expect(loggedOutput).toContain("22P02"); - // Message prefix is logged (no sensitive literal in this stable portion) - expect(loggedOutput).toContain("invalid input syntax for type uuid"); - // The raw query string and other sensitive parameter values stay out - expect(loggedOutput).not.toContain("api_key"); - expect(loggedOutput).not.toContain("FROM secrets"); - expect(loggedOutput).not.toContain("tenant-acme"); - - errorSpy.mockRestore(); - }); - }); - - describe("transaction", () => { - let connector: LakebaseV1Connector; - let mockConnect: ReturnType; - let mockMe: ReturnType; - let mockRequest: ReturnType; - - beforeEach(async () => { - const pg = await import("pg"); - const sdk = await import("@databricks/sdk-experimental"); - - mockConnect = (pg as any).__mockConnect; - mockMe = (sdk as any).__mockMe; - mockRequest = (sdk as any).__mockRequest; - - mockMe.mockResolvedValue({ userName: "test-user@example.com" }); - mockRequest.mockResolvedValue({ - token: "test-oauth-token", - expiration_time: new Date(Date.now() + 3600000).toISOString(), - }); - - const mockClient = { - query: vi.fn().mockResolvedValue({ rows: [] }), - release: vi.fn(), - }; - mockConnect.mockResolvedValue(mockClient); - - connector = new LakebaseV1Connector({ - workspaceClient: { - currentUser: { me: mockMe }, - config: { host: "https://test.databricks.com" }, - } as any, - }); - }); - - test("should execute transaction successfully", async () => { - const result = await connector.transaction(async (client) => { - await client.query("BEGIN"); - await client.query("INSERT INTO test VALUES (1)"); - await client.query("COMMIT"); - return "success"; - }); - - expect(result).toBe("success"); - }); - - test("should release client after transaction", async () => { - const mockClient = { - query: vi.fn().mockResolvedValue({ rows: [] }), - release: vi.fn(), - }; - mockConnect.mockResolvedValue(mockClient); - - await connector.transaction(async (client) => { - await client.query("SELECT 1"); - return "done"; - }); - - expect(mockClient.release).toHaveBeenCalled(); - }); - - test("should not log the SQL query string on transaction error", async () => { - const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); - - const pgError = new Error("duplicate key value") as any; - pgError.code = "23505"; - pgError.query = "INSERT INTO users (email, secret) VALUES ($1, $2)"; - pgError.parameters = ["user@test.com", "super-secret-value"]; - - const failingClient = { - query: vi.fn().mockImplementation((sql: string) => { - if (sql === "BEGIN" || sql === "ROLLBACK") { - return Promise.resolve({ rows: [] }); - } - return Promise.reject(pgError); - }), - release: vi.fn(), - }; - mockConnect.mockResolvedValue(failingClient); - - await expect( - connector.transaction(async (client) => { - await client.query( - "INSERT INTO users (email, secret) VALUES ($1, $2)", - ); - }), - ).rejects.toThrow(); - - const loggedOutput = errorSpy.mock.calls - .map((call) => call.join(" ")) - .join(" "); - - // Should log the error message and code - expect(loggedOutput).toContain("duplicate key value"); - expect(loggedOutput).toContain("23505"); - - // Should NOT log the raw query or parameter values - expect(loggedOutput).not.toContain("super-secret-value"); - expect(loggedOutput).not.toContain("INSERT INTO users"); - - errorSpy.mockRestore(); - }); - }); - - describe("healthCheck", () => { - let connector: LakebaseV1Connector; - let mockQuery: ReturnType; - let mockMe: ReturnType; - let mockRequest: ReturnType; - - beforeEach(async () => { - const pg = await import("pg"); - const sdk = await import("@databricks/sdk-experimental"); - - mockQuery = (pg as any).__mockQuery; - mockMe = (sdk as any).__mockMe; - mockRequest = (sdk as any).__mockRequest; - - mockMe.mockResolvedValue({ userName: "test-user@example.com" }); - mockRequest.mockResolvedValue({ - token: "test-oauth-token", - expiration_time: new Date(Date.now() + 3600000).toISOString(), - }); - - connector = new LakebaseV1Connector({ - workspaceClient: { - currentUser: { me: mockMe }, - config: { host: "https://test.databricks.com" }, - } as any, - }); - }); - - test("should return true when database is healthy", async () => { - mockQuery.mockResolvedValue({ rows: [{ result: 1 }] }); - - const isHealthy = await connector.healthCheck(); - - expect(isHealthy).toBe(true); - }); - - test("should return false when database is unhealthy", async () => { - mockQuery.mockRejectedValue(new Error("connection failed")); - - const isHealthy = await connector.healthCheck(); - - expect(isHealthy).toBe(false); - }); - - test("should return false when result is unexpected", async () => { - mockQuery.mockResolvedValue({ rows: [{ result: 0 }] }); - - const isHealthy = await connector.healthCheck(); - - expect(isHealthy).toBe(false); - }); - }); - - describe("close", () => { - let connector: LakebaseV1Connector; - let mockEnd: ReturnType; - let mockQuery: ReturnType; - let mockMe: ReturnType; - let mockRequest: ReturnType; - - beforeEach(async () => { - const pg = await import("pg"); - const sdk = await import("@databricks/sdk-experimental"); - - mockEnd = (pg as any).__mockEnd; - mockQuery = (pg as any).__mockQuery; - mockMe = (sdk as any).__mockMe; - mockRequest = (sdk as any).__mockRequest; - - mockMe.mockResolvedValue({ userName: "test-user@example.com" }); - mockRequest.mockResolvedValue({ - token: "test-oauth-token", - expiration_time: new Date(Date.now() + 3600000).toISOString(), - }); - mockQuery.mockResolvedValue({ rows: [{ result: 1 }] }); - mockEnd.mockResolvedValue(undefined); - - connector = new LakebaseV1Connector({ - workspaceClient: { - currentUser: { me: mockMe }, - config: { host: "https://test.databricks.com" }, - } as any, - }); - }); - - test("should close connection pool", async () => { - // Initialize pool by making a query - await connector.query("SELECT 1"); - - await connector.close(); - - expect(mockEnd).toHaveBeenCalled(); - }); - - test("should handle close when pool not initialized", async () => { - // Don't make any queries, pool is not initialized - await expect(connector.close()).resolves.not.toThrow(); - }); - }); - - describe("credentials", () => { - let mockMe: ReturnType; - let mockRequest: ReturnType; - let mockQuery: ReturnType; - - beforeEach(async () => { - const pg = await import("pg"); - const sdk = await import("@databricks/sdk-experimental"); - - mockQuery = (pg as any).__mockQuery; - mockMe = (sdk as any).__mockMe; - mockRequest = (sdk as any).__mockRequest; - - mockQuery.mockResolvedValue({ rows: [{ result: 1 }] }); - }); - - test("should throw when username cannot be fetched", async () => { - mockMe.mockResolvedValue({ userName: null }); - mockRequest.mockResolvedValue({ token: "test-token" }); - - const connector = new LakebaseV1Connector({ - workspaceClient: { - currentUser: { me: mockMe }, - config: { host: "https://test.databricks.com" }, - } as any, - }); - - await expect(connector.query("SELECT 1")).rejects.toThrow( - "Failed to get current user", - ); - }); - - test("should throw when token cannot be fetched", async () => { - mockMe.mockResolvedValue({ userName: "test-user@example.com" }); - mockRequest.mockResolvedValue({ error: "unauthorized" }); // missing token and expiration_time - - const connector = new LakebaseV1Connector({ - workspaceClient: { - currentUser: { me: mockMe }, - config: { host: "https://test.databricks.com" }, - } as any, - }); - - await expect(connector.query("SELECT 1")).rejects.toThrow( - "Failed to generate credentials", - ); - }); - }); - - describe("transient error codes", () => { - let connector: LakebaseV1Connector; - let mockQuery: ReturnType; - let mockMe: ReturnType; - let mockRequest: ReturnType; - - beforeEach(async () => { - const pg = await import("pg"); - const sdk = await import("@databricks/sdk-experimental"); - - mockQuery = (pg as any).__mockQuery; - mockMe = (sdk as any).__mockMe; - mockRequest = (sdk as any).__mockRequest; - - mockMe.mockResolvedValue({ userName: "test-user@example.com" }); - mockRequest.mockResolvedValue({ - token: "test-oauth-token", - expiration_time: new Date(Date.now() + 3600000).toISOString(), - }); - - connector = new LakebaseV1Connector({ - workspaceClient: { - currentUser: { me: mockMe }, - config: { host: "https://test.databricks.com" }, - } as any, - }); - }); - - const transientCodes = [ - "ECONNRESET", - "ECONNREFUSED", - "ETIMEDOUT", - "57P01", // admin_shutdown - "57P03", // cannot_connect_now - "08006", // connection_failure - "08003", // connection_does_not_exist - "08000", // connection_exception - ]; - - test.each(transientCodes)( - "should retry on transient error code: %s", - async (code) => { - const error = new Error(`transient error ${code}`) as any; - error.code = code; - - mockQuery - .mockRejectedValueOnce(error) - .mockResolvedValue({ rows: [{ result: 1 }] }); - - const result = await connector.query("SELECT 1"); - - expect(result.rows).toEqual([{ result: 1 }]); - expect(mockQuery).toHaveBeenCalledTimes(2); - }, - ); - }); -});