diff --git a/duckdbservice/appender_init.go b/duckdbservice/appender_init.go index cc50ed8..f5e8815 100644 --- a/duckdbservice/appender_init.go +++ b/duckdbservice/appender_init.go @@ -6,17 +6,44 @@ import ( "github.com/apache/arrow-go/v18/arrow/decimal128" duckdb "github.com/duckdb/duckdb-go/v2" "github.com/posthog/duckgres/duckdbservice/arrowmap" + "github.com/posthog/duckgres/server" ) // init registers handlers for the duckdb-go driver value types so that any -// caller (including arrowmap.AppendValue and the wrapper duckdbservice.AppendValue) -// gets full type coverage when this package is linked into the binary. +// caller gets full type coverage when this package is linked into the binary: +// - arrowmap.AppendValue gets a hook for the Arrow array builders that +// receive duckdb.Interval / Decimal / UUID / OrderedMap / Map values +// - server.normalizeDriverValue gets a hook so the PG binary-format +// encoders in server/types.go convert duckdb.Interval and duckdb.Decimal +// into their arrowmap equivalents (IntervalValue, DecimalValue) before +// dispatching // // Binaries that don't link duckdbservice (e.g., a future control-plane-only // binary) won't see these registrations — which is correct, because they // also won't be the ones scanning rows from a duckdb-go driver connection. func init() { arrowmap.RegisterAppender(handleDuckDBValue) + server.RegisterValueNormalizer(normalizeDuckDBValue) +} + +// normalizeDuckDBValue converts the duckdb-go driver's interval and decimal +// types to their duckdb-free arrowmap equivalents. Other inputs pass +// through unchanged. +func normalizeDuckDBValue(v any) any { + switch x := v.(type) { + case duckdb.Interval: + return arrowmap.IntervalValue{ + Months: x.Months, + Days: x.Days, + Micros: x.Micros, + } + case duckdb.Decimal: + return arrowmap.DecimalValue{ + Value: x.Value, + Scale: int(x.Scale), + } + } + return v } // handleDuckDBValue implements arrowmap.Appender for duckdb-go's driver diff --git a/duckdbservice/arrowmap/arrowmap.go b/duckdbservice/arrowmap/arrowmap.go index 1580e80..e8bd6c5 100644 --- a/duckdbservice/arrowmap/arrowmap.go +++ b/duckdbservice/arrowmap/arrowmap.go @@ -323,6 +323,16 @@ type IntervalValue struct { Micros int64 } +// DecimalValue is the duckdb-free representation of a fixed-precision +// decimal (the unscaled integer plus the scale). Used by the binary-format +// encoder in server/types.go so it can switch on this type without +// importing the duckdb-go driver. duckdbservice's value-normalizer hook +// converts duckdb.Decimal to arrowmap.DecimalValue at scan time. +type DecimalValue struct { + Value *big.Int + Scale int +} + // Appender is a hook that handles append for value types arrowmap doesn't // know about (typically driver-specific types like duckdb.Interval). It // reports whether it handled the value; arrowmap.AppendValue falls back to diff --git a/server/types.go b/server/types.go index f59dd23..b54cdfc 100644 --- a/server/types.go +++ b/server/types.go @@ -11,7 +11,6 @@ import ( "strings" "time" - duckdb "github.com/duckdb/duckdb-go/v2" "github.com/posthog/duckgres/duckdbservice/arrowmap" ) @@ -611,21 +610,15 @@ func decodeTime(data []byte) (string, error) { // encodeInterval encodes an INTERVAL value in PostgreSQL binary format: // int64 microseconds + int32 days + int32 months = 16 bytes. func encodeInterval(v interface{}) []byte { + v = normalizeDriverValue(v) buf := make([]byte, 16) - switch val := v.(type) { - case duckdb.Interval: - binary.BigEndian.PutUint64(buf[0:8], uint64(val.Micros)) - binary.BigEndian.PutUint32(buf[8:12], uint32(val.Days)) - binary.BigEndian.PutUint32(buf[12:16], uint32(val.Months)) - case arrowmap.IntervalValue: - // Arrow Flight returns arrowmap.IntervalValue instead of duckdb.Interval - binary.BigEndian.PutUint64(buf[0:8], uint64(val.Micros)) - binary.BigEndian.PutUint32(buf[8:12], uint32(val.Days)) - binary.BigEndian.PutUint32(buf[12:16], uint32(val.Months)) - default: - _ = val + val, ok := v.(arrowmap.IntervalValue) + if !ok { return nil } + binary.BigEndian.PutUint64(buf[0:8], uint64(val.Micros)) + binary.BigEndian.PutUint32(buf[8:12], uint32(val.Days)) + binary.BigEndian.PutUint32(buf[12:16], uint32(val.Months)) return buf } @@ -690,8 +683,8 @@ func encodeNumeric(v interface{}) []byte { var val *big.Int var dscale int16 - switch x := v.(type) { - case duckdb.Decimal: + switch x := normalizeDriverValue(v).(type) { + case arrowmap.DecimalValue: val = new(big.Int).Set(x.Value) dscale = int16(x.Scale) case *big.Int: diff --git a/server/types_test.go b/server/types_test.go index 61cd9f8..693821d 100644 --- a/server/types_test.go +++ b/server/types_test.go @@ -9,8 +9,27 @@ import ( "time" duckdb "github.com/duckdb/duckdb-go/v2" + + "github.com/posthog/duckgres/duckdbservice/arrowmap" ) +// In the all-in-one binary, duckdbservice's init() registers a normalizer +// that converts duckdb.Interval / duckdb.Decimal to arrowmap.IntervalValue / +// arrowmap.DecimalValue before encode*Funcs dispatch. We can't import +// duckdbservice from a server test (import cycle), so register the same +// hook here so TestEncodeNumeric / TestEncodeInterval keep working. +func init() { + RegisterValueNormalizer(func(v any) any { + switch x := v.(type) { + case duckdb.Interval: + return arrowmap.IntervalValue{Months: x.Months, Days: x.Days, Micros: x.Micros} + case duckdb.Decimal: + return arrowmap.DecimalValue{Value: x.Value, Scale: int(x.Scale)} + } + return v + }) +} + func TestMapDuckDBType(t *testing.T) { tests := []struct { name string diff --git a/server/value_normalize.go b/server/value_normalize.go new file mode 100644 index 0000000..4d44a7d --- /dev/null +++ b/server/value_normalize.go @@ -0,0 +1,46 @@ +package server + +import "sync/atomic" + +// ValueNormalizer is a hook that converts driver-specific value types +// (e.g., duckdb.Interval, duckdb.Decimal) into the duckdb-free equivalents +// in arrowmap (IntervalValue, DecimalValue) so the binary-format encoders +// in types.go can handle them without importing the duckdb-go driver. +// +// A normalizer must return the input unchanged when it doesn't recognize +// the type; the encode helpers fall back to AppendNull/return nil when +// the final value still isn't a recognized type. +type ValueNormalizer func(any) any + +// driverNormalizers is loaded once into an atomic.Value as []ValueNormalizer. +// Reads on the hot path (every encoded row value) are lock-free; registration +// happens at init time so contention is rare. +var driverNormalizers atomic.Value // []ValueNormalizer + +// RegisterValueNormalizer adds a hook consulted by normalizeDriverValue +// before the binary encoders dispatch on the value's type. Intended for use +// from init() in importers that own driver-specific value types — duckdbservice +// registers a normalizer that converts duckdb.Interval and duckdb.Decimal to +// their arrowmap equivalents. +func RegisterValueNormalizer(n ValueNormalizer) { + if n == nil { + return + } + cur, _ := driverNormalizers.Load().([]ValueNormalizer) + next := make([]ValueNormalizer, 0, len(cur)+1) + next = append(next, cur...) + next = append(next, n) + driverNormalizers.Store(next) +} + +// normalizeDriverValue runs every registered ValueNormalizer over the value. +// Each normalizer returns the input unchanged when it doesn't claim it; the +// final value is whatever the chain produced (or the original input if no +// normalizers ran or none matched). +func normalizeDriverValue(v any) any { + hooks, _ := driverNormalizers.Load().([]ValueNormalizer) + for _, h := range hooks { + v = h(v) + } + return v +}