Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 114 additions & 4 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,13 @@ func classifyErrorCode(err error) string {
case strings.HasPrefix(msg, "Dependency Error:"):
return "2BP01" // dependent_objects_still_exist
}
return "42000"
// Unknown error class — no DuckDB prefix matched. These are
// typically infra issues (gRPC failures, IO errors, internal panics)
// rather than user input issues. Classify as XX000 (internal_error)
// so isUserQueryError correctly routes them to the system-error log
// path. If a future DuckDB error needs to land in a user class, add
// a prefix branch above instead of moving the fallback.
return "XX000"
}

// catalogErrorCode narrows a "Catalog Error: …" message to a specific SQLSTATE
Expand Down Expand Up @@ -488,8 +494,105 @@ func constraintErrorCode(msg string) string {
return "23000" // integrity_constraint_violation
}

// logQueryError logs a query execution failure with additional context for
// DuckLake-specific errors (transaction conflicts and metadata connection loss).
// userErrorSQLSTATEClasses is the closed set of PostgreSQL SQLSTATE class
// codes (the first two characters) that represent user-input or
// user-state errors — "you wrote a query that doesn't make sense for
// this database state." Anything outside this set is treated as a
// system / infra error (08 connection, 53 resources, 57 operator
// intervention, 58 system, XX internal, …).
//
// The discriminator is the SQLSTATE we already compute for the pgwire
// error response — no new string matching here. Add a class only after
// confirming every code in it is genuinely user-attributable; adding
// erroneously will hide real infra failures from the alert path.
var userErrorSQLSTATEClasses = map[string]struct{}{
"0A": {}, // feature_not_supported — user used a SQL feature we don't have
"22": {}, // data_exception — bad input (cast, overflow, encoding)
"23": {}, // integrity_constraint_violation — unique/fk/check/not_null
"25": {}, // invalid_transaction_state — nested BEGIN, etc.
"28": {}, // invalid_authorization_specification — not hit on this path today
"2B": {}, // dependent_objects_still_exist — DROP without CASCADE
"3D": {}, // invalid_catalog_name — DB doesn't exist
"3F": {}, // invalid_schema_name — schema doesn't exist
"42": {}, // syntax_error_or_access_rule_violation — table/column not found, syntax
"44": {}, // with_check_option_violation
}

// isUserQueryError tells the log/observability path whether a query
// failure is user-attributable (Info-level "Query execution failed.")
// or a real system error worth alerting on (Error-level "Query
// execution errored."). The discriminator is the SQLSTATE class —
// already computed via classifyErrorCode for the pgwire response.
//
// 57014 (query_canceled) is technically class 57 (operator
// intervention, otherwise treated as infra) but in our usage it means
// the client pressed Ctrl-C, which is a user-initiated event — short-
// circuit it back into the user bucket.
func isUserQueryError(err error) bool {
if err == nil {
return false
}
if isQueryCancelled(err) {
return true
}
code := classifyErrorCode(err)
if len(code) < 2 {
return false
}
_, ok := userErrorSQLSTATEClasses[code[:2]]
return ok
}

// logQueryStarted records a query handing off to a worker. Pairs with
// logQueryFinished at every termination point so logs and traces can
// be cross-referenced — the trace_id attribute matches the OTEL span
// ID exported by the same query, so a search like trace_id=abc123 in
// Loki/Grafana lines up directly with the trace view.
//
// Includes worker_id and worker_pod so an operator chasing a specific
// worker incident (e.g. the one in the worker-40761 postmortem) can
// filter to just that worker's queries without joining across logs.
func (c *clientConn) logQueryStarted(query string) {
slog.Info("Query started.",
"user", c.username,
"query", query,
"worker", c.workerID,
"worker_pod", c.workerPod,
"trace_id", traceIDFromContext(c.ctx))
}

// logQueryFinished records a query terminating on the worker. Counter-
// part to logQueryStarted; emit once per query regardless of outcome
// so the start/finish pair is always balanced.
//
// On error paths logQueryError still fires for severity routing
// (Info vs Error based on SQLSTATE class). logQueryFinished
// deliberately stays at Info even on error so the lifecycle pair stays
// readable as a stream — operators following one trace see both a
// "started" and a "finished" line, and can look at the separate error
// line for severity context.
func (c *clientConn) logQueryFinished(query string, start time.Time, rows int64, err error) {
attrs := []any{
"user", c.username,
"query", query,
"duration_ms", time.Since(start).Milliseconds(),
"rows", rows,
"worker", c.workerID,
"worker_pod", c.workerPod,
"trace_id", traceIDFromContext(c.ctx),
}
if err != nil {
attrs = append(attrs, "error", err.Error())
}
slog.Info("Query finished.", attrs...)
}

// logQueryError logs a query execution failure. DuckLake-specific
// retryable conditions and user-attributable errors get Warn / Info so
// the Error level stays meaningful as an alerting signal — "Query
// execution errored." should mean the system genuinely went wrong
// (worker crash, IO failure, internal panic, infra unreachable), not
// "user typo'd a column name."
func (c *clientConn) logQueryError(query string, err error) {
attrs := []any{"user", c.username, "query", query, "error", err, "worker", c.workerID, "worker_pod", c.workerPod}
if isDuckLakeTransactionConflict(err) {
Expand All @@ -500,7 +603,11 @@ func (c *clientConn) logQueryError(query string, err error) {
slog.Warn("DuckLake metadata connection lost during transaction.", attrs...)
return
}
slog.Error("Query execution failed.", attrs...)
if isUserQueryError(err) {
slog.Info("Query execution failed.", attrs...)
return
}
slog.Error("Query execution errored.", attrs...)
}

// isConnectionBroken checks if an error indicates a broken connection
Expand Down Expand Up @@ -1486,6 +1593,7 @@ func (c *clientConn) executeSelectQuery(query string, cmdType string) (int64, st

execStart := time.Now()
execCtx, execSpan := tracer.Start(ctx, "duckgres.execute")
c.logQueryStarted(query)
runQuery := func() (RowSet, error) {
return c.executor.QueryContext(ctx, query)
}
Expand Down Expand Up @@ -1518,6 +1626,7 @@ func (c *clientConn) executeSelectQuery(query string, cmdType string) (int64, st
} else {
c.logQueryError(query, err)
}
c.logQueryFinished(query, execStart, 0, err)
c.sendError("ERROR", errCode, errMsg)
c.setTxError()
_ = writeReadyForQuery(c.writer, c.txStatus)
Expand Down Expand Up @@ -1606,6 +1715,7 @@ func (c *clientConn) executeSelectQuery(query string, cmdType string) (int64, st
_ = writeCommandComplete(c.writer, tag)
_ = writeReadyForQuery(c.writer, c.txStatus)
_ = c.writer.Flush()
c.logQueryFinished(query, execStart, int64(rowCount), nil)
return int64(rowCount), "", "", nil
}

Expand Down
74 changes: 72 additions & 2 deletions server/transient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,12 @@ func TestClassifyErrorCode(t *testing.T) {
}{
{"transaction conflict", errors.New("Transaction conflict on commit"), "40001"},
{"query cancelled", errors.New("context canceled"), "57014"},
{"generic error", errors.New("syntax error"), "42000"},
{"SSL closed is not conflict", errors.New("SSL connection has been closed unexpectedly"), "42000"},
// Unknown error class (no DuckDB prefix) maps to XX000 — internal /
// infra rather than user-class — so isUserQueryError correctly
// routes it to the alert-worthy log path. Adding this prefix to a
// user class would hide real infra failures.
{"generic error with no DuckDB prefix", errors.New("syntax error"), "XX000"},
{"SSL closed is infra not user", errors.New("SSL connection has been closed unexpectedly"), "XX000"},

{"catalog missing table", errors.New("Catalog Error: Table with name users does not exist!"), "42P01"},
{"catalog missing table with suggestion", errors.New("Catalog Error: Table with name stg_customers__dbt_tmp does not exist!\nDid you mean \"stg_customers\"?"), "42P01"},
Expand Down Expand Up @@ -345,6 +349,72 @@ func TestClassifyErrorCode(t *testing.T) {
}
}

// TestIsUserQueryError pins the SQLSTATE-class-based discriminator
// that splits Query execution log lines between Info ("user wrote
// something that doesn't make sense") and Error ("the system itself
// failed"). The logger uses this to keep the Error level meaningful
// for alerting; a regression here would either drown alerts in user-
// typo noise or silently downgrade real infra failures.
func TestIsUserQueryError(t *testing.T) {
tests := []struct {
name string
err error
want bool // true == user error (Info), false == infra error (Error)
}{
// Class 42 — by far the most common user errors (table/column not
// found, syntax errors, access rule violations).
{"missing table (42P01)", errors.New("Catalog Error: Table with name users does not exist!"), true},
{"missing column (42703)", errors.New("Binder Error: Referenced column \"missing_col\" not found in FROM clause!"), true},
{"syntax error (42601)", errors.New("Parser Error: syntax error at or near \"FORM\""), true},
{"missing function (42883)", errors.New("Catalog Error: Scalar Function with name no_such_func does not exist!"), true},
{"permission denied (42501)", errors.New("Permission Error: not allowed to write here"), true},
{"duplicate table (42P07)", errors.New("Catalog Error: Table with name \"t\" already exists!"), true},

// Other user classes — bad input, integrity, transaction misuse.
{"data exception conversion (22P02)", errors.New("Conversion Error: Could not convert string 'abc' to INT32"), true},
{"data exception out of range (22003)", errors.New("Out of Range Error: Overflow in multiplication of INT32"), true},
{"unique violation (23505)", errors.New("Constraint Error: Duplicate key \"id: 1\" violates primary key constraint"), true},
{"not null violation (23502)", errors.New("Constraint Error: NOT NULL constraint failed: t.col"), true},
{"invalid transaction state (25000)", errors.New("Transaction Error: cannot begin within an existing transaction"), true},
{"missing schema (3F000)", errors.New("Catalog Error: Schema with name \"missing\" does not exist!"), true},
{"dependent objects (2BP01)", errors.New("Dependency Error: Cannot drop entry because there are other entries that depend on it"), true},

// 57014 cancellation — class 57 is otherwise infra, but client-
// initiated cancels are user events. The short-circuit in
// isUserQueryError must keep this in the user bucket.
{"client cancellation (57014)", errors.New("context canceled"), true},

// Infra classes — must NOT be treated as user errors.
{"unknown error → XX000", errors.New("something went wrong"), false},
{"SSL closed → infra", errors.New("SSL connection has been closed unexpectedly"), false},
{"nil error", nil, false},

// 40001 retryable conflicts are a special case handled before the
// SQLSTATE check fires (logQueryError emits its own Warn for them),
// so they never reach this function in production. But verify the
// classification is unambiguously infra-side here so a future
// caller doesn't accidentally bucket retries as user errors.
{"transaction conflict 40001 is not user", errors.New("Transaction conflict on commit"), false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := isUserQueryError(tt.err); got != tt.want {
t.Errorf("isUserQueryError(%v) = %v, want %v (SQLSTATE=%s)",
tt.err, got, tt.want, classifyErrorCodeOrEmpty(tt.err))
}
})
}
}

// classifyErrorCodeOrEmpty is a test helper to surface the computed
// SQLSTATE in failure messages without crashing on nil errors.
func classifyErrorCodeOrEmpty(err error) string {
if err == nil {
return "(nil)"
}
return classifyErrorCode(err)
}

// TestClassifyErrorCodeAgainstRealDuckDB drives queries that reliably
// produce each error prefix the classifier branches on, against a real
// in-memory DuckDB, and asserts the SQLSTATE we map to.
Expand Down
Loading