feat(types)!: Proper support for nested types (#11196)
Closes #11187
Code pieces are copied from [cloudquery/filetypes](https://github.com/cloudquery/filetypes) (parquet)

BEGIN_COMMIT_OVERRIDE
feat(types)!: Proper support for nested types (#11196)
BREAKING-CHANGE: Support writing Apache Arrow nested types:
* Structs as DuckDB structs
* Maps as DuckDB maps

END_COMMIT_OVERRIDE
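
This change maps Arrow struct columns to native DuckDB struct columns and Arrow map columns to native DuckDB maps; previously structs were written as JSON strings and maps were flattened into lists of their values (see the `arrowToDuckDB` changes in `client/types.go` below). As a rough illustration, not part of the commit, the hypothetical Go sketch below declares one struct column and one map column and notes the DuckDB column types the new mapping should produce; the scalar spellings (`varchar`, `bigint`) are assumptions, only the `struct(...)`, `map(...)` and `[]` shapes come from the diff.

```go
package main

import (
    "fmt"

    "github.com/apache/arrow/go/v13/arrow"
)

func main() {
    // With this commit an Arrow struct column becomes a DuckDB STRUCT
    // instead of a JSON/varchar blob; expected DuckDB column type
    // (scalar names assumed): struct("city" varchar, "zip" bigint)
    address := arrow.StructOf(
        arrow.Field{Name: "city", Type: arrow.BinaryTypes.String, Nullable: true},
        arrow.Field{Name: "zip", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
    )
    fmt.Println(address) // prints the Arrow-side type, e.g. struct<city: utf8, zip: int64>

    // An Arrow map column becomes a DuckDB MAP instead of a list of its values;
    // expected DuckDB column type (scalar names assumed): map(varchar, bigint)
    counts := arrow.MapOf(arrow.BinaryTypes.String, arrow.PrimitiveTypes.Int64)
    fmt.Println(counts)

    // Nesting composes: a list of the struct above would be written as struct(...)[].
    fmt.Println(arrow.ListOf(address))
}
```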
candiduslynx committed Jun 5, 2023
1 parent dd6b275 commit 7c6a3e2
Showing 7 changed files with 170 additions and 112 deletions.
32 changes: 25 additions & 7 deletions plugins/destination/duckdb/client/read.go
@@ -111,20 +111,38 @@ func reverseTransformArray(dt arrow.DataType, arr arrow.Array) arrow.Array {
     switch dt := dt.(type) {
     case *types.UUIDType:
         return array.NewExtensionArrayWithStorage(dt, arr.(*array.FixedSizeBinary))
-    case *types.InetType, *types.MACType, *types.JSONType, *arrow.StructType:
+    case *types.InetType, *types.MACType, *types.JSONType:
         return reverseTransformFromString(dt, arr.(*array.String))
     case *arrow.Uint16Type:
         return reverseTransformUint16(arr.(*array.Uint32))
     case *arrow.Uint8Type:
         return reverseTransformUint8(arr.(*array.Uint32))
     case *arrow.TimestampType:
         return transformTimestamp(dt, arr.(*array.Timestamp))
-    case *arrow.MapType:
-        child := reverseTransformArray(dt.ValueType(), arr.(*array.List).ListValues()).Data()
-        return array.NewMapData(array.NewData(dt, arr.Len(), arr.Data().Buffers(), []arrow.ArrayData{child}, arr.NullN(), arr.Data().Offset()))
-    case listLike:
-        child := reverseTransformArray(dt.Elem(), arr.(array.ListLike).ListValues()).Data()
-        return array.NewListData(array.NewData(dt, arr.Len(), arr.Data().Buffers(), []arrow.ArrayData{child}, arr.NullN(), arr.Data().Offset()))
+    case *arrow.StructType:
+        arr := arr.(*array.Struct)
+        children := make([]arrow.ArrayData, arr.NumField())
+        for i := range children {
+            // struct fields can be odd when read from parquet, but the data is intact
+            child := array.MakeFromData(arr.Data().Children()[i])
+            children[i] = reverseTransformArray(dt.Field(i).Type, child).Data()
+        }
+
+        return array.NewStructData(array.NewData(
+            dt, arr.Len(),
+            arr.Data().Buffers(),
+            children,
+            arr.NullN(), arr.Data().Offset(),
+        ))
+
+    case arrow.ListLikeType: // also handles maps
+        return array.MakeFromData(array.NewData(
+            dt, arr.Len(),
+            arr.Data().Buffers(),
+            []arrow.ArrayData{reverseTransformArray(dt.Elem(), arr.(array.ListLike).ListValues()).Data()},
+            arr.NullN(), arr.Data().Offset(),
+        ))
+
     default:
         return reverseTransformFromString(dt, arr.(*array.String))
     }
31 changes: 26 additions & 5 deletions plugins/destination/duckdb/client/transform.go
@@ -21,7 +21,7 @@ func transformArray(arr arrow.Array) arrow.Array {
     }

     switch arr := arr.(type) {
-    case *types.UUIDArray, *types.InetArray, *types.MACArray, *types.JSONArray, *array.Struct:
+    case *types.UUIDArray, *types.InetArray, *types.MACArray, *types.JSONArray:
         return transformToStringArray(arr)
     case *array.Uint8:
         return transformUint8ToUint32Array(arr)
@@ -30,10 +30,31 @@
     case *array.Timestamp:
         // mismatching unit or tz
         return transformTimestamp(duckDBToArrow(arrowToDuckDB(arr.DataType())).(*arrow.TimestampType), arr)
-    case array.ListLike:
-        child := transformArray(arr.ListValues()).Data()
-        newType := arrow.ListOf(child.DataType())
-        return array.NewListData(array.NewData(newType, arr.Len(), arr.Data().Buffers(), []arrow.ArrayData{child}, arr.NullN(), arr.Data().Offset()))
+
+    case *array.Struct:
+        dt := arr.DataType().(*arrow.StructType)
+        children := make([]arrow.ArrayData, arr.NumField())
+        names := make([]string, arr.NumField())
+        for i := range children {
+            children[i] = transformArray(arr.Field(i)).Data()
+            names[i] = dt.Field(i).Name
+        }
+
+        return array.NewStructData(array.NewData(
+            transformTypeForWriting(dt), arr.Len(),
+            arr.Data().Buffers(),
+            children,
+            arr.NullN(), arr.Data().Offset(),
+        ))
+
+    case array.ListLike: // this includes maps, too
+        return array.MakeFromData(array.NewData(
+            transformTypeForWriting(arr.DataType()), arr.Len(),
+            arr.Data().Buffers(),
+            []arrow.ArrayData{transformArray(arr.ListValues()).Data()},
+            arr.NullN(), arr.Data().Offset(),
+        ))
+
     default:
         return transformToStringArray(arr)
     }
124 changes: 101 additions & 23 deletions plugins/destination/duckdb/client/types.go
@@ -5,44 +5,53 @@ import (

     "github.com/apache/arrow/go/v13/arrow"
     "github.com/cloudquery/plugin-sdk/v3/types"
+    "golang.org/x/exp/slices"
 )

-type listLike interface {
-    arrow.DataType
-    Elem() arrow.DataType
+func transformSchemaForWriting(sc *arrow.Schema) *arrow.Schema {
+    md := arrow.MetadataFrom(sc.Metadata().ToMap())
+    return arrow.NewSchema(transformFieldsForWriting(sc.Fields()), &md)
 }

-func transformSchemaForWriting(sc *arrow.Schema) *arrow.Schema {
-    fields := sc.Fields()
+func transformFieldsForWriting(fields []arrow.Field) []arrow.Field {
     for i := range fields {
         fields[i].Type = transformTypeForWriting(fields[i].Type)
     }
-    md := sc.Metadata()
-    return arrow.NewSchema(fields, &md)
+    return fields
 }

 func transformTypeForWriting(dt arrow.DataType) arrow.DataType {
     switch dt := dt.(type) {
-    case listLike:
-        return arrow.ListOf(transformTypeForWriting(dt.Elem()))
+    case *arrow.StructType:
+        return arrow.StructOf(transformFieldsForWriting(dt.Fields())...)
     case *arrow.MapType:
-        return arrow.ListOf(transformTypeForWriting(dt.ValueType()))
-    }
-
-    switch dt := duckDBToArrow(arrowToDuckDB(dt)).(type) {
+        return arrow.MapOf(transformTypeForWriting(dt.KeyType()), transformTypeForWriting(dt.ItemType()))
+    case arrow.ListLikeType:
+        return arrow.ListOf(transformTypeForWriting(dt.Elem()))
     case *types.UUIDType, *types.JSONType:
         return arrow.BinaryTypes.String
     default:
-        return dt
+        return duckDBToArrow(arrowToDuckDB(dt))
     }
 }

 func arrowToDuckDB(dt arrow.DataType) string {
     switch dt := dt.(type) {
-    case listLike:
-        return arrowToDuckDB(dt.Elem()) + "[]"
+    case *arrow.StructType:
+        builder := new(strings.Builder)
+        builder.WriteString("struct(")
+        for i, field := range dt.Fields() {
+            if i > 0 {
+                builder.WriteString(", ")
+            }
+            builder.WriteString(sanitizeID(field.Name) + " " + arrowToDuckDB(field.Type))
+        }
+        builder.WriteString(")")
+        return builder.String()
     case *arrow.MapType:
-        return arrowToDuckDB(arrow.ListOf(dt.ValueType()))
+        return "map(" + arrowToDuckDB(dt.KeyType()) + ", " + arrowToDuckDB(dt.ItemType()) + ")"
+    case arrow.ListLikeType:
+        return arrowToDuckDB(dt.Elem()) + "[]"
     case *arrow.BooleanType:
         return "boolean"
     case *arrow.Int8Type:
@@ -79,20 +88,21 @@ func arrowToDuckDB(dt arrow.DataType) string {
         return "date"
     case *arrow.DayTimeIntervalType:
         return "interval"
-    case *arrow.StructType:
-        return "json"
     default:
         return "varchar"
     }
 }

 func duckDBToArrow(t string) arrow.DataType {
-    if strings.HasSuffix(t, "[]") {
+    switch {
+    case strings.HasSuffix(t, "[]"):
         return arrow.ListOf(duckDBToArrow(strings.TrimSuffix(t, "[]")))
+    case strings.HasPrefix(t, "struct"):
+        return duckDBStructToArrow(t)
+    case strings.HasPrefix(t, "map"):
+        return duckDBMapToArrow(t)
     }
-    if strings.HasPrefix(t, "struct") {
-        return types.ExtensionTypes.JSON
-    }
+
     switch t {
     case "tinyint", "int1":
         return arrow.PrimitiveTypes.Int8
@@ -133,6 +143,74 @@ func duckDBToArrow(t string) arrow.DataType {
     }
 }

+func duckDBStructToArrow(spec string) *arrow.StructType {
+    params := strings.TrimPrefix(spec, "struct")
+    params = strings.TrimSpace(params)
+    params = strings.TrimSuffix(strings.TrimPrefix(params, "("), ")")
+
+    fieldsSpec := splitParams(params)
+    if len(fieldsSpec) == 0 {
+        panic("unsupported struct spec: " + spec)
+    }
+
+    fields := make([]arrow.Field, len(fieldsSpec))
+    for i, fieldSpec := range fieldsSpec {
+        parts := strings.SplitN(fieldSpec, " ", 2)
+        if len(parts) != 2 {
+            panic("unsupported field spec: " + fieldSpec)
+        }
+
+        fields[i] = arrow.Field{
+            Name:     strings.Trim(parts[0], `"`),
+            Type:     duckDBToArrow(strings.TrimSpace(parts[1])),
+            Nullable: true, // all duckdb columns are nullable
+        }
+    }
+
+    return arrow.StructOf(fields...)
+}
+
+func duckDBMapToArrow(spec string) *arrow.MapType {
+    params := strings.TrimPrefix(spec, "map")
+    params = strings.TrimSpace(params)
+    params = strings.TrimSuffix(strings.TrimPrefix(params, "("), ")")
+
+    kv := splitParams(params)
+    if len(kv) != 2 {
+        panic("unsupported map spec: " + spec)
+    }
+
+    // these should only be types
+    return arrow.MapOf(duckDBToArrow(kv[0]), duckDBToArrow(kv[1]))
+}
+
+func splitParams(params string) []string {
+    params = strings.TrimSpace(params)
+
+    var brackets int
+    var parts []string
+    elem := make([]rune, 0, len(params))
+
+    for _, r := range params {
+        switch r {
+        case '(':
+            brackets++
+        case ')':
+            brackets--
+        case ',':
+            if brackets == 0 {
+                parts = append(parts, strings.TrimSpace(string(elem)))
+                elem = elem[:0] // cleanup
+                continue
+            }
+        }
+        elem = append(elem, r)
+    }
+    parts = append(parts, strings.TrimSpace(string(elem)))
+
+    return slices.Clip(parts)
+}
+
 func sanitizeID(id string) string {
     return `"` + id + `"`
 }
19 changes: 14 additions & 5 deletions plugins/destination/duckdb/client/write.go
@@ -12,6 +12,7 @@ import (
     "github.com/cloudquery/plugin-pb-go/specs"
     "github.com/cloudquery/plugin-sdk/v3/schema"
     "github.com/google/uuid"
+    "golang.org/x/exp/slices"
 )

 func nonPkIndices(sc *schema.Table) []int {
@@ -29,12 +30,20 @@
 // but this is unavoidable until support is added to duckdb itself.
 // See https://github.com/duckdb/duckdb/blob/c5d9afb97bbf0be12216f3b89ae3131afbbc3156/src/storage/table/list_column_data.cpp#L243-L251
 func containsList(sc *schema.Table) bool {
-    for _, f := range sc.Columns {
-        if arrow.IsListLike(f.Type.ID()) {
-            return true
-        }
+    return slices.ContainsFunc(sc.Columns, func(c schema.Column) bool { return dtContainsList(c.Type) })
+}
+
+func dtContainsList(dt arrow.DataType) bool {
+    switch dt := dt.(type) {
+    case *arrow.StructType:
+        return slices.ContainsFunc(dt.Fields(), func(f arrow.Field) bool { return dtContainsList(f.Type) })
+    case *arrow.MapType:
+        return dtContainsList(dt.KeyType()) || dtContainsList(dt.ItemType())
+    case arrow.ListLikeType:
+        return true
+    default:
+        return false
     }
-    return false
 }

 func (c *Client) upsert(ctx context.Context, tmpTableName string, tableName string, table *schema.Table) error {
4 changes: 2 additions & 2 deletions plugins/destination/duckdb/go.mod
@@ -9,10 +9,11 @@ require (
     github.com/google/uuid v1.3.0
     github.com/marcboeker/go-duckdb v1.3.0
     github.com/rs/zerolog v1.29.0
+    golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
 )

 // TODO: remove once all updates are merged
-replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13.0.0-20230602001318-a7aad4c5365c
+replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13.0.0-20230602155531-6d34568d4501

 require (
     github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
@@ -46,7 +47,6 @@ require (
     github.com/stretchr/testify v1.8.2 // indirect
     github.com/thoas/go-funk v0.9.3 // indirect
     github.com/zeebo/xxh3 v1.0.2 // indirect
-    golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
     golang.org/x/mod v0.9.0 // indirect
     golang.org/x/net v0.9.0 // indirect
     golang.org/x/sync v0.1.0 // indirect
4 changes: 2 additions & 2 deletions plugins/destination/duckdb/go.sum
@@ -45,8 +45,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
 github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
-github.com/cloudquery/arrow/go/v13 v13.0.0-20230602001318-a7aad4c5365c h1:z/inZxI0DTuu4DELOZQN5CjlSx0jwdVcn4XILXWDB+o=
-github.com/cloudquery/arrow/go/v13 v13.0.0-20230602001318-a7aad4c5365c/go.mod h1:/XatdE3kDIBqZKhZ7OBUHwP2jaASDFZHqF4puOWM8po=
+github.com/cloudquery/arrow/go/v13 v13.0.0-20230602155531-6d34568d4501 h1:vZJyF3/vy6nhP7guV0I5aB1eK3CGXZb7vg7UQPPnqqo=
+github.com/cloudquery/arrow/go/v13 v13.0.0-20230602155531-6d34568d4501/go.mod h1:/XatdE3kDIBqZKhZ7OBUHwP2jaASDFZHqF4puOWM8po=
 github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37/izMi+FQ=
 github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc=
 github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U=
68 changes: 0 additions & 68 deletions plugins/destination/duckdb/json/writer.go

This file was deleted.
