From ace3a5a5147d1f2a9f0022aeab539f947c10ac92 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Thu, 30 Apr 2026 14:55:45 -0700 Subject: [PATCH 1/2] refactor(arrowmap): split AppendValue into duckdb-free core + driver hooks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Step 2 of the binary-split plan. Moves AppendValue out of duckdbservice into arrowmap so callers that don't need duckdb-go's driver value types (duckdb.Interval, Decimal, UUID, OrderedMap, Map) can use it without pulling libduckdb into their import graph. The split uses a registration hook rather than a build tag: - arrowmap.AppendValue handles all arrow-native and Go-native value types directly. It first consults any registered Appender hooks before falling back to the built-in switch. - duckdbservice/appender_init.go's init() registers a single hook that handles duckdb.Interval / Decimal / UUID / OrderedMap / Map. - When duckdbservice is linked into a binary (worker, standalone), the duckdb types get full coverage automatically. When it isn't (a future controlplane-only binary), the duckdb cases are dead code paths that wouldn't fire anyway because duckdb-go is the only thing producing those typed values. OrderedMapValue moved from server/flight_executor.go into arrowmap so the MAP branch in arrowmap.AppendValue can switch on it without arrowmap depending on the server package. server.OrderedMapValue is preserved as a type alias for backward compatibility with existing call sites. server/flightsqlingress no longer imports duckdbservice — it uses arrowmap.AppendValue directly. The package still transitively pulls duckdb-go via its `server` import; getting `server` itself duckdb-free is the next chunk of work. Verified: - go build ./... clean - go build -tags kubernetes ./... clean - go test -short ./duckdbservice/... ./server/flightsqlingress/... all green - go list -deps ./duckdbservice/arrowmap | grep duckdb-go is empty (arrowmap remains a pure leaf even with AppendValue + OrderedMapValue living in it) Co-Authored-By: Claude Opus 4.7 (1M context) --- duckdbservice/appender_init.go | 82 ++++++++ duckdbservice/arrow_helpers.go | 286 +------------------------- duckdbservice/arrowmap/arrowmap.go | 311 ++++++++++++++++++++++++++++- server/flight_executor.go | 14 +- server/flightsqlingress/ingress.go | 3 +- 5 files changed, 408 insertions(+), 288 deletions(-) create mode 100644 duckdbservice/appender_init.go diff --git a/duckdbservice/appender_init.go b/duckdbservice/appender_init.go new file mode 100644 index 00000000..cc50ed8c --- /dev/null +++ b/duckdbservice/appender_init.go @@ -0,0 +1,82 @@ +package duckdbservice + +import ( + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/decimal128" + duckdb "github.com/duckdb/duckdb-go/v2" + "github.com/posthog/duckgres/duckdbservice/arrowmap" +) + +// init registers handlers for the duckdb-go driver value types so that any +// caller (including arrowmap.AppendValue and the wrapper duckdbservice.AppendValue) +// gets full type coverage when this package is linked into the binary. +// +// Binaries that don't link duckdbservice (e.g., a future control-plane-only +// binary) won't see these registrations — which is correct, because they +// also won't be the ones scanning rows from a duckdb-go driver connection. +func init() { + arrowmap.RegisterAppender(handleDuckDBValue) +} + +// handleDuckDBValue implements arrowmap.Appender for duckdb-go's driver +// value types. Returns true when it claimed the value, false to fall +// through to arrowmap's built-in handling. +func handleDuckDBValue(builder array.Builder, val any) bool { + switch b := builder.(type) { + case *array.MonthDayNanoIntervalBuilder: + v, ok := val.(duckdb.Interval) + if !ok { + return false + } + b.Append(arrow.MonthDayNanoInterval{ + Months: v.Months, + Days: v.Days, + Nanoseconds: v.Micros * 1000, + }) + return true + case *array.Decimal128Builder: + v, ok := val.(duckdb.Decimal) + if !ok { + return false + } + b.Append(decimal128.FromBigInt(v.Value)) + return true + case *array.FixedSizeBinaryBuilder: + v, ok := val.(duckdb.UUID) + if !ok { + return false + } + b.Append(v[:]) + return true + case *array.MapBuilder: + switch v := val.(type) { + case duckdb.OrderedMap: + b.Append(true) + kb, ib := b.KeyBuilder(), b.ItemBuilder() + keys, values := v.Keys(), v.Values() + for i, k := range keys { + arrowmap.AppendValue(kb, k) + arrowmap.AppendValue(ib, values[i]) + } + return true + case duckdb.Map: + b.Append(true) + kb, ib := b.KeyBuilder(), b.ItemBuilder() + for k, item := range v { + arrowmap.AppendValue(kb, k) + arrowmap.AppendValue(ib, item) + } + return true + } + return false + case *array.StringBuilder: + v, ok := val.(duckdb.UUID) + if !ok { + return false + } + b.Append(v.String()) + return true + } + return false +} diff --git a/duckdbservice/arrow_helpers.go b/duckdbservice/arrow_helpers.go index 703720e1..a267e542 100644 --- a/duckdbservice/arrow_helpers.go +++ b/duckdbservice/arrow_helpers.go @@ -3,21 +3,13 @@ package duckdbservice import ( "context" "database/sql" - "encoding/hex" - "fmt" - "math/big" "reflect" "strings" - "time" - "unicode/utf8" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" - "github.com/apache/arrow-go/v18/arrow/decimal128" "github.com/apache/arrow-go/v18/arrow/memory" - duckdb "github.com/duckdb/duckdb-go/v2" "github.com/posthog/duckgres/duckdbservice/arrowmap" - "github.com/posthog/duckgres/server" ) // DuckDBTypeToArrow re-exports arrowmap.DuckDBTypeToArrow for backward @@ -25,12 +17,18 @@ import ( var DuckDBTypeToArrow = arrowmap.DuckDBTypeToArrow // QualifyTableName re-exports arrowmap.QualifyTableName for backward -// compatibility with existing callers in this package. +// compatibility with existing callers. var QualifyTableName = arrowmap.QualifyTableName -// AppendValue is the only helper in this package that still needs to live with -// the duckdb-go import (because it switches on duckdb.Interval / Decimal / -// UUID / OrderedMap / Map types). +// QuoteIdent re-exports arrowmap.QuoteIdent for backward compatibility. +var QuoteIdent = arrowmap.QuoteIdent + +// AppendValue re-exports arrowmap.AppendValue for backward compatibility. +// The duckdb-go-specific value types (duckdb.Interval, Decimal, UUID, +// OrderedMap, Map) are handled via an arrowmap.Appender registered from +// duckdbservice/appender_init.go, so callers that import duckdbservice get +// full type coverage automatically. +var AppendValue = arrowmap.AppendValue // RowsToRecord converts sql.Rows into an Arrow RecordBatch of up to batchSize rows. // Returns nil when there are no more rows. @@ -66,267 +64,6 @@ func RowsToRecord(alloc memory.Allocator, rows *sql.Rows, schema *arrow.Schema, return builder.NewRecordBatch(), nil } -// AppendValue appends a value to an Arrow array builder with type coercion. -func AppendValue(builder array.Builder, val interface{}) { - if val == nil { - builder.AppendNull() - return - } - - switch b := builder.(type) { - case *array.Int64Builder: - switch v := val.(type) { - case int64: - b.Append(v) - case int32: - b.Append(int64(v)) - case int: - b.Append(int64(v)) - default: - b.AppendNull() - } - case *array.Int32Builder: - switch v := val.(type) { - case int32: - b.Append(v) - case int64: - b.Append(int32(v)) - case int: - b.Append(int32(v)) - default: - b.AppendNull() - } - case *array.Int16Builder: - switch v := val.(type) { - case int16: - b.Append(v) - case int32: - b.Append(int16(v)) - default: - b.AppendNull() - } - case *array.Int8Builder: - switch v := val.(type) { - case int8: - b.Append(v) - case int32: - b.Append(int8(v)) - default: - b.AppendNull() - } - case *array.Uint8Builder: - switch v := val.(type) { - case uint8: - b.Append(v) - case uint16: - b.Append(uint8(v)) - default: - b.AppendNull() - } - case *array.Uint16Builder: - switch v := val.(type) { - case uint16: - b.Append(v) - case uint32: - b.Append(uint16(v)) - default: - b.AppendNull() - } - case *array.Uint32Builder: - switch v := val.(type) { - case uint32: - b.Append(v) - case uint64: - b.Append(uint32(v)) - default: - b.AppendNull() - } - case *array.Uint64Builder: - switch v := val.(type) { - case uint64: - b.Append(v) - default: - b.AppendNull() - } - case *array.Float64Builder: - switch v := val.(type) { - case float64: - b.Append(v) - case float32: - b.Append(float64(v)) - default: - b.AppendNull() - } - case *array.Float32Builder: - switch v := val.(type) { - case float32: - b.Append(v) - case float64: - b.Append(float32(v)) - default: - b.AppendNull() - } - case *array.BooleanBuilder: - if v, ok := val.(bool); ok { - b.Append(v) - } else { - b.AppendNull() - } - case *array.Date32Builder: - switch v := val.(type) { - case time.Time: - // Floor division to handle pre-epoch dates correctly. - // Go's integer division truncates toward zero, but Date32 - // needs days since epoch rounded toward negative infinity. - unix := v.Unix() - days := unix / 86400 - if unix%86400 < 0 { - days-- - } - b.Append(arrow.Date32(days)) - default: - b.AppendNull() - } - case *array.TimestampBuilder: - switch v := val.(type) { - case time.Time: - b.AppendTime(v) - default: - b.AppendNull() - } - case *array.Time64Builder: - switch v := val.(type) { - case time.Time: - micros := int64(v.Hour())*3600000000 + int64(v.Minute())*60000000 + - int64(v.Second())*1000000 + int64(v.Nanosecond())/1000 - b.Append(arrow.Time64(micros)) - default: - b.AppendNull() - } - case *array.MonthDayNanoIntervalBuilder: - switch v := val.(type) { - case duckdb.Interval: - b.Append(arrow.MonthDayNanoInterval{ - Months: v.Months, - Days: v.Days, - Nanoseconds: v.Micros * 1000, - }) - default: - b.AppendNull() - } - case *array.Decimal128Builder: - switch v := val.(type) { - case duckdb.Decimal: - b.Append(decimal128.FromBigInt(v.Value)) - case *big.Int: - b.Append(decimal128.FromBigInt(v)) - default: - b.AppendNull() - } - case *array.FixedSizeBinaryBuilder: - switch v := val.(type) { - case duckdb.UUID: - b.Append(v[:]) - case []byte: - b.Append(v) - default: - b.AppendNull() - } - case *array.ListBuilder: - switch v := val.(type) { - case []any: - b.Append(true) - vb := b.ValueBuilder() - for _, elem := range v { - AppendValue(vb, elem) - } - default: - b.AppendNull() - } - case *array.StructBuilder: - switch v := val.(type) { - case map[string]any: - b.Append(true) - st := b.Type().(*arrow.StructType) - for i := 0; i < st.NumFields(); i++ { - fieldVal, ok := v[st.Field(i).Name] - if !ok { - b.FieldBuilder(i).AppendNull() - } else { - AppendValue(b.FieldBuilder(i), fieldVal) - } - } - default: - b.AppendNull() - } - case *array.MapBuilder: - switch v := val.(type) { - case duckdb.OrderedMap: - b.Append(true) - kb, ib := b.KeyBuilder(), b.ItemBuilder() - keys, values := v.Keys(), v.Values() - for i, k := range keys { - AppendValue(kb, k) - AppendValue(ib, values[i]) - } - case duckdb.Map: - b.Append(true) - kb, ib := b.KeyBuilder(), b.ItemBuilder() - for k, item := range v { - AppendValue(kb, k) - AppendValue(ib, item) - } - case map[any]any: - b.Append(true) - kb, ib := b.KeyBuilder(), b.ItemBuilder() - for k, item := range v { - AppendValue(kb, k) - AppendValue(ib, item) - } - case server.OrderedMapValue: - b.Append(true) - kb, ib := b.KeyBuilder(), b.ItemBuilder() - for i, k := range v.Keys { - AppendValue(kb, k) - AppendValue(ib, v.Values[i]) - } - default: - b.AppendNull() - } - case *array.StringBuilder: - switch v := val.(type) { - case string: - b.Append(v) - case duckdb.UUID: - b.Append(v.String()) - case []byte: - // TODO: This heuristic (16 bytes + invalid UTF-8 → UUID) is coupled to - // DuckDBTypeToArrow("UUID") returning String. If UUID mapping changes, - // update this branch accordingly. The Go driver returns []byte (not - // duckdb.UUID) when scanning UUID columns into interface{}. - if len(v) == 16 && !utf8.Valid(v) { - s := hex.EncodeToString(v) - b.Append(s[0:8] + "-" + s[8:12] + "-" + s[12:16] + "-" + s[16:20] + "-" + s[20:32]) - } else { - b.Append(string(v)) - } - default: - b.Append(fmt.Sprintf("%v", v)) - } - case *array.BinaryBuilder: - switch v := val.(type) { - case []byte: - b.Append(v) - case string: - b.Append([]byte(v)) - default: - b.AppendNull() - } - default: - builder.AppendNull() - } -} - type contextQueryer interface { QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) } @@ -374,6 +111,3 @@ func GetQuerySchema(ctx context.Context, db contextQueryer, query string, tx con } return arrow.NewSchema(fields, nil), nil } - -// QuoteIdent re-exports arrowmap.QuoteIdent for backward compatibility. -var QuoteIdent = arrowmap.QuoteIdent diff --git a/duckdbservice/arrowmap/arrowmap.go b/duckdbservice/arrowmap/arrowmap.go index 900e49ec..845410f5 100644 --- a/duckdbservice/arrowmap/arrowmap.go +++ b/duckdbservice/arrowmap/arrowmap.go @@ -1,17 +1,28 @@ // Package arrowmap provides DuckDB-free helpers for translating DuckDB type -// strings into Arrow types and for quoting/qualifying SQL identifiers. +// strings into Arrow types, quoting/qualifying SQL identifiers, and appending +// scanned values into Arrow array builders. // // These helpers are kept in their own package (with no dependency on // github.com/duckdb/duckdb-go) so that the control plane can use them -// without linking libduckdb. +// without linking libduckdb. The DuckDB driver-specific value types +// (duckdb.Interval, duckdb.Decimal, duckdb.UUID, duckdb.OrderedMap, +// duckdb.Map) are handled via the RegisterAppender hook so duckdbservice +// can register them at init time without arrowmap depending on duckdb-go. package arrowmap import ( "database/sql" + "encoding/hex" "fmt" + "math/big" "strings" + "sync/atomic" + "time" + "unicode/utf8" "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/decimal128" ) // DuckDBTypeToArrow maps a DuckDB type name to an Arrow DataType. @@ -287,3 +298,299 @@ func QuoteIdent(ident string) string { escaped := strings.ReplaceAll(ident, `"`, `""`) return `"` + escaped + `"` } + +// OrderedMapValue represents an Arrow MAP as parallel key/value slices, +// preserving insertion order. Using parallel slices instead of a Go map +// avoids panics on non-comparable key types (e.g., []byte from BLOB keys) +// and preserves the source MAP ordering. +// +// Lives in arrowmap so AppendValue can switch on it without depending on +// the server package (which transitively links libduckdb). The flight +// executor in the server package re-exports it as server.OrderedMapValue +// via a type alias for backward compatibility. +type OrderedMapValue struct { + Keys []any + Values []any +} + +// Appender is a hook that handles append for value types arrowmap doesn't +// know about (typically driver-specific types like duckdb.Interval). It +// reports whether it handled the value; arrowmap.AppendValue falls back to +// its built-in handling when no registered Appender claims the value. +// +// Hooks must be safe to call concurrently and must not panic. They run in +// registration order; the first one to return true wins. +type Appender func(builder array.Builder, val any) (handled bool) + +// appenders is loaded once into an atomic.Value as []Appender. Reads on the +// hot path (AppendValue) are lock-free; registrations rebuild the slice. +// Registrations are expected to happen at init time so contention is rare. +var appenders atomic.Value // []Appender + +// RegisterAppender adds a hook that AppendValue will consult before falling +// back to its built-in value-type handling. Intended for use from package +// init() functions in importers that own driver-specific value types +// (e.g., duckdbservice registers handlers for duckdb.Interval, Decimal, +// UUID, OrderedMap, and Map). +func RegisterAppender(a Appender) { + if a == nil { + return + } + cur, _ := appenders.Load().([]Appender) + next := make([]Appender, 0, len(cur)+1) + next = append(next, cur...) + next = append(next, a) + appenders.Store(next) +} + +// AppendValue appends a value to an Arrow array builder with type coercion. +// It first asks any registered Appender hooks (see RegisterAppender), then +// falls back to handling the standard Arrow / Go value types itself. +func AppendValue(builder array.Builder, val any) { + if val == nil { + builder.AppendNull() + return + } + if hooks, _ := appenders.Load().([]Appender); len(hooks) > 0 { + for _, h := range hooks { + if h(builder, val) { + return + } + } + } + appendBuiltin(builder, val) +} + +// appendBuiltin handles the value types arrowmap knows about natively +// (everything that doesn't depend on a database driver package). +func appendBuiltin(builder array.Builder, val any) { + switch b := builder.(type) { + case *array.Int64Builder: + switch v := val.(type) { + case int64: + b.Append(v) + case int32: + b.Append(int64(v)) + case int: + b.Append(int64(v)) + default: + b.AppendNull() + } + case *array.Int32Builder: + switch v := val.(type) { + case int32: + b.Append(v) + case int64: + b.Append(int32(v)) + case int: + b.Append(int32(v)) + default: + b.AppendNull() + } + case *array.Int16Builder: + switch v := val.(type) { + case int16: + b.Append(v) + case int32: + b.Append(int16(v)) + default: + b.AppendNull() + } + case *array.Int8Builder: + switch v := val.(type) { + case int8: + b.Append(v) + case int32: + b.Append(int8(v)) + default: + b.AppendNull() + } + case *array.Uint8Builder: + switch v := val.(type) { + case uint8: + b.Append(v) + case uint16: + b.Append(uint8(v)) + default: + b.AppendNull() + } + case *array.Uint16Builder: + switch v := val.(type) { + case uint16: + b.Append(v) + case uint32: + b.Append(uint16(v)) + default: + b.AppendNull() + } + case *array.Uint32Builder: + switch v := val.(type) { + case uint32: + b.Append(v) + case uint64: + b.Append(uint32(v)) + default: + b.AppendNull() + } + case *array.Uint64Builder: + switch v := val.(type) { + case uint64: + b.Append(v) + default: + b.AppendNull() + } + case *array.Float64Builder: + switch v := val.(type) { + case float64: + b.Append(v) + case float32: + b.Append(float64(v)) + default: + b.AppendNull() + } + case *array.Float32Builder: + switch v := val.(type) { + case float32: + b.Append(v) + case float64: + b.Append(float32(v)) + default: + b.AppendNull() + } + case *array.BooleanBuilder: + if v, ok := val.(bool); ok { + b.Append(v) + } else { + b.AppendNull() + } + case *array.Date32Builder: + switch v := val.(type) { + case time.Time: + // Floor division to handle pre-epoch dates correctly. + // Go's integer division truncates toward zero, but Date32 + // needs days since epoch rounded toward negative infinity. + unix := v.Unix() + days := unix / 86400 + if unix%86400 < 0 { + days-- + } + b.Append(arrow.Date32(days)) + default: + b.AppendNull() + } + case *array.TimestampBuilder: + switch v := val.(type) { + case time.Time: + b.AppendTime(v) + default: + b.AppendNull() + } + case *array.Time64Builder: + switch v := val.(type) { + case time.Time: + micros := int64(v.Hour())*3600000000 + int64(v.Minute())*60000000 + + int64(v.Second())*1000000 + int64(v.Nanosecond())/1000 + b.Append(arrow.Time64(micros)) + default: + b.AppendNull() + } + case *array.MonthDayNanoIntervalBuilder: + // arrowmap natively handles arrow.MonthDayNanoInterval; driver-specific + // interval types (e.g., duckdb.Interval) come in via Appender hooks. + switch v := val.(type) { + case arrow.MonthDayNanoInterval: + b.Append(v) + default: + b.AppendNull() + } + case *array.Decimal128Builder: + switch v := val.(type) { + case *big.Int: + b.Append(decimal128.FromBigInt(v)) + default: + b.AppendNull() + } + case *array.FixedSizeBinaryBuilder: + switch v := val.(type) { + case []byte: + b.Append(v) + default: + b.AppendNull() + } + case *array.ListBuilder: + switch v := val.(type) { + case []any: + b.Append(true) + vb := b.ValueBuilder() + for _, elem := range v { + AppendValue(vb, elem) + } + default: + b.AppendNull() + } + case *array.StructBuilder: + switch v := val.(type) { + case map[string]any: + b.Append(true) + st := b.Type().(*arrow.StructType) + for i := 0; i < st.NumFields(); i++ { + fieldVal, ok := v[st.Field(i).Name] + if !ok { + b.FieldBuilder(i).AppendNull() + } else { + AppendValue(b.FieldBuilder(i), fieldVal) + } + } + default: + b.AppendNull() + } + case *array.MapBuilder: + switch v := val.(type) { + case OrderedMapValue: + b.Append(true) + kb, ib := b.KeyBuilder(), b.ItemBuilder() + for i, k := range v.Keys { + AppendValue(kb, k) + AppendValue(ib, v.Values[i]) + } + case map[any]any: + b.Append(true) + kb, ib := b.KeyBuilder(), b.ItemBuilder() + for k, item := range v { + AppendValue(kb, k) + AppendValue(ib, item) + } + default: + b.AppendNull() + } + case *array.StringBuilder: + switch v := val.(type) { + case string: + b.Append(v) + case []byte: + // 16-byte non-UTF-8 input is heuristically formatted as a UUID + // string. This pairs with DuckDBTypeToArrow("UUID") returning + // String — duckdb's Go driver returns []byte (not duckdb.UUID) + // when scanning UUID columns into interface{}. + if len(v) == 16 && !utf8.Valid(v) { + s := hex.EncodeToString(v) + b.Append(s[0:8] + "-" + s[8:12] + "-" + s[12:16] + "-" + s[16:20] + "-" + s[20:32]) + } else { + b.Append(string(v)) + } + default: + b.Append(fmt.Sprintf("%v", v)) + } + case *array.BinaryBuilder: + switch v := val.(type) { + case []byte: + b.Append(v) + case string: + b.Append([]byte(v)) + default: + b.AppendNull() + } + default: + builder.AppendNull() + } +} diff --git a/server/flight_executor.go b/server/flight_executor.go index 1c9503cb..f1846df6 100644 --- a/server/flight_executor.go +++ b/server/flight_executor.go @@ -19,6 +19,7 @@ import ( "github.com/apache/arrow-go/v18/arrow/flight" "github.com/apache/arrow-go/v18/arrow/flight/flightsql" "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/posthog/duckgres/duckdbservice/arrowmap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" @@ -31,14 +32,11 @@ const MaxGRPCMessageSize = 1 << 30 // 1GB // ErrWorkerDead is returned when the backing worker process has crashed. var ErrWorkerDead = errors.New("flight worker is dead") -// OrderedMapValue represents a DuckDB MAP as parallel key/value slices, -// preserving insertion order from Arrow MAP arrays. Using parallel slices -// instead of a Go map avoids panics on non-comparable key types (e.g., -// []byte from BLOB keys) and preserves DuckDB's MAP ordering. -type OrderedMapValue struct { - Keys []any - Values []any -} +// OrderedMapValue is an alias for arrowmap.OrderedMapValue. The type was +// moved into arrowmap so AppendValue's MAP branch can switch on it without +// arrowmap depending on the server package. The alias preserves the +// existing server.OrderedMapValue spelling for current call sites. +type OrderedMapValue = arrowmap.OrderedMapValue // FlightExecutor implements QueryExecutor backed by an Arrow Flight SQL client. // It routes queries to a duckdb-service worker process over a Unix socket. diff --git a/server/flightsqlingress/ingress.go b/server/flightsqlingress/ingress.go index 911afbd1..6d0d84ce 100644 --- a/server/flightsqlingress/ingress.go +++ b/server/flightsqlingress/ingress.go @@ -22,7 +22,6 @@ import ( "github.com/apache/arrow-go/v18/arrow/flight/flightsql" "github.com/apache/arrow-go/v18/arrow/flight/flightsql/schema_ref" "github.com/apache/arrow-go/v18/arrow/memory" - "github.com/posthog/duckgres/duckdbservice" "github.com/posthog/duckgres/duckdbservice/arrowmap" "github.com/posthog/duckgres/server" "google.golang.org/grpc" @@ -2151,7 +2150,7 @@ func rowSetToRecord(alloc memory.Allocator, rows server.RowSet, schema *arrow.Sc } for i, val := range values { - duckdbservice.AppendValue(builder.Field(i), val) + arrowmap.AppendValue(builder.Field(i), val) } count++ } From 1bb5fcb7ea2982a2f9943154eb8b3081d2e15302 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Fri, 1 May 2026 09:04:58 -0700 Subject: [PATCH 2/2] refactor(server): extract auth and sysinfo helpers into pure subpackages (#483) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(controlplane): stamp warm idle workers with creating CP id to stop orphan churn (#469) workerRecordFor used to clear OwnerCPInstanceID whenever state==Idle, so every freshly-spawned warm worker landed in the runtime store with an empty owner. That row matched ListOrphanedWorkers case (2) (NULLIF(owner_cp_instance_id, '') IS NULL AND last_heartbeat_at <= before) the moment it crossed the 30s orphan grace, because nothing refreshes last_heartbeat_at for an idle row in the warm pool. The janitor retired the worker, reconcileWarmCapacity replaced it, and the loop ran continuously. Persisting warm workers with the creating CP's instance id moves them to case (1), which is bounded by the active CP's 5s heartbeat instead. If the CP genuinely dies, the existing CP-expiry path still flips its rows to expired and the orphan grace then applies as designed. OrgID is still cleared on the idle transition. Co-authored-by: Claude Opus 4.7 (1M context) * fix(cache-proxy): forward origin status, body, and headers verbatim (#470) * test(cache-proxy): cover origin status passthrough (RED) Five new tests plus an update to the existing TestHandleProxyOriginError, all asserting that the cache proxy must forward upstream status codes, bodies, and response headers verbatim instead of collapsing every non-2xx into a 502. The user case that motivated this: S3 returns 400 with ExpiredToken in an XML envelope. The proxy was rewriting that to 502 with a Go-formatted error string, which (a) made DuckDB's httpfs treat a terminal auth failure as transient and retry it indefinitely, and (b) hid the real S3 error class from operators. New cases: - 5xx forwarded verbatim (replaces the old 502-asserting test) - 400 forwarded verbatim, body + Content-Type + X-Amz-Request-Id preserved - 404 forwarded verbatim - 416 forwarded verbatim with Content-Range preserved - error responses are NOT cached (the next request hits origin) - pure network errors (no HTTP response) still get 502 (the only case where 502 is correct, since there's no upstream status to forward) Tests fail with the current implementation; the next commit makes them pass. * fix(cache-proxy): forward origin status, body, and headers verbatim (GREEN) The proxy was previously translating every non-2xx upstream response into a 502 Bad Gateway with a Go-formatted error string. That: 1. Hid the real S3 error class — DuckDB's httpfs treats 5xx as transient and retries, so a terminal 4xx (e.g. an ExpiredToken auth failure) was being retried indefinitely instead of failing fast and surfacing the real cause. 2. Stripped the XML error envelope DuckLake parses, replacing it with a Go error string DuckLake doesn't understand. 3. Dropped headers (Content-Type, X-Amz-Request-Id, Content-Range) that DuckDB and operators rely on. This change introduces `originStatusError`, a typed error returned by fetchOrigin whenever the upstream responds with status >= 400. The caller in HandleProxy detects it via errors.As and forwards the captured status code, body (up to 1 MiB), and headers (minus hop-by-hop) back to the client unchanged. Pure transport errors (DNS, connection refused, TLS, timeout) keep returning 502 — there's no upstream status to forward in that case, and 5xx is what httpfs's transient-retry policy is designed for. Tests in the previous commit covered: - 5xx forwarded verbatim (replaces the old 502-asserting test) - 400 forwarded verbatim with XML body + Content-Type + amz headers - 404 forwarded verbatim - 416 forwarded verbatim with Content-Range preserved - error responses are NOT cached (the next request hits origin) - true network errors still get 502 * feat(controlplane): persist STS expiry and refresh credentials before they go stale (#471) Closes the worker credential expiration bug that surfaced in mw-prod-us (ExpiredToken errors after the 1h STS session-duration boundary). Schema: - worker_records gets s3_credentials_expires_at (nullable, indexed). It's set by the activator after a successful STS AssumeRole + ActivateTenant and consulted by the credential refresh scheduler to pick workers near expiry. NULL is treated as 'due now' so legacy / unstamped rows get refreshed eagerly. - UpsertWorkerRecord's DoUpdates list includes the new column. Configstore methods: - ListWorkersDueForCredentialRefresh(ownerCPInstanceID, cutoff) returns workers we own in active org-bound states whose creds expire by the cutoff or have NULL expiry. - MarkCredentialsRefreshed(workerID, cpInstanceID, expectedEpoch, newExpiresAt) is a conditional UPDATE that only stamps when ownership is still ours. - BumpWorkerEpoch(workerID, cpInstanceID, expectedEpoch) atomically increments owner_epoch on a worker we already own (used before re-sending ActivateTenant for refresh — the worker's reuseExistingActivation guard requires payload.OwnerEpoch > current). - Postgres-backed regression coverage for all three (owner-scoping, NULL-as-due, healthy/neutral/terminal exclusions, conditional update failure modes). Worker side: - Skip the in-process StartCredentialRefresh ticker in shared-warm (multi-tenant) mode. That ticker (a) ran on the session's pinned *sql.Conn and serialized behind user queries — a 1h+ query starved the refresh until creds had already expired — and (b) hit an else branch that swapped the org's STS-brokered config secret for a credential_chain one DuckDB can't satisfy on EKS Pod Identity. The control plane drives refreshes via re-activation now; the standalone single-tenant path keeps the ticker. Activator (shared_worker_activator.go): - TenantActivationPayload.S3CredentialsExpiresAt carries the STS Expiration through from buildDuckLakeConfigFromDuckling. Static-cred warehouses (config-store path) leave it nil. - ActivateReservedWorker stamps the expiry on the worker_records row via runtimeStore.MarkCredentialsRefreshed after a successful activation. Best-effort: failure here doesn't fail activation. - New RefreshCredentials method bumps owner_epoch atomically, sends ActivateTenant with the bumped epoch and freshly-brokered creds, and stamps the new expiry. Skips static-cred orgs. Janitor scheduler: - New refreshExpiringCredentials lambda (leader-only) lists workers whose creds expire within stsSessionDuration/2 (= 30 min today), resolves the in-memory ManagedWorker, and calls activator.RefreshCredentials. - Persists state in worker_records, so a CP failover doesn't lose scheduling — the next leader queries the same column. - A worker not in the leader's local pool (mid-takeover, mid-retire) is skipped; it'll be picked up on the next tick if it's still ours. * feat(controlplane): protect active sessions across CP shutdowns (#472) * test(controlplane): protect active sessions from CP shutdown (RED) Failing tests for the two pieces of state-store + worker-pool behavior that need to change so a CP rollout doesn't kill in-flight customer queries — extends the worker 40761 incident analysis to a regression suite. - TestShutdownAll_SparesWorkersWithActiveSessions: a worker with activeSessions > 0 must be skipped by ShutdownAll. Today the chain marks-draining and pod-deletes every owned worker, killing in-flight queries when the CP receives SIGTERM. - TestListOrphanedWorkersExcludesWorkersWithActiveFlightSessions: a worker with a Flight session record in active state must be spared by ListOrphanedWorkers even if its owning CP has expired. Without this, peer CPs' janitors retire workers the customer can still reclaim by reconnect. - TestListOrphanedWorkersIncludesWorkersWithReconnectingFlightSessions: same protection extends to records in reconnecting state — that's exactly when the customer is in the middle of picking the session back up. - TestListOrphanedWorkersIncludesWorkersWithExpiredFlightSessions: once the Flight session record is terminal (expired/closed) the customer can't reclaim, so the worker should be retired normally. Bounds the protection so a stuck row can't pin a worker forever. Tests fail with the current implementation; the next commit makes them pass. * feat(controlplane): protect active sessions across CP shutdowns (GREEN) Three interlocking changes that keep an in-flight customer query from being killed by a CP rollout. Together they remove the timing race behind the worker-40761 incident: a query running at the moment the old CP gives up (whether at 15min, 8h, or any other timeout) survives. Layer 1: CP doesn't exit while sessions are active. - HandoverDrainTimeout default flips from 15m to 0 in remote mode (= unbounded; k8s terminationGracePeriodSeconds is the only wall). The previous self-imposed wall was the proximate cause of pod-deletes during long-tail drains. Process mode keeps 24h since there's no k8s safety net. - waitForDrain treats timeout==0 as unbounded via context.Background(). - Janitor's ExpireDrainingControlPlaneInstances is now disabled in remote mode (it gates on j.maxDrainTimeout > 0). A draining CP that is still heartbeating stays "alive" to its peers; only stale heartbeat — handled by the existing ExpireControlPlaneInstances — marks a CP dead. This prevents peer CPs from forcibly expiring a draining CP and orphan-retiring its workers. Layer 2: ShutdownAll skips workers with active sessions. - Defense in depth for the case where the CP exits anyway (k8s SIGKILL after gracePeriod, or future code paths). A worker with activeSessions > 0 is left running in 'hot' state owned by the dying CP. The pod survives, the Flight client can reconnect by session token, and a peer CP can claim via TakeOverWorker. - Workers preserved this way stay in the in-memory pool until process exit so any residual session bookkeeping during the shutdown window still finds them. Layer 3: orphan janitor spares workers with reclaimable Flight sessions. - ListOrphanedWorkers gains a NOT EXISTS filter against flight_session_records: a row with at least one session in active or reconnecting state is left alone. Bounds the protection — once ExpireFlightSessionRecords moves the session to expired/closed, the worker is retired normally on the next sweep. Behavior with all three layers: - Customer mid-query during CP roll → old CP's drainAndShutdown waits for sessions → if customer finishes, clean exit; if k8s SIGKILL fires, worker survives via Layer 2 + Layer 3. - Customer Flight client reconnects within session TTL → peer CP claims via TakeOverWorker, query resumes. - Customer never reconnects → flight_session_records expires (TTL default 1h) → next orphan sweep retires the worker normally. - Pgwire customer connected to dying CP → connection dies when CP exits regardless. Worker preservation is moot for them but harmless; Layer 1 still extends the CP's life so most short queries finish. * feat(server): split query-error logs by SQLSTATE class (user vs infra) (#473) * feat(server): split query-error logs by SQLSTATE class (user vs infra) The single "Query execution failed." Error log line was alerting- hostile: a customer typo'ing a column name produced the same Error that a worker crash did, drowning real infra failures in user-error noise. Split into two distinguishable lines using the SQLSTATE we already compute for the pgwire error response — no new string matching: Info "Query execution failed." for SQLSTATE classes {0A, 22, 23, 25, 28, 2B, 3D, 3F, 42, 44} plus 57014 (client cancellation) Error "Query execution errored." for everything else (08, 53, 54, 57 except 57014, 58, XX, …) Mechanically: - New isUserQueryError(err) inspects the existing classifyErrorCode output and matches the class against a closed allow-list. Closed- list semantics so future SQLSTATEs we haven't catalogued err toward Error (the safe direction for alerting). - 57014 query_canceled short-circuits to user (its parent class 57 is otherwise infra; client-pressed-Ctrl-C is user-attributable). - DuckLake transaction conflict and metadata-connection-lost paths are unchanged — they keep their own Warn lines because retry tooling cares about those signals specifically. classifyErrorCode default fallback flips from "42000" to "XX000": unknown errors (no DuckDB prefix matched) are typically infra (gRPC, network, internal panics), not syntax errors. The previous default made every unknown error look like a user error in the alert path. Two existing test cases that asserted the old fallback are updated accordingly with comments explaining why. New tests in TestIsUserQueryError pin every user-class branch plus representative infra cases and edge cases (nil error, 57014 short- circuit, 40001 sitting in infra-side after the early-return). * feat(server): log query start/finish on the worker boundary with trace_id Two new Info-level slog lines around the worker dispatch in executeSelectQuery so an operator can correlate logs with traces and filter to a specific worker: Info "Query started." user=… query=… worker=N worker_pod=… trace_id= Info "Query finished." user=… duration_ms=… rows=N worker=N worker_pod=… trace_id= [error=… on failures] The worker / worker_pod attributes match what "Query execution failed." already emits — same shape so a search like worker=40761 surfaces the full lifecycle on a single worker. trace_id matches the OTEL trace ID exported for the same query (see server/tracing.go's existing traceIDFromContext helper), so a Loki query with trace_id=abc123 lines up directly with the trace view. logQueryFinished stays Info even on error so the start/finish pair is always balanced in the log stream — severity routing for failures is done by logQueryError separately (Info for user errors, Error for infra). Operators following one trace see started + finished + an optional separate severity line. Initial use sites are limited to executeSelectQuery (the SELECT path that's the bulk of customer queries and the one that fired in the worker-40761 incident). The non-returning path in executeSimpleQuery and the prepared-statement path still emit only the existing logQuery (structured query log) — they can be wired up incrementally if the lifecycle lines prove useful in production. * fix(controlplane): refresh STS credentials per CP, not only on the janitor leader (#474) The credential-refresh scheduler added in #471 was wired into the janitor's leader-only loop. Workers owned by non-leader CPs therefore never had their S3 session tokens re-brokered, and any long-running query on those workers hit ExpiredToken once the original 1h STS session lapsed. Move the scheduler out of the janitor and into a per-CP background goroutine spawned from SetupMultiTenant. Each CP refreshes only the workers it owns (filtered by cpInstanceID in the SQL), regardless of leader status. The new credentialRefreshScheduler type wraps the existing tick logic so it's independently testable. * Bypass transpiler for file COPY statements (#475) * docs: clarify k8s worker reuse policy (#476) * fix(server): skip DuckLake index ensure when all indexes already exist (#478) Replaces the 9 sequential CREATE INDEX IF NOT EXISTS round-trips on every fresh worker pod's first catalog attach with a single pg_indexes existence check. CREATE INDEX IF NOT EXISTS is a no-op at the storage layer once the index exists, but still costs a server round-trip; under pgbouncer transaction pooling that round-trip can take 1-2s during burst load (server-conn handover + TLS handshake). Empirically 16-24s in the prod posthog metastore. Collapsing to one round-trip cuts the post-attach window to ~150-300ms in the steady state, which matters because the control plane's session-init context is bounded at 5s — the slow path was overrunning the budget and surfacing as "failed to detect ducklake catalog attachment" FATALs on every warm-pool replenishment. The slow path is unchanged and still self-heals if any index goes missing. Indexes are now declared as a name+stmt slice so the fast-path lookup names can't drift from the slow-path statements. Co-authored-by: Claude Opus 4.7 (1M context) * fix timestamp aliases in Arrow schema mapping (#479) * fix(duckdbservice): isolate session-cleanup contexts and discard poisoned conns (#481) Three related changes to cleanupSessionState that together fix a worker poisoning chain after cancelled queries. When a query is cancelled mid-execution, DuckDB can leave the underlying connection in an aborted/INTERRUPT'd state where every subsequent statement returns "INTERRUPT Error: Interrupted!" until ROLLBACK runs. The cleanup path called by DestroySession then runs SELECT + DROP statements against this poisoned connection inside a single 5-second context. The SELECT hits the aborted-state error or burns the whole budget; every following DROP then fails instantly with context deadline exceeded. The connection is returned to the *sql.DB pool dirty, and the next session that picks it up fails its first metadata operation (e.g. "USE memory") under the control plane's 5s session-init context. From the client this surfaces as "failed to initialize session database metadata" or "failed to detect ducklake catalog attachment" — same symptoms as the warm-pool churn case fixed in #478, but a different root cause that the previous PR doesn't address. Three changes: - ROLLBACK first to clear any aborted-transaction state before running the cleanup queries. Same pattern that initSearchPath already uses. - Per-step contexts: 3s for the enumeration SELECT, 1s for each DROP, separately. A slow SELECT no longer eats the budget for all DROPs; one slow DROP no longer poisons the rest. - cleanupSessionState now returns a clean/dirty signal. If cleanup didn't complete cleanly, DestroySession marks the conn bad via Conn.Raw returning driver.ErrBadConn, so database/sql discards it from the pool instead of handing it to the next session. Reproduced the poisoning chain on the dev cluster: a clickbench scan cancelled mid-run caused a downstream session to fail with "failed to initialize session database metadata: switch to memory catalog: flight execute update: deadline exceeded", followed by the worker being marked unresponsive and reaped. Co-authored-by: Claude Opus 4.7 (1M context) * fix(duckdbservice): discard pooled conn after every session in cluster mode (#485) * fix(duckdbservice): discard pooled conn after every session in cluster mode In cluster mode every worker is bound to a single org via activateTenant, so the security boundary aligns with the worker lifecycle and there's no need to scrub per-conn state on session teardown to protect against cross-org leakage. The existing cleanup loop is two things: - Mostly wasted work in cluster mode. A typical billing-style session (open / SELECT 1 / close) creates zero user temp objects, but the cleanup still enumerates duckdb_views() and issues ~46 DROP IF EXISTS no-ops against system views that live in main / information_schema / pg_catalog (the DROP statements target the temp schema, so they're lookups against an empty namespace). - Incomplete. It handles temp tables and temp views but not temp macros, temp types, or temp sequences. Those still leak across pooled-conn reuse — verified on dev: a TEMP MACRO created in session A was visible in session B when the pool happened to hand back the same driver conn. Cluster mode now skips the cleanup loop entirely and always marks the conn bad via Conn.Raw → driver.ErrBadConn so database/sql evicts it from the pool. The next session opens a fresh DuckDB connection — no temp leakage of any kind. Standalone mode keeps the existing cleanupSessionState path because pooled conns can be reused across orgs and scrubbing is required. The post-#481 conn-discard fallback is preserved when cleanup fails. Empirically motivated: prod was reaping ~29 workers/hour because bursts of session-end events (e.g. 16 billing queries arriving at once) ran hundreds of cleanup round-trips concurrently on a single worker, blocking the gRPC health-check long enough to fail 3× and trip the unresponsive- worker reaper. The cleanup itself was correct (post-#481) but the aggregate throughput was the bottleneck. Skipping the loop in cluster mode collapses ~48 round-trips per session-end to 0. Co-Authored-By: Claude Opus 4.7 (1M context) * refactor(duckdbservice): wrap conn-pool eviction behind a named helper driver.ErrBadConn is the standard idiom for evicting a *sql.Conn from the *sql.DB pool, but the name is misleading at call sites — the conn isn't actually broken, we just don't want it reused. Go acknowledges the API gap (golang/go#40722) but hasn't fixed it. Wrap the Conn.Raw + ErrBadConn dance behind a named helper (evictConnFromPool) with a comment explaining why this is the standard idiom despite the awkward name. Update both call sites in DestroySession plus the log key (discarded -> evicted) to match. Co-Authored-By: Claude Opus 4.7 (1M context) * test(k8s): include hot_idle in findActiveOrgWorkerPodSince The helper polls cp_runtime.worker_records for workers bound to a given org. The state filter previously excluded hot_idle, on the assumption that the test would always catch a worker mid-session. With faster session teardown (cluster-mode conn eviction), workers can transition hot -> hot_idle before the polling loop runs, leaving the test unable to find the worker that just served the org. hot_idle workers are still bound to the org — org_id stays set on the runtime record while the worker is parked for fast re-claim by the same org — so they're the correct answer to 'which worker handled this org just now?' Adding hot_idle to the state list makes the helper robust to session-teardown speed. Co-Authored-By: Claude Opus 4.7 (1M context) --------- Co-authored-by: Claude Opus 4.7 (1M context) * refactor(duckdbservice): extract DuckDB-free arrow helpers into arrowmap subpackage Step 1 of a 4-step plan to split duckgres into separate control-plane and worker binaries so the control plane can stop linking libduckdb (and the worker image can ship per-DuckDB-version variants). This change moves the DuckDB-free helpers (DuckDBTypeToArrow, QualifyTableName, QuoteIdent, SupportsLimit, and the struct/map/decimal type-string parsers) out of duckdbservice/arrow_helpers.go into a new duckdbservice/arrowmap package. arrowmap has zero dependency on github.com/duckdb/duckdb-go; it's plain string-to-arrow.DataType mapping. The duckdbservice package keeps the helpers callable as before via thin re-export shims so existing call sites are unaffected. server/flightsqlingress is updated to import arrowmap directly for the three call sites that don't need DuckDB types (the AppendValue call site stays on duckdbservice for now; splitting AppendValue's duckdb-bound branches lands in a follow-up). This PR is purely a refactor — no behavior changes, no new binary, no CI changes. It establishes the package boundary and pattern that subsequent PRs build on. Co-Authored-By: Claude Opus 4.7 (1M context) * refactor(arrowmap): split AppendValue into duckdb-free core + driver hooks Step 2 of the binary-split plan. Moves AppendValue out of duckdbservice into arrowmap so callers that don't need duckdb-go's driver value types (duckdb.Interval, Decimal, UUID, OrderedMap, Map) can use it without pulling libduckdb into their import graph. The split uses a registration hook rather than a build tag: - arrowmap.AppendValue handles all arrow-native and Go-native value types directly. It first consults any registered Appender hooks before falling back to the built-in switch. - duckdbservice/appender_init.go's init() registers a single hook that handles duckdb.Interval / Decimal / UUID / OrderedMap / Map. - When duckdbservice is linked into a binary (worker, standalone), the duckdb types get full coverage automatically. When it isn't (a future controlplane-only binary), the duckdb cases are dead code paths that wouldn't fire anyway because duckdb-go is the only thing producing those typed values. OrderedMapValue moved from server/flight_executor.go into arrowmap so the MAP branch in arrowmap.AppendValue can switch on it without arrowmap depending on the server package. server.OrderedMapValue is preserved as a type alias for backward compatibility with existing call sites. server/flightsqlingress no longer imports duckdbservice — it uses arrowmap.AppendValue directly. The package still transitively pulls duckdb-go via its `server` import; getting `server` itself duckdb-free is the next chunk of work. Verified: - go build ./... clean - go build -tags kubernetes ./... clean - go test -short ./duckdbservice/... ./server/flightsqlingress/... all green - go list -deps ./duckdbservice/arrowmap | grep duckdb-go is empty (arrowmap remains a pure leaf even with AppendValue + OrderedMapValue living in it) Co-Authored-By: Claude Opus 4.7 (1M context) * refactor(server): extract auth and sysinfo helpers into pure subpackages Step 3 of the binary-split plan. Carves the duckdb-free pieces of the server package into focused subpackages so the eventual server-package split (PR #4+) has less tangled state to deal with. server/auth/ — RateLimiter, RateLimitConfig, ValidateUserPassword, BeginRateLimitedAuthAttempt, RecordFailed/SuccessfulAuth, plus the auth-related Prometheus metrics (auth_failures, rate_limit_rejects, rate_limited_ips). server/sysinfo/ — SystemMemoryBytes, AutoMemoryLimit, ParseMemoryBytes, ValidateMemoryLimit. The /proc/meminfo reader and the DuckDB-style "4GB"/"512MB" string parser/validator. Both new packages have zero dependency on github.com/duckdb/duckdb-go (verified with `go list -deps`), so the eventual control-plane-only binary will be able to use them without linking libduckdb. Backward compatibility is preserved via type aliases and re-export `var`s in server/auth_aliases.go and server/sysinfo_aliases.go. Existing references to server.RateLimiter, server.NewRateLimiter, server.ValidateUserPassword, server.ParseMemoryBytes, etc. continue to compile without touching their call sites. New code should import the subpackages directly. Three Prometheus metrics moved with the rate-limit code: - duckgres_auth_failures_total - duckgres_rate_limit_rejects_total - duckgres_rate_limited_ips The two used outside the auth package itself (RateLimitRejectsCounter, AuthFailuresCounter) are exported; rateLimitedIPsGauge stays private. Verified: - go build ./... clean - go build -tags kubernetes ./... clean - go test -short ./server/auth/... ./server/sysinfo/... ./server/... ./controlplane/... — all green (pre-existing testcontainer Postgres failures in controlplane/admin/ unrelated, requires Docker) - go list -deps ./server/auth | grep duckdb-go is empty - go list -deps ./server/sysinfo | grep duckdb-go is empty Co-Authored-By: Claude Opus 4.7 (1M context) --------- Co-authored-by: Eric Duong Co-authored-by: Claude Opus 4.7 (1M context) Co-authored-by: Benjamin Knofe-Vider Co-authored-by: Bill Guowei Yang --- server/auth/metrics.go | 30 ++++++++ server/{auth_policy.go => auth/policy.go} | 8 +- .../policy_test.go} | 2 +- server/{ => auth}/ratelimit.go | 76 +++++++++---------- server/{ => auth}/ratelimit_test.go | 2 +- server/auth_aliases.go | 22 ++++++ server/conn.go | 3 +- server/server.go | 23 ++---- server/{ => sysinfo}/sysinfo.go | 11 ++- server/{ => sysinfo}/sysinfo_test.go | 16 ++-- server/sysinfo_aliases.go | 14 ++++ server/worker.go | 6 +- 12 files changed, 132 insertions(+), 81 deletions(-) create mode 100644 server/auth/metrics.go rename server/{auth_policy.go => auth/policy.go} (95%) rename server/{auth_policy_test.go => auth/policy_test.go} (99%) rename server/{ => auth}/ratelimit.go (81%) rename server/{ => auth}/ratelimit_test.go (99%) create mode 100644 server/auth_aliases.go rename server/{ => sysinfo}/sysinfo.go (88%) rename server/{ => sysinfo}/sysinfo_test.go (89%) create mode 100644 server/sysinfo_aliases.go diff --git a/server/auth/metrics.go b/server/auth/metrics.go new file mode 100644 index 00000000..30794e1f --- /dev/null +++ b/server/auth/metrics.go @@ -0,0 +1,30 @@ +package auth + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Auth-related Prometheus metrics. Defined here (rather than in the larger +// server package's metrics block) so this package is self-contained — it can +// be imported and built without pulling in the rest of server/. + +// AuthFailuresCounter is exported so the wire-protocol code in server can +// also bump it when sending FATAL/Class-28 error responses to the client. +var AuthFailuresCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_auth_failures_total", + Help: "Total number of authentication failures", +}) + +// RateLimitRejectsCounter is exported so the connection-handling code in +// server can also bump it when rejecting a connection at the rate-limit +// gate (before any auth attempt happens). +var RateLimitRejectsCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_rate_limit_rejects_total", + Help: "Total number of connections rejected due to rate limiting", +}) + +var rateLimitedIPsGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "duckgres_rate_limited_ips", + Help: "Number of currently rate-limited IP addresses", +}) diff --git a/server/auth_policy.go b/server/auth/policy.go similarity index 95% rename from server/auth_policy.go rename to server/auth/policy.go index 9c56f160..a7ed7e39 100644 --- a/server/auth_policy.go +++ b/server/auth/policy.go @@ -1,4 +1,4 @@ -package server +package auth import ( "crypto/subtle" @@ -16,11 +16,11 @@ func BeginRateLimitedAuthAttempt(rateLimiter *RateLimiter, remoteAddr net.Addr) } if msg := rateLimiter.CheckConnection(remoteAddr); msg != "" { - rateLimitRejectsCounter.Inc() + RateLimitRejectsCounter.Inc() return release, msg } if !rateLimiter.RegisterConnection(remoteAddr) { - rateLimitRejectsCounter.Inc() + RateLimitRejectsCounter.Inc() if msg := rateLimiter.CheckConnection(remoteAddr); msg != "" { return release, msg } @@ -35,7 +35,7 @@ func BeginRateLimitedAuthAttempt(rateLimiter *RateLimiter, remoteAddr net.Addr) // RecordFailedAuthAttempt records auth telemetry and updates rate-limit state. // Returns true when this failure causes the source IP to be banned. func RecordFailedAuthAttempt(rateLimiter *RateLimiter, remoteAddr net.Addr) bool { - authFailuresCounter.Inc() + AuthFailuresCounter.Inc() if rateLimiter == nil { return false } diff --git a/server/auth_policy_test.go b/server/auth/policy_test.go similarity index 99% rename from server/auth_policy_test.go rename to server/auth/policy_test.go index 5739a581..c5430ae2 100644 --- a/server/auth_policy_test.go +++ b/server/auth/policy_test.go @@ -1,4 +1,4 @@ -package server +package auth import ( "net" diff --git a/server/ratelimit.go b/server/auth/ratelimit.go similarity index 81% rename from server/ratelimit.go rename to server/auth/ratelimit.go index 92865a0c..1be83552 100644 --- a/server/ratelimit.go +++ b/server/auth/ratelimit.go @@ -1,4 +1,7 @@ -package server +// Package auth holds duckgres' connection rate-limiting and password +// validation policy. It has no dependency on github.com/duckdb/duckdb-go, +// so the control plane can use it without linking libduckdb. +package auth import ( "net" @@ -6,21 +9,21 @@ import ( "time" ) -// RateLimitConfig configures rate limiting behavior +// RateLimitConfig configures rate limiting behavior. type RateLimitConfig struct { - // MaxFailedAttempts is the maximum number of failed auth attempts before banning + // MaxFailedAttempts is the maximum number of failed auth attempts before banning. MaxFailedAttempts int - // FailedAttemptWindow is the time window for counting failed attempts + // FailedAttemptWindow is the time window for counting failed attempts. FailedAttemptWindow time.Duration - // BanDuration is how long to ban an IP after exceeding max failed attempts + // BanDuration is how long to ban an IP after exceeding max failed attempts. BanDuration time.Duration - // MaxConnectionsPerIP is the max concurrent connections from a single IP (0 = unlimited) + // MaxConnectionsPerIP is the max concurrent connections from a single IP (0 = unlimited). MaxConnectionsPerIP int - // MaxConnections is the total max concurrent connections (0 = unlimited) + // MaxConnections is the total max concurrent connections (0 = unlimited). MaxConnections int } -// DefaultRateLimitConfig returns sensible defaults for rate limiting +// DefaultRateLimitConfig returns sensible defaults for rate limiting. func DefaultRateLimitConfig() RateLimitConfig { return RateLimitConfig{ MaxFailedAttempts: 5, @@ -31,14 +34,14 @@ func DefaultRateLimitConfig() RateLimitConfig { } } -// ipRecord tracks connection and authentication attempts from an IP +// ipRecord tracks connection and authentication attempts from an IP. type ipRecord struct { failedAttempts []time.Time // timestamps of failed auth attempts bannedUntil time.Time // when the ban expires (zero if not banned) activeConns int // current active connections from this IP } -// RateLimiter tracks and limits connections per IP +// RateLimiter tracks and limits connections per IP. type RateLimiter struct { mu sync.Mutex config RateLimitConfig @@ -46,18 +49,17 @@ type RateLimiter struct { totalActiveConns int } -// NewRateLimiter creates a new rate limiter with the given config +// NewRateLimiter creates a new rate limiter with the given config. func NewRateLimiter(cfg RateLimitConfig) *RateLimiter { rl := &RateLimiter{ config: cfg, records: make(map[string]*ipRecord), } - // Start cleanup goroutine go rl.cleanupLoop() return rl } -// extractIP gets the IP address from a net.Addr (strips port) +// extractIP gets the IP address from a net.Addr (strips port). func extractIP(addr net.Addr) string { if addr == nil { return "" @@ -69,8 +71,8 @@ func extractIP(addr net.Addr) string { return host } -// CheckConnection checks if a connection from the given address should be allowed -// Returns an error message if the connection should be rejected, or empty string if allowed +// CheckConnection checks if a connection from the given address should be allowed. +// Returns an error message if the connection should be rejected, or empty string if allowed. func (rl *RateLimiter) CheckConnection(addr net.Addr) string { ip := extractIP(addr) if ip == "" { @@ -80,20 +82,17 @@ func (rl *RateLimiter) CheckConnection(addr net.Addr) string { rl.mu.Lock() defer rl.mu.Unlock() - // Check global connection limit if rl.config.MaxConnections > 0 && rl.totalActiveConns >= rl.config.MaxConnections { return "too many concurrent connections" } record := rl.getOrCreateRecord(ip) - // Check if IP is banned if !record.bannedUntil.IsZero() && time.Now().Before(record.bannedUntil) { remaining := time.Until(record.bannedUntil).Round(time.Second) return "too many failed authentication attempts, try again in " + remaining.String() } - // Check concurrent connection limit if rl.config.MaxConnectionsPerIP > 0 && record.activeConns >= rl.config.MaxConnectionsPerIP { return "too many connections from your IP address" } @@ -101,8 +100,8 @@ func (rl *RateLimiter) CheckConnection(addr net.Addr) string { return "" } -// RegisterConnection records a new connection from the given address -// Returns true if the connection is allowed, false otherwise +// RegisterConnection records a new connection from the given address. +// Returns true if the connection is allowed, false otherwise. func (rl *RateLimiter) RegisterConnection(addr net.Addr) bool { ip := extractIP(addr) if ip == "" { @@ -112,19 +111,16 @@ func (rl *RateLimiter) RegisterConnection(addr net.Addr) bool { rl.mu.Lock() defer rl.mu.Unlock() - // Check global connection limit if rl.config.MaxConnections > 0 && rl.totalActiveConns >= rl.config.MaxConnections { return false } record := rl.getOrCreateRecord(ip) - // Check if banned if !record.bannedUntil.IsZero() && time.Now().Before(record.bannedUntil) { return false } - // Check concurrent connection limit if rl.config.MaxConnectionsPerIP > 0 && record.activeConns >= rl.config.MaxConnectionsPerIP { return false } @@ -134,7 +130,7 @@ func (rl *RateLimiter) RegisterConnection(addr net.Addr) bool { return true } -// UnregisterConnection decrements the active connection count for an IP +// UnregisterConnection decrements the active connection count for an IP. func (rl *RateLimiter) UnregisterConnection(addr net.Addr) { ip := extractIP(addr) if ip == "" { @@ -157,8 +153,8 @@ func (rl *RateLimiter) UnregisterConnection(addr net.Addr) { } } -// RecordFailedAuth records a failed authentication attempt -// Returns true if the IP is now banned +// RecordFailedAuth records a failed authentication attempt. +// Returns true if the IP is now banned. func (rl *RateLimiter) RecordFailedAuth(addr net.Addr) bool { ip := extractIP(addr) if ip == "" { @@ -171,10 +167,8 @@ func (rl *RateLimiter) RecordFailedAuth(addr net.Addr) bool { record := rl.getOrCreateRecord(ip) now := time.Now() - // Add this failed attempt record.failedAttempts = append(record.failedAttempts, now) - // Count recent failed attempts within the window windowStart := now.Add(-rl.config.FailedAttemptWindow) recentAttempts := 0 for _, t := range record.failedAttempts { @@ -183,21 +177,22 @@ func (rl *RateLimiter) RecordFailedAuth(addr net.Addr) bool { } } - // Ban if exceeded threshold if recentAttempts >= rl.config.MaxFailedAttempts { - // Decrement if replacing an expired ban that cleanup hasn't cleared yet - if !record.bannedUntil.IsZero() && now.After(record.bannedUntil) { - rateLimitedIPsGauge.Dec() - } + // Three cases: never banned (gauge++), already banned (no change — + // already counted), or expired ban that cleanup hasn't cleared yet + // (no change — still counted, will be decremented when cleanup runs). + alreadyCounted := !record.bannedUntil.IsZero() record.bannedUntil = now.Add(rl.config.BanDuration) - rateLimitedIPsGauge.Inc() + if !alreadyCounted { + rateLimitedIPsGauge.Inc() + } return true } return false } -// RecordSuccessfulAuth clears failed attempts for an IP after successful auth +// RecordSuccessfulAuth clears failed attempts for an IP after successful auth. func (rl *RateLimiter) RecordSuccessfulAuth(addr net.Addr) { ip := extractIP(addr) if ip == "" { @@ -216,7 +211,7 @@ func (rl *RateLimiter) RecordSuccessfulAuth(addr net.Addr) { } } -// IsBanned checks if an IP is currently banned +// IsBanned checks if an IP is currently banned. func (rl *RateLimiter) IsBanned(addr net.Addr) bool { ip := extractIP(addr) if ip == "" { @@ -234,7 +229,7 @@ func (rl *RateLimiter) IsBanned(addr net.Addr) bool { return !record.bannedUntil.IsZero() && time.Now().Before(record.bannedUntil) } -// getOrCreateRecord gets or creates a record for an IP (must hold lock) +// getOrCreateRecord gets or creates a record for an IP (must hold lock). func (rl *RateLimiter) getOrCreateRecord(ip string) *ipRecord { record, ok := rl.records[ip] if !ok { @@ -244,7 +239,7 @@ func (rl *RateLimiter) getOrCreateRecord(ip string) *ipRecord { return record } -// cleanupLoop periodically cleans up expired records +// cleanupLoop periodically cleans up expired records. func (rl *RateLimiter) cleanupLoop() { ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() @@ -254,7 +249,7 @@ func (rl *RateLimiter) cleanupLoop() { } } -// cleanup removes expired records to prevent memory growth +// cleanup removes expired records to prevent memory growth. func (rl *RateLimiter) cleanup() { rl.mu.Lock() defer rl.mu.Unlock() @@ -263,7 +258,6 @@ func (rl *RateLimiter) cleanup() { windowStart := now.Add(-rl.config.FailedAttemptWindow) for ip, record := range rl.records { - // Remove expired failed attempts var validAttempts []time.Time for _, t := range record.failedAttempts { if t.After(windowStart) { @@ -272,13 +266,11 @@ func (rl *RateLimiter) cleanup() { } record.failedAttempts = validAttempts - // Clear expired bans if !record.bannedUntil.IsZero() && now.After(record.bannedUntil) { record.bannedUntil = time.Time{} rateLimitedIPsGauge.Dec() } - // Remove record if it's empty and has no active connections if len(record.failedAttempts) == 0 && record.bannedUntil.IsZero() && record.activeConns == 0 { diff --git a/server/ratelimit_test.go b/server/auth/ratelimit_test.go similarity index 99% rename from server/ratelimit_test.go rename to server/auth/ratelimit_test.go index e3f95e61..3a5afab6 100644 --- a/server/ratelimit_test.go +++ b/server/auth/ratelimit_test.go @@ -1,4 +1,4 @@ -package server +package auth import ( "net" diff --git a/server/auth_aliases.go b/server/auth_aliases.go new file mode 100644 index 00000000..ed4c3fec --- /dev/null +++ b/server/auth_aliases.go @@ -0,0 +1,22 @@ +package server + +import "github.com/posthog/duckgres/server/auth" + +// Type aliases and re-exports kept here so existing references to +// server.RateLimiter / server.RateLimitConfig / server.NewRateLimiter etc. +// continue to compile after the rate-limit and auth-policy code moved into +// server/auth. New code should import server/auth and use auth.X directly. + +type ( + RateLimiter = auth.RateLimiter + RateLimitConfig = auth.RateLimitConfig +) + +var ( + NewRateLimiter = auth.NewRateLimiter + DefaultRateLimitConfig = auth.DefaultRateLimitConfig + BeginRateLimitedAuthAttempt = auth.BeginRateLimitedAuthAttempt + RecordFailedAuthAttempt = auth.RecordFailedAuthAttempt + RecordSuccessfulAuthAttempt = auth.RecordSuccessfulAuthAttempt + ValidateUserPassword = auth.ValidateUserPassword +) diff --git a/server/conn.go b/server/conn.go index 00745f59..540e8bc4 100644 --- a/server/conn.go +++ b/server/conn.go @@ -26,6 +26,7 @@ import ( duckdb "github.com/duckdb/duckdb-go/v2" pg_query "github.com/pganalyze/pg_query_go/v6" + "github.com/posthog/duckgres/server/auth" "github.com/posthog/duckgres/transpiler" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -4668,7 +4669,7 @@ func (c *clientConn) sendError(severity, code, message string) { // NOTE: If one adds a FATAL error with a non-28 code, be sure to add // a metric for it here. if strings.HasPrefix(code, "28") { - authFailuresCounter.Inc() + auth.AuthFailuresCounter.Inc() } else if severity == "ERROR" { queryErrorsCounter.WithLabelValues(c.orgID).Inc() } diff --git a/server/server.go b/server/server.go index 1c3faeab..cd93cba2 100644 --- a/server/server.go +++ b/server/server.go @@ -23,7 +23,9 @@ import ( awsconfig "github.com/aws/aws-sdk-go-v2/config" _ "github.com/duckdb/duckdb-go/v2" _ "github.com/jackc/pgx/v5/stdlib" // registers "pgx" driver for direct PostgreSQL connections + "github.com/posthog/duckgres/server/auth" "github.com/posthog/duckgres/server/ducklake" + "github.com/posthog/duckgres/server/sysinfo" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -130,21 +132,6 @@ var queryErrorsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Help: "Total number of failed queries", }, []string{"org"}) -var authFailuresCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "duckgres_auth_failures_total", - Help: "Total number of authentication failures", -}) - -var rateLimitRejectsCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "duckgres_rate_limit_rejects_total", - Help: "Total number of connections rejected due to rate limiting", -}) - -var rateLimitedIPsGauge = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "duckgres_rate_limited_ips", - Help: "Number of currently rate-limited IP addresses", -}) - var queryCancellationsCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "duckgres_query_cancellations_total", Help: "Total number of queries cancelled via cancel request", @@ -850,7 +837,7 @@ func openBaseDB(cfg Config, username string) (*sql.DB, error) { // Set DuckDB memory limit memLimit := cfg.MemoryLimit if memLimit == "" { - memLimit = autoMemoryLimit() + memLimit = sysinfo.AutoMemoryLimit() } if _, err := db.Exec(fmt.Sprintf("SET memory_limit = '%s'", memLimit)); err != nil { slog.Warn("Failed to set DuckDB memory_limit.", "memory_limit", memLimit, "error", err) @@ -2085,7 +2072,7 @@ func (s *Server) handleConnection(conn net.Conn) { if msg := s.rateLimiter.CheckConnection(remoteAddr); msg != "" { // Send PostgreSQL error and close slog.Warn("Connection rejected.", "remote_addr", remoteAddr, "reason", msg) - rateLimitRejectsCounter.Inc() + auth.RateLimitRejectsCounter.Inc() _ = conn.Close() return } @@ -2093,7 +2080,7 @@ func (s *Server) handleConnection(conn net.Conn) { // Register this connection if !s.rateLimiter.RegisterConnection(remoteAddr) { slog.Warn("Connection rejected: rate limit exceeded.", "remote_addr", remoteAddr) - rateLimitRejectsCounter.Inc() + auth.RateLimitRejectsCounter.Inc() _ = conn.Close() return } diff --git a/server/sysinfo.go b/server/sysinfo/sysinfo.go similarity index 88% rename from server/sysinfo.go rename to server/sysinfo/sysinfo.go index d1ad47f6..22df4c8b 100644 --- a/server/sysinfo.go +++ b/server/sysinfo/sysinfo.go @@ -1,4 +1,7 @@ -package server +// Package sysinfo holds duckgres' system-memory detection helpers and the +// memory-limit string parser shared between the server, the control plane, +// and config resolution. No dependency on github.com/duckdb/duckdb-go. +package sysinfo import ( "bufio" @@ -52,13 +55,13 @@ var ( autoMemoryLimitValue string ) -// autoMemoryLimit computes a DuckDB memory_limit based on system memory. +// AutoMemoryLimit computes a DuckDB memory_limit based on system memory. // Formula: totalMem * 0.75, with a floor of 256MB. // Every session gets the full budget — DuckDB will spill to disk/swap if // aggregate usage exceeds physical RAM. // Returns "4GB" as a safe default if system memory cannot be detected. // The result is computed once and cached since system memory doesn't change. -func autoMemoryLimit() string { +func AutoMemoryLimit() string { autoMemoryLimitOnce.Do(func() { totalBytes := SystemMemoryBytes() if totalBytes == 0 { @@ -74,7 +77,7 @@ func autoMemoryLimit() string { limitBytes = 256 * mb } - // Format as human-readable: use GB if >= 1GB, else MB + // Format as human-readable: use GB if >= 1GB, else MB. if limitBytes >= gb { limitGB := limitBytes / gb autoMemoryLimitValue = fmt.Sprintf("%dGB", limitGB) diff --git a/server/sysinfo_test.go b/server/sysinfo/sysinfo_test.go similarity index 89% rename from server/sysinfo_test.go rename to server/sysinfo/sysinfo_test.go index 38b8dbd3..00bde27f 100644 --- a/server/sysinfo_test.go +++ b/server/sysinfo/sysinfo_test.go @@ -1,4 +1,4 @@ -package server +package sysinfo import ( "runtime" @@ -11,9 +11,9 @@ func TestAutoMemoryLimit(t *testing.T) { autoMemoryLimitOnce = sync.Once{} autoMemoryLimitValue = "" - result := autoMemoryLimit() + result := AutoMemoryLimit() if result == "" { - t.Fatal("autoMemoryLimit returned empty string") + t.Fatal("AutoMemoryLimit returned empty string") } // On Linux (CI and production), we should detect system memory @@ -31,7 +31,7 @@ func TestAutoMemoryLimitFormat(t *testing.T) { autoMemoryLimitOnce = sync.Once{} autoMemoryLimitValue = "" - result := autoMemoryLimit() + result := AutoMemoryLimit() // Should end with GB or MB validSuffix := false for _, suffix := range []string{"GB", "MB"} { @@ -41,7 +41,7 @@ func TestAutoMemoryLimitFormat(t *testing.T) { } } if !validSuffix { - t.Fatalf("autoMemoryLimit returned %q, expected suffix GB or MB", result) + t.Fatalf("AutoMemoryLimit returned %q, expected suffix GB or MB", result) } } @@ -50,10 +50,10 @@ func TestAutoMemoryLimitCached(t *testing.T) { autoMemoryLimitOnce = sync.Once{} autoMemoryLimitValue = "" - first := autoMemoryLimit() - second := autoMemoryLimit() + first := AutoMemoryLimit() + second := AutoMemoryLimit() if first != second { - t.Fatalf("autoMemoryLimit not stable: %q vs %q", first, second) + t.Fatalf("AutoMemoryLimit not stable: %q vs %q", first, second) } } diff --git a/server/sysinfo_aliases.go b/server/sysinfo_aliases.go new file mode 100644 index 00000000..094fd00e --- /dev/null +++ b/server/sysinfo_aliases.go @@ -0,0 +1,14 @@ +package server + +import "github.com/posthog/duckgres/server/sysinfo" + +// Re-exports kept here so existing references to server.SystemMemoryBytes, +// server.ValidateMemoryLimit, and server.ParseMemoryBytes continue to compile +// after the helpers moved into server/sysinfo. New code should import +// server/sysinfo and use sysinfo.X directly. + +var ( + SystemMemoryBytes = sysinfo.SystemMemoryBytes + ValidateMemoryLimit = sysinfo.ValidateMemoryLimit + ParseMemoryBytes = sysinfo.ParseMemoryBytes +) diff --git a/server/worker.go b/server/worker.go index 69c6a6d9..b63990a3 100644 --- a/server/worker.go +++ b/server/worker.go @@ -17,6 +17,8 @@ import ( "sync" "syscall" "time" + + "github.com/posthog/duckgres/server/auth" ) // Exit codes for child processes @@ -220,7 +222,7 @@ func runChildWorker(tcpConn *net.TCPConn, cfg *ChildConfig) int { expectedPassword, ok := cfg.Users[username] if !ok { slog.Warn("Unknown user", "user", username, "remote_addr", cfg.RemoteAddr) - authFailuresCounter.Inc() + auth.AuthFailuresCounter.Inc() _ = writeErrorResponse(writer, "FATAL", "28P01", "password authentication failed") _ = writer.Flush() return ExitAuthFailure @@ -254,7 +256,7 @@ func runChildWorker(tcpConn *net.TCPConn, cfg *ChildConfig) int { password := string(bytes.TrimRight(body, "\x00")) if password != expectedPassword { slog.Warn("Authentication failed", "user", username, "remote_addr", cfg.RemoteAddr) - authFailuresCounter.Inc() + auth.AuthFailuresCounter.Inc() _ = writeErrorResponse(writer, "FATAL", "28P01", "password authentication failed") _ = writer.Flush() return ExitAuthFailure