Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion packages/opencode/src/altimate/telemetry/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -644,8 +644,14 @@ export namespace Telemetry {
session_id: string
/** skipped = no cache or stale, passed = valid SQL, blocked = invalid SQL caught, error = validation itself failed */
outcome: "skipped" | "passed" | "blocked" | "error"
/** why: no_cache, stale_cache, empty_cache, valid, non_structural, structural_error, validation_exception */
/** why: no_cache, stale_cache, empty_cache, valid, non_structural, structural_error, dispatcher_failed, validation_exception */
reason: string
/** warehouse driver type (postgres, snowflake, bigquery, ...) — enables per-warehouse catch-rate analysis */
warehouse_type: string
/** read / write / unknown — enables per-query-type analysis */
query_type: string
/** SHA-256 prefix of masked SQL — join key to sql_execute_failure events for same query */
masked_sql_hash: string
schema_columns: number
/** true when schema scan hit the column-scan cap — flags samples biased by large-warehouse truncation */
schema_truncated: boolean
Expand Down
93 changes: 61 additions & 32 deletions packages/opencode/src/altimate/tools/sql-execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export const SqlExecuteTool = Tool.define("sql_execute", {
// but does NOT block execution. Used to measure catch rate before deciding
// whether to enable blocking in a future release. Fire-and-forget so it
// doesn't add latency to the sql_execute hot path.
preValidateSql(args.query, args.warehouse).catch(() => {})
preValidateSql(args.query, args.warehouse, queryType).catch(() => {})
// altimate_change end

try {
Expand Down Expand Up @@ -115,22 +115,39 @@ interface PreValidationResult {
error?: string
}

async function preValidateSql(sql: string, warehouse?: string): Promise<PreValidationResult> {
async function preValidateSql(sql: string, warehouse: string | undefined, queryType: string): Promise<PreValidationResult> {
const startTime = Date.now()
// Yield the event loop before heavy synchronous SQLite work so concurrent
// tasks aren't blocked. Bun's sqlite API is sync and listColumns can touch
// hundreds of thousands of rows for large warehouses.
await new Promise<void>((resolve) => setImmediate(resolve))

// Precompute correlation fields used in every telemetry event this function emits.
const maskedSqlHash = Telemetry.hashError(Telemetry.maskString(sql))

try {
// Resolve the warehouse the same way sql.execute's fallback path does:
// when caller omits `warehouse`, sql.execute uses Registry.list()[0].
// Matching that here keeps the shadow validation aligned with actual
// execution (dbt-routed queries are a known gap — they short-circuit
// before this fallback, so validation may use a different warehouse
// than the one dbt selects).
const registered = Registry.list().warehouses
let warehouseName = warehouse
if (!warehouseName) {
const registered = Registry.list().warehouses
warehouseName = registered[0]?.name
}
const warehouseInfo = registered.find((w) => w.name === warehouseName)
const warehouseType = warehouseInfo?.type ?? "unknown"

const ctx: TrackCtx = {
warehouse_type: warehouseType,
query_type: queryType,
masked_sql_hash: maskedSqlHash,
}

if (!warehouseName) {
trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false)
trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false, ctx)
return { blocked: false }
}

Expand All @@ -139,31 +156,39 @@ async function preValidateSql(sql: string, warehouse?: string): Promise<PreValid

const warehouseStatus = status.warehouses.find((w) => w.name === warehouseName)
if (!warehouseStatus?.last_indexed) {
trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false)
trackPreValidation("skipped", "no_cache", 0, Date.now() - startTime, false, ctx)
return { blocked: false }
}

// Check cache freshness
const cacheAge = Date.now() - new Date(warehouseStatus.last_indexed).getTime()
if (cacheAge > CACHE_TTL_MS) {
trackPreValidation("skipped", "stale_cache", 0, Date.now() - startTime, false)
trackPreValidation("skipped", "stale_cache", 0, Date.now() - startTime, false, ctx)
return { blocked: false }
}

// Build schema context from cached columns
const columns = cache.listColumns(warehouseName, COLUMN_SCAN_LIMIT)
const schemaTruncated = columns.length >= COLUMN_SCAN_LIMIT
if (columns.length === 0) {
trackPreValidation("skipped", "empty_cache", 0, Date.now() - startTime, false)
trackPreValidation("skipped", "empty_cache", 0, Date.now() - startTime, false, ctx)
return { blocked: false }
}

const schemaContext: Record<string, any> = {}
// Build schema context keyed by fully-qualified name (database.schema.table)
// so multi-database warehouses don't collide on schema+table alone.
// Dedupe columns per table to defend against residual collisions.
const schemaContext: Record<string, { name: string; type: string; nullable: boolean }[]> = {}
const seenColumns: Record<string, Set<string>> = {}
for (const col of columns) {
const tableName = col.schema_name ? `${col.schema_name}.${col.table}` : col.table
const tableName = [col.database, col.schema_name, col.table].filter(Boolean).join(".")
if (!tableName) continue
if (!schemaContext[tableName]) {
schemaContext[tableName] = []
seenColumns[tableName] = new Set()
}
if (seenColumns[tableName].has(col.name)) continue
seenColumns[tableName].add(col.name)
schemaContext[tableName].push({
name: col.name,
type: col.data_type || "VARCHAR",
Expand All @@ -178,60 +203,61 @@ async function preValidateSql(sql: string, warehouse?: string): Promise<PreValid
schema_context: schemaContext,
})

// If the dispatcher itself failed, don't treat missing data as "valid".
if (!validationResult.success) {
const errMsg = typeof validationResult.error === "string" ? validationResult.error : undefined
trackPreValidation("error", "dispatcher_failed", 0, Date.now() - startTime, false, ctx, errMsg)
return { blocked: false }
}

const data = (validationResult.data ?? {}) as Record<string, any>
const errors = Array.isArray(data.errors) ? data.errors : []
const isValid = data.valid !== false && errors.length === 0

if (isValid) {
trackPreValidation("passed", "valid", columns.length, Date.now() - startTime, schemaTruncated)
trackPreValidation("passed", "valid", columns.length, Date.now() - startTime, schemaTruncated, ctx)
return { blocked: false }
}

// Only block on high-confidence structural errors
const structuralErrors = errors.filter((e: any) => {
const msg = (e.message ?? "").toLowerCase()
return msg.includes("column") || msg.includes("table") || msg.includes("not found") || msg.includes("does not exist")
return /\b(column|table|view|relation|identifier|not found|does not exist)\b/.test(msg)
})

if (structuralErrors.length === 0) {
// Non-structural errors (ambiguous cases) — let them through
trackPreValidation("passed", "non_structural", columns.length, Date.now() - startTime, schemaTruncated)
trackPreValidation("passed", "non_structural", columns.length, Date.now() - startTime, schemaTruncated, ctx)
return { blocked: false }
}

// Build helpful error with available columns
const errorMsgs = structuralErrors.map((e: any) => e.message).join("\n")
const referencedTables = Object.keys(schemaContext).slice(0, 10)
const availableColumns = referencedTables
.map((t) => `${t}: ${schemaContext[t].map((c: any) => c.name).join(", ")}`)
.join("\n")

const errorOutput = [
`Pre-execution validation failed (validated against cached schema):`,
``,
errorMsgs,
``,
`Available tables and columns:`,
availableColumns,
``,
`Fix the query and retry. If the schema cache is outdated, run schema_index to refresh it.`,
].join("\n")

trackPreValidation("blocked", "structural_error", columns.length, Date.now() - startTime, schemaTruncated, errorMsgs)
return { blocked: true, error: errorOutput }
trackPreValidation("blocked", "structural_error", columns.length, Date.now() - startTime, schemaTruncated, ctx, errorMsgs)
// Shadow mode: caller discards the result. When blocking is enabled in the
// future, build errorOutput here with the structural errors and
// schemaContext keys for user-facing guidance.
return { blocked: false }
} catch {
// Validation failure should never block execution
trackPreValidation("error", "validation_exception", 0, Date.now() - startTime, false)
const ctx: TrackCtx = { warehouse_type: "unknown", query_type: queryType, masked_sql_hash: maskedSqlHash }
trackPreValidation("error", "validation_exception", 0, Date.now() - startTime, false, ctx)
return { blocked: false }
}
}

/**
 * Correlation fields threaded from preValidateSql into every
 * sql_pre_validation telemetry event it emits (see the `ctx` parameter of
 * trackPreValidation, which copies these fields into the event payload).
 */
interface TrackCtx {
// warehouse driver type (e.g. postgres, snowflake); "unknown" when the
// warehouse could not be resolved from the registry
warehouse_type: string
// coarse statement classification — read / write / unknown
query_type: string
// hash prefix of the masked SQL text — join key to sql_execute_failure
// events for the same query (see the telemetry schema comment for details)
masked_sql_hash: string
}

function trackPreValidation(
outcome: "skipped" | "passed" | "blocked" | "error",
reason: string,
schema_columns: number,
duration_ms: number,
schema_truncated: boolean,
ctx: TrackCtx,
error_message?: string,
) {
// Mask schema identifiers (table / column names, paths, user IDs) from the
Expand All @@ -244,6 +270,9 @@ function trackPreValidation(
session_id: Telemetry.getContext().sessionId,
outcome,
reason,
warehouse_type: ctx.warehouse_type,
query_type: ctx.query_type,
masked_sql_hash: ctx.masked_sql_hash,
schema_columns,
schema_truncated,
duration_ms,
Expand Down
3 changes: 2 additions & 1 deletion packages/opencode/test/telemetry/telemetry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,12 @@ const ALL_EVENT_TYPES: Telemetry.Event["type"][] = [
"sql_execute_failure",
"feature_suggestion",
"core_failure",
"sql_pre_validation",
]

describe("telemetry.event-types", () => {
test("all event types are valid", () => {
expect(ALL_EVENT_TYPES.length).toBe(42)
expect(ALL_EVENT_TYPES.length).toBe(43)
})
})

Expand Down
Loading