Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions duckdbservice/appender_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,44 @@ import (
"github.com/apache/arrow-go/v18/arrow/decimal128"
duckdb "github.com/duckdb/duckdb-go/v2"
"github.com/posthog/duckgres/duckdbservice/arrowmap"
"github.com/posthog/duckgres/server"
)

// init registers handlers for the duckdb-go driver value types so that any
// caller (including arrowmap.AppendValue and the wrapper duckdbservice.AppendValue)
// gets full type coverage when this package is linked into the binary.
// caller gets full type coverage when this package is linked into the binary:
// - arrowmap.AppendValue gets a hook for the Arrow array builders that
// receive duckdb.Interval / Decimal / UUID / OrderedMap / Map values
// - server.normalizeDriverValue gets a hook so the PG binary-format
// encoders in server/types.go convert duckdb.Interval and duckdb.Decimal
// into their arrowmap equivalents (IntervalValue, DecimalValue) before
// dispatching
//
// Binaries that don't link duckdbservice (e.g., a future control-plane-only
// binary) won't see these registrations — which is correct, because they
// also won't be the ones scanning rows from a duckdb-go driver connection.
func init() {
arrowmap.RegisterAppender(handleDuckDBValue)
server.RegisterValueNormalizer(normalizeDuckDBValue)
}

// normalizeDuckDBValue converts the duckdb-go driver's interval and decimal
// types to their duckdb-free arrowmap equivalents. Other inputs pass
// through unchanged.
func normalizeDuckDBValue(v any) any {
switch x := v.(type) {
case duckdb.Interval:
return arrowmap.IntervalValue{
Months: x.Months,
Days: x.Days,
Micros: x.Micros,
}
case duckdb.Decimal:
return arrowmap.DecimalValue{
Value: x.Value,
Scale: int(x.Scale),
}
}
return v
}

// handleDuckDBValue implements arrowmap.Appender for duckdb-go's driver
Expand Down
10 changes: 10 additions & 0 deletions duckdbservice/arrowmap/arrowmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,16 @@ type IntervalValue struct {
Micros int64
}

// DecimalValue is the duckdb-free representation of a fixed-precision
// decimal (the unscaled integer plus the scale). Used by the binary-format
// encoder in server/types.go so it can switch on this type without
// importing the duckdb-go driver. duckdbservice's value-normalizer hook
// converts duckdb.Decimal to arrowmap.DecimalValue at scan time.
type DecimalValue struct {
Value *big.Int
Scale int
}

// Appender is a hook that handles append for value types arrowmap doesn't
// know about (typically driver-specific types like duckdb.Interval). It
// reports whether it handled the value; arrowmap.AppendValue falls back to
Expand Down
23 changes: 8 additions & 15 deletions server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"strings"
"time"

duckdb "github.com/duckdb/duckdb-go/v2"
"github.com/posthog/duckgres/duckdbservice/arrowmap"
)

Expand Down Expand Up @@ -611,21 +610,15 @@ func decodeTime(data []byte) (string, error) {
// encodeInterval encodes an INTERVAL value in PostgreSQL binary format:
// int64 microseconds + int32 days + int32 months = 16 bytes.
func encodeInterval(v interface{}) []byte {
v = normalizeDriverValue(v)
buf := make([]byte, 16)
switch val := v.(type) {
case duckdb.Interval:
binary.BigEndian.PutUint64(buf[0:8], uint64(val.Micros))
binary.BigEndian.PutUint32(buf[8:12], uint32(val.Days))
binary.BigEndian.PutUint32(buf[12:16], uint32(val.Months))
case arrowmap.IntervalValue:
// Arrow Flight returns arrowmap.IntervalValue instead of duckdb.Interval
binary.BigEndian.PutUint64(buf[0:8], uint64(val.Micros))
binary.BigEndian.PutUint32(buf[8:12], uint32(val.Days))
binary.BigEndian.PutUint32(buf[12:16], uint32(val.Months))
default:
_ = val
val, ok := v.(arrowmap.IntervalValue)
if !ok {
return nil
}
binary.BigEndian.PutUint64(buf[0:8], uint64(val.Micros))
binary.BigEndian.PutUint32(buf[8:12], uint32(val.Days))
binary.BigEndian.PutUint32(buf[12:16], uint32(val.Months))
return buf
}

Expand Down Expand Up @@ -690,8 +683,8 @@ func encodeNumeric(v interface{}) []byte {
var val *big.Int
var dscale int16

switch x := v.(type) {
case duckdb.Decimal:
switch x := normalizeDriverValue(v).(type) {
case arrowmap.DecimalValue:
val = new(big.Int).Set(x.Value)
dscale = int16(x.Scale)
case *big.Int:
Expand Down
19 changes: 19 additions & 0 deletions server/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,27 @@ import (
"time"

duckdb "github.com/duckdb/duckdb-go/v2"

"github.com/posthog/duckgres/duckdbservice/arrowmap"
)

// In the all-in-one binary, duckdbservice's init() registers a normalizer
// that converts duckdb.Interval / duckdb.Decimal to arrowmap.IntervalValue /
// arrowmap.DecimalValue before encode*Funcs dispatch. We can't import
// duckdbservice from a server test (import cycle), so register the same
// hook here so TestEncodeNumeric / TestEncodeInterval keep working.
func init() {
RegisterValueNormalizer(func(v any) any {
switch x := v.(type) {
case duckdb.Interval:
return arrowmap.IntervalValue{Months: x.Months, Days: x.Days, Micros: x.Micros}
case duckdb.Decimal:
return arrowmap.DecimalValue{Value: x.Value, Scale: int(x.Scale)}
}
return v
})
}

func TestMapDuckDBType(t *testing.T) {
tests := []struct {
name string
Expand Down
46 changes: 46 additions & 0 deletions server/value_normalize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package server

import "sync/atomic"

// ValueNormalizer is a hook that converts driver-specific value types
// (e.g., duckdb.Interval, duckdb.Decimal) into the duckdb-free equivalents
// in arrowmap (IntervalValue, DecimalValue) so the binary-format encoders
// in types.go can handle them without importing the duckdb-go driver.
//
// A normalizer must return the input unchanged when it doesn't recognize
// the type; the encode helpers fall back to AppendNull/return nil when
// the final value still isn't a recognized type.
type ValueNormalizer func(any) any

// driverNormalizers is loaded once into an atomic.Value as []ValueNormalizer.
// Reads on the hot path (every encoded row value) are lock-free; registration
// happens at init time so contention is rare.
var driverNormalizers atomic.Value // []ValueNormalizer

// RegisterValueNormalizer adds a hook consulted by normalizeDriverValue
// before the binary encoders dispatch on the value's type. Intended for use
// from init() in importers that own driver-specific value types — duckdbservice
// registers a normalizer that converts duckdb.Interval and duckdb.Decimal to
// their arrowmap equivalents.
func RegisterValueNormalizer(n ValueNormalizer) {
if n == nil {
return
}
cur, _ := driverNormalizers.Load().([]ValueNormalizer)
next := make([]ValueNormalizer, 0, len(cur)+1)
next = append(next, cur...)
next = append(next, n)
driverNormalizers.Store(next)
}

// normalizeDriverValue runs every registered ValueNormalizer over the value.
// Each normalizer returns the input unchanged when it doesn't claim it; the
// final value is whatever the chain produced (or the original input if no
// normalizers ran or none matched).
func normalizeDriverValue(v any) any {
hooks, _ := driverNormalizers.Load().([]ValueNormalizer)
for _, h := range hooks {
v = h(v)
}
return v
}
Loading