Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions packages/opencode/src/altimate/native/finops/bq-utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Shared BigQuery helpers for the finops module.
*
* INFORMATION_SCHEMA queries on BigQuery must be region-qualified —
* `` `region-<location>.INFORMATION_SCHEMA.<view>` ``. The finops tools read
* the connection's configured `location` (e.g. "us", "eu", "us-central1") and
* interpolate it into each SQL template via a `{region}` placeholder.
*
* Kept in its own file so the sanitize + interpolate logic doesn't drift across
* query-history, credit-analyzer, warehouse-advisor, role-access, and
* unused-resources.
*/

import * as Registry from "../connections/registry"

/**
 * Sanitize a BigQuery region/location string for safe interpolation into a
 * region-qualified INFORMATION_SCHEMA reference. The result is always safe to
 * inject into `` `region-<result>.INFORMATION_SCHEMA.X` `` — the allow-list
 * `[a-z0-9-]` cannot close the backtick context or introduce SQL delimiters.
 *
 * Transformations:
 * - lowercase
 * - strip anything outside [a-z0-9-] (this also removes all whitespace)
 * - cap length at 64 chars (BQ region names are short; this guards against
 *   pathological inputs)
 * - trim leading/trailing hyphens (BQ region names never start or end with -)
 * - fall back to "us" on empty input (historical default)
 */
export function sanitizeBqRegion(location: unknown): string {
  const raw = typeof location === "string" ? location : ""
  // The hyphen trim runs AFTER the length cap: slicing a >64-char input could
  // otherwise cut the string on a hyphen and reintroduce a trailing "-",
  // breaking the documented "no leading/trailing hyphen" invariant. For real
  // region names (short, no edge hyphens) the output is identical either way.
  const cleaned = raw
    .toLowerCase()
    .replace(/[^a-z0-9-]/g, "")
    .slice(0, 64)
    .replace(/^-+|-+$/g, "")
  return cleaned || "us"
}

/**
 * Substitute every `{region}` placeholder in a BQ SQL template with the
 * sanitized region for a given warehouse config. All occurrences are replaced
 * (not just the first) so future templates that reference multiple
 * region-qualified views (e.g. JOINs) are handled safely.
 */
export function interpolateBqRegion(sqlTemplate: string, bqRegion?: unknown): string {
  const region = sanitizeBqRegion(bqRegion)
  return sqlTemplate.split("{region}").join(region)
}

/**
 * Resolve the configured BigQuery `location` for a registered warehouse.
 * Yields undefined when the warehouse has no registered config or no location
 * set — callers pass the result through `sanitizeBqRegion`, which falls back
 * to "us".
 */
export function bqRegionFor(warehouse: string): unknown {
  const config = Registry.getConfig(warehouse)
  return config == null ? undefined : config.location
}
36 changes: 24 additions & 12 deletions packages/opencode/src/altimate/native/finops/credit-analyzer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import * as Registry from "../connections/registry"
import { bqRegionFor, interpolateBqRegion } from "./bq-utils"
import type {
CreditAnalysisParams,
CreditAnalysisResult,
Expand Down Expand Up @@ -80,7 +81,7 @@ SELECT
0 as credits_cloud,
COUNT(*) as query_count,
AVG(total_bytes_billed) / 1099511627776.0 * 5.0 as avg_credits_per_query
FROM \`region-US.INFORMATION_SCHEMA.JOBS\`
FROM \`region-{region}.INFORMATION_SCHEMA.JOBS\`
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL ? DAY)
AND job_type = 'QUERY'
AND state = 'DONE'
Expand All @@ -97,7 +98,7 @@ SELECT
0 as total_cloud_credits,
COUNT(DISTINCT DATE(creation_time)) as active_days,
AVG(total_bytes_billed) / 1099511627776.0 * 5.0 as avg_daily_credits
FROM \`region-US.INFORMATION_SCHEMA.JOBS\`
FROM \`region-{region}.INFORMATION_SCHEMA.JOBS\`
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL ? DAY)
AND job_type = 'QUERY'
AND state = 'DONE'
Expand All @@ -115,7 +116,7 @@ SELECT
0 as rows_produced,
total_bytes_billed / 1099511627776.0 * 5.0 as credits_used,
start_time
FROM \`region-US.INFORMATION_SCHEMA.JOBS\`
FROM \`region-{region}.INFORMATION_SCHEMA.JOBS\`
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL ? DAY)
AND job_type = 'QUERY'
AND state = 'DONE'
Expand Down Expand Up @@ -191,7 +192,7 @@ function getWhType(warehouse: string): string {
}

function buildCreditUsageSql(
whType: string, days: number, limit: number, warehouseFilter?: string,
whType: string, days: number, limit: number, warehouseFilter?: string, bqRegion?: unknown,
): { sql: string; binds: any[] } | null {
if (whType === "snowflake") {
const binds: any[] = [-days]
Expand All @@ -203,33 +204,42 @@ function buildCreditUsageSql(
}
}
if (whType === "bigquery") {
return { sql: BIGQUERY_CREDIT_USAGE_SQL, binds: [days, limit] }
return {
sql: interpolateBqRegion(BIGQUERY_CREDIT_USAGE_SQL, bqRegion),
binds: [days, limit],
}
}
if (whType === "databricks") {
return { sql: DATABRICKS_CREDIT_USAGE_SQL, binds: [days, limit] }
}
return null
}

function buildCreditSummarySql(whType: string, days: number): { sql: string; binds: any[] } | null {
function buildCreditSummarySql(whType: string, days: number, bqRegion?: unknown): { sql: string; binds: any[] } | null {
if (whType === "snowflake") {
return { sql: SNOWFLAKE_CREDIT_SUMMARY_SQL, binds: [-days] }
}
if (whType === "bigquery") {
return { sql: BIGQUERY_CREDIT_SUMMARY_SQL, binds: [days] }
return {
sql: interpolateBqRegion(BIGQUERY_CREDIT_SUMMARY_SQL, bqRegion),
binds: [days],
}
}
if (whType === "databricks") {
return { sql: DATABRICKS_CREDIT_SUMMARY_SQL, binds: [days] }
}
return null
}

function buildExpensiveSql(whType: string, days: number, limit: number): { sql: string; binds: any[] } | null {
function buildExpensiveSql(whType: string, days: number, limit: number, bqRegion?: unknown): { sql: string; binds: any[] } | null {
if (whType === "snowflake") {
return { sql: SNOWFLAKE_EXPENSIVE_SQL, binds: [-days, limit] }
}
if (whType === "bigquery") {
return { sql: BIGQUERY_EXPENSIVE_SQL, binds: [days, limit] }
return {
sql: interpolateBqRegion(BIGQUERY_EXPENSIVE_SQL, bqRegion),
binds: [days, limit],
}
}
if (whType === "databricks") {
return { sql: DATABRICKS_EXPENSIVE_SQL, binds: [days, limit] }
Expand Down Expand Up @@ -295,9 +305,10 @@ export async function analyzeCredits(params: CreditAnalysisParams): Promise<Cred
const whType = getWhType(params.warehouse)
const days = params.days ?? 30
const limit = params.limit ?? 50
const bqRegion = whType === "bigquery" ? bqRegionFor(params.warehouse) : undefined

const dailyBuilt = buildCreditUsageSql(whType, days, limit, params.warehouse_filter)
const summaryBuilt = buildCreditSummarySql(whType, days)
const dailyBuilt = buildCreditUsageSql(whType, days, limit, params.warehouse_filter, bqRegion)
const summaryBuilt = buildCreditSummarySql(whType, days, bqRegion)

if (!dailyBuilt || !summaryBuilt) {
return {
Expand Down Expand Up @@ -346,8 +357,9 @@ export async function getExpensiveQueries(params: ExpensiveQueriesParams): Promi
const whType = getWhType(params.warehouse)
const days = params.days ?? 7
const limit = params.limit ?? 20
const bqRegion = whType === "bigquery" ? bqRegionFor(params.warehouse) : undefined

const built = buildExpensiveSql(whType, days, limit)
const built = buildExpensiveSql(whType, days, limit, bqRegion)
if (!built) {
return {
success: false,
Expand Down
26 changes: 19 additions & 7 deletions packages/opencode/src/altimate/native/finops/query-history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import * as Registry from "../connections/registry"
import { bqRegionFor, interpolateBqRegion, sanitizeBqRegion } from "./bq-utils"
import type { QueryHistoryParams, QueryHistoryResult } from "../types"

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -59,6 +60,12 @@ ORDER BY total_exec_time DESC
LIMIT {limit}
`

// BigQuery's INFORMATION_SCHEMA.JOBS does not have top-level `error_message`,
// `error_code`, or `total_rows` columns. Errors live under the `error_result`
// STRUCT ({reason, location, message, debug_info}) and per-row counts aren't
// exposed at the jobs level. `state` returns 'DONE' (not 'SUCCESS'), so we
// derive execution_status from error_result to stay consistent with the other
// warehouse templates and the summary loop in getQueryHistory().
const BIGQUERY_HISTORY_SQL = `
SELECT
job_id as query_id,
Expand All @@ -67,16 +74,16 @@ SELECT
user_email as user_name,
'' as warehouse_name,
reservation_id as warehouse_size,
state as execution_status,
NULL as error_code,
error_message,
CASE WHEN error_result IS NULL THEN 'SUCCESS' ELSE 'FAILED' END as execution_status,
error_result.reason as error_code,
error_result.message as error_message,
start_time,
end_time,
TIMESTAMP_DIFF(end_time, start_time, SECOND) as execution_time_sec,
total_bytes_billed as bytes_scanned,
total_rows as rows_produced,
CAST(NULL AS INT64) as rows_produced,
0 as credits_used_cloud_services
FROM \`region-US.INFORMATION_SCHEMA.JOBS\`
FROM \`region-{region}.INFORMATION_SCHEMA.JOBS\`
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL ? DAY)
ORDER BY creation_time DESC
LIMIT ?
Expand Down Expand Up @@ -146,6 +153,7 @@ function buildHistoryQuery(
limit: number,
user?: string,
warehouseFilter?: string,
bqRegion?: unknown,
): { sql: string; binds: any[] } | null {
if (whType === "snowflake") {
const binds: any[] = [-days]
Expand All @@ -161,7 +169,10 @@ function buildHistoryQuery(
return { sql: POSTGRES_HISTORY_SQL.replace("{limit}", String(Math.floor(Number(limit)))), binds: [] }
}
if (whType === "bigquery") {
return { sql: BIGQUERY_HISTORY_SQL, binds: [days, limit] }
return {
sql: interpolateBqRegion(BIGQUERY_HISTORY_SQL, bqRegion),
binds: [days, limit],
}
}
if (whType === "databricks") {
return { sql: DATABRICKS_HISTORY_SQL, binds: [days, limit] }
Expand Down Expand Up @@ -202,8 +213,9 @@ export async function getQueryHistory(params: QueryHistoryParams): Promise<Query
const whType = getWhType(params.warehouse)
const days = params.days ?? 7
const limit = params.limit ?? 100
const bqRegion = whType === "bigquery" ? bqRegionFor(params.warehouse) : undefined

const built = buildHistoryQuery(whType, days, limit, params.user, params.warehouse_filter)
const built = buildHistoryQuery(whType, days, limit, params.user, params.warehouse_filter, bqRegion)
if (!built) {
return {
success: false,
Expand Down
10 changes: 6 additions & 4 deletions packages/opencode/src/altimate/native/finops/role-access.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import * as Registry from "../connections/registry"
import { bqRegionFor, interpolateBqRegion } from "./bq-utils"
import type {
RoleGrantsParams,
RoleGrantsResult,
Expand Down Expand Up @@ -75,7 +76,7 @@ SELECT
'NO' as grant_option,
'' as granted_by,
'' as created_on
FROM \`region-US.INFORMATION_SCHEMA.OBJECT_PRIVILEGES\`
FROM \`region-{region}.INFORMATION_SCHEMA.OBJECT_PRIVILEGES\`
WHERE 1=1
{grantee_filter}
ORDER BY object_type, object_name
Expand Down Expand Up @@ -123,7 +124,7 @@ function rowsToRecords(result: { columns: string[]; rows: any[][] }): Record<str
}

function buildGrantsSql(
whType: string, role?: string, objectName?: string, limit: number = 100,
whType: string, role?: string, objectName?: string, limit: number = 100, bqRegion?: unknown,
): { sql: string; binds: any[] } | null {
if (whType === "snowflake") {
const binds: any[] = []
Expand All @@ -142,7 +143,7 @@ function buildGrantsSql(
const granteeF = role ? (binds.push(role), "AND grantee = ?") : ""
binds.push(limit)
return {
sql: BIGQUERY_GRANTS_SQL.replace("{grantee_filter}", granteeF),
sql: interpolateBqRegion(BIGQUERY_GRANTS_SQL.replace("{grantee_filter}", granteeF), bqRegion),
binds,
}
}
Expand All @@ -165,8 +166,9 @@ function buildGrantsSql(
export async function queryGrants(params: RoleGrantsParams): Promise<RoleGrantsResult> {
const whType = getWhType(params.warehouse)
const limit = params.limit ?? 100
const bqRegion = whType === "bigquery" ? bqRegionFor(params.warehouse) : undefined

const built = buildGrantsSql(whType, params.role, params.object_name, limit)
const built = buildGrantsSql(whType, params.role, params.object_name, limit, bqRegion)
if (!built) {
return {
success: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import * as Registry from "../connections/registry"
import { bqRegionFor, interpolateBqRegion } from "./bq-utils"
import type {
UnusedResourcesParams,
UnusedResourcesResult,
Expand Down Expand Up @@ -90,7 +91,7 @@ SELECT
size_bytes,
TIMESTAMP_MILLIS(last_modified_time) as last_altered,
creation_time as created
FROM \`region-US.INFORMATION_SCHEMA.TABLE_STORAGE\`
FROM \`region-{region}.INFORMATION_SCHEMA.TABLE_STORAGE\`
WHERE NOT deleted
AND last_modified_time < UNIX_MILLIS(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL ? DAY))
ORDER BY size_bytes DESC
Expand Down Expand Up @@ -186,7 +187,8 @@ export async function findUnusedResources(params: UnusedResourcesParams): Promis
}
} else if (whType === "bigquery") {
try {
const result = await connector.execute(BIGQUERY_UNUSED_TABLES_SQL, limit, [days, limit])
const sql = interpolateBqRegion(BIGQUERY_UNUSED_TABLES_SQL, bqRegionFor(params.warehouse))
const result = await connector.execute(sql, limit, [days, limit])
unusedTables = rowsToRecords(result)
} catch (e) {
errors.push(`Could not query unused tables: ${e}`)
Expand Down
22 changes: 14 additions & 8 deletions packages/opencode/src/altimate/native/finops/warehouse-advisor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import * as Registry from "../connections/registry"
import { bqRegionFor, interpolateBqRegion } from "./bq-utils"
import type {
WarehouseAdvisorParams,
WarehouseAdvisorResult,
Expand Down Expand Up @@ -58,7 +59,7 @@ SELECT
0 as avg_queue_load,
MAX(period_slot_ms / 1000.0) as peak_queue_load,
COUNT(*) as sample_count
FROM \`region-US.INFORMATION_SCHEMA.JOBS_TIMELINE\`
FROM \`region-{region}.INFORMATION_SCHEMA.JOBS_TIMELINE\`
WHERE period_start >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {days} DAY)
GROUP BY reservation_id
ORDER BY avg_concurrency DESC
Expand All @@ -73,7 +74,7 @@ SELECT
APPROX_QUANTILES(TIMESTAMP_DIFF(end_time, start_time, MILLISECOND), 100)[OFFSET(95)] / 1000.0 as p95_time_sec,
AVG(total_bytes_billed) as avg_bytes_scanned,
SUM(total_bytes_billed) / 1099511627776.0 * 5.0 as total_credits
FROM \`region-US.INFORMATION_SCHEMA.JOBS\`
FROM \`region-{region}.INFORMATION_SCHEMA.JOBS\`
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {days} DAY)
AND job_type = 'QUERY'
AND state = 'DONE'
Expand Down Expand Up @@ -127,16 +128,20 @@ function getWhType(warehouse: string): string {
return wh?.type || "unknown"
}

function buildLoadSql(whType: string, days: number): string | null {
function buildLoadSql(whType: string, days: number, bqRegion?: unknown): string | null {
if (whType === "snowflake") return SNOWFLAKE_LOAD_SQL.replace("{days}", String(days))
if (whType === "bigquery") return BIGQUERY_LOAD_SQL.replace("{days}", String(days))
if (whType === "bigquery") {
return interpolateBqRegion(BIGQUERY_LOAD_SQL.replace("{days}", String(days)), bqRegion)
}
if (whType === "databricks") return DATABRICKS_LOAD_SQL.replace(/{days}/g, String(days))
return null
}

function buildSizingSql(whType: string, days: number): string | null {
function buildSizingSql(whType: string, days: number, bqRegion?: unknown): string | null {
if (whType === "snowflake") return SNOWFLAKE_SIZING_SQL.replace("{days}", String(days))
if (whType === "bigquery") return BIGQUERY_SIZING_SQL.replace("{days}", String(days))
if (whType === "bigquery") {
return interpolateBqRegion(BIGQUERY_SIZING_SQL.replace("{days}", String(days)), bqRegion)
}
if (whType === "databricks") return DATABRICKS_SIZING_SQL.replace(/{days}/g, String(days))
return null
}
Expand Down Expand Up @@ -217,9 +222,10 @@ function generateSizingRecommendations(
export async function adviseWarehouse(params: WarehouseAdvisorParams): Promise<WarehouseAdvisorResult> {
const whType = getWhType(params.warehouse)
const days = params.days ?? 14
const bqRegion = whType === "bigquery" ? bqRegionFor(params.warehouse) : undefined

const loadSql = buildLoadSql(whType, days)
const sizingSql = buildSizingSql(whType, days)
const loadSql = buildLoadSql(whType, days, bqRegion)
const sizingSql = buildSizingSql(whType, days, bqRegion)

if (!loadSql || !sizingSql) {
return {
Expand Down
Loading
Loading