-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Don't cast all types to string for CSV #498
Changes from all commits
3f47c1f
376b140
a2add10
7c7b6c6
e178728
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,7 +36,7 @@ func (cl *Client) WriteHeader(w io.Writer, t *schema.Table) (types.Handle, error | |
|
||
func (h *Handle) WriteContent(records []arrow.Record) error { | ||
for _, record := range records { | ||
castRec := castToString(record) | ||
castRec := transformRecord(record) | ||
if err := h.w.Write(castRec); err != nil { | ||
return fmt.Errorf("failed to write record to csv: %w", err) | ||
} | ||
|
@@ -57,59 +57,103 @@ func convertSchema(sch *arrow.Schema) *arrow.Schema { | |
fields := make([]arrow.Field, len(oldFields)) | ||
copy(fields, oldFields) | ||
for i, f := range fields { | ||
if !isTypeSupported(f.Type) { | ||
fields[i].Type = arrow.BinaryTypes.String | ||
} | ||
fields[i].Metadata = stripCQExtensionMetadata(fields[i].Metadata) | ||
fields[i].Type = convertType(f.Type) | ||
fields[i].Metadata = stripCQExtensionMetadata(f.Metadata) | ||
} | ||
|
||
md := sch.Metadata() | ||
newSchema := arrow.NewSchema(fields, &md) | ||
return newSchema | ||
} | ||
|
||
func isTypeSupported(t arrow.DataType) bool { | ||
// list from arrow/csv/common.go | ||
switch t.(type) { | ||
func convertType(dt arrow.DataType) arrow.DataType { | ||
if typeSupported(dt) { | ||
return dt | ||
} | ||
switch dt := dt.(type) { | ||
case *arrow.MapType: | ||
case *arrow.FixedSizeListType: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Lists are converted as list(converted elem): only the element type is converted, and the list structure is kept. |
||
// not supported -> elem not supported | ||
field := dt.ElemField() | ||
field.Type = convertType(field.Type) | ||
return arrow.FixedSizeListOfField(dt.Len(), field) | ||
case arrow.ListLikeType: | ||
// not supported -> elem not supported | ||
field := dt.ElemField() | ||
field.Type = convertType(field.Type) | ||
return arrow.ListOfField(field) | ||
} | ||
return arrow.BinaryTypes.String | ||
} | ||
|
||
// typeSupported copied from arrow/csv/common.go | ||
func typeSupported(dt arrow.DataType) bool { | ||
switch dt := dt.(type) { | ||
case *arrow.BooleanType: | ||
case *arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type: | ||
case *arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type: | ||
case *arrow.Float32Type, *arrow.Float64Type: | ||
case *arrow.StringType: | ||
case *arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type: | ||
case *arrow.StringType, *arrow.LargeStringType: | ||
case *arrow.TimestampType: | ||
case *arrow.Date32Type, *arrow.Date64Type: | ||
case *arrow.Decimal128Type, *arrow.Decimal256Type: | ||
case *arrow.ListType: | ||
case *arrow.BinaryType: | ||
case *arrow.MapType: | ||
return false | ||
case arrow.ListLikeType: | ||
return typeSupported(dt.Elem()) | ||
case *arrow.BinaryType, *arrow.LargeBinaryType, *arrow.FixedSizeBinaryType: | ||
case arrow.ExtensionType: | ||
return true | ||
case *arrow.NullType: | ||
default: | ||
return false | ||
} | ||
|
||
return false | ||
return true | ||
} | ||
|
||
// castToString casts extension columns or unsupported columns to string. It does not release the original record. | ||
func castToString(rec arrow.Record) arrow.Record { | ||
newSchema := convertSchema(rec.Schema()) | ||
func transformRecord(rec arrow.Record) arrow.Record { | ||
sc := convertSchema(rec.Schema()) | ||
if sc.Equal(rec.Schema()) { | ||
return rec | ||
} | ||
|
||
cols := make([]arrow.Array, rec.NumCols()) | ||
for c := 0; c < int(rec.NumCols()); c++ { | ||
col := rec.Column(c) | ||
if isTypeSupported(col.DataType()) { | ||
cols[c] = col | ||
continue | ||
} | ||
for i, col := range rec.Columns() { | ||
cols[i] = transformArray(col, sc.Field(i).Type) | ||
} | ||
return array.NewRecord(sc, cols, rec.NumRows()) | ||
} | ||
|
||
sb := array.NewStringBuilder(memory.DefaultAllocator) | ||
for i := 0; i < col.Len(); i++ { | ||
if col.IsNull(i) { | ||
sb.AppendNull() | ||
continue | ||
} | ||
sb.Append(col.ValueStr(i)) | ||
func transformArray(arr arrow.Array, dt arrow.DataType) arrow.Array { | ||
if arrow.TypeEqual(arr.DataType(), dt) { | ||
return arr | ||
} | ||
|
||
if listDT, ok := dt.(arrow.ListLikeType); ok { | ||
listArr := arr.(array.ListLike) | ||
return array.MakeFromData(array.NewData( | ||
listDT, listArr.Len(), | ||
listArr.Data().Buffers(), | ||
[]arrow.ArrayData{transformArray(listArr.ListValues(), listDT.Elem()).Data()}, | ||
listArr.NullN(), | ||
// we use data offset for list like as the `ListValues` can be a larger array (happens when slicing) | ||
listArr.Data().Offset(), | ||
)) | ||
} | ||
|
||
return transformArrayToString(arr) | ||
} | ||
|
||
func transformArrayToString(arr arrow.Array) *array.String { | ||
builder := array.NewStringBuilder(memory.DefaultAllocator) | ||
builder.Reserve(arr.Len()) | ||
for i := 0; i < arr.Len(); i++ { | ||
if arr.IsNull(i) { | ||
builder.AppendNull() | ||
continue | ||
} | ||
cols[c] = sb.NewArray() | ||
builder.Append(arr.ValueStr(i)) | ||
} | ||
return array.NewRecord(newSchema, cols, rec.NumRows()) | ||
return builder.NewStringArray() | ||
} | ||
|
||
func stripCQExtensionMetadata(md arrow.Metadata) arrow.Metadata { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
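Maps aren't supported: `typeSupported` returns false for `*arrow.MapType`, so `convertType` converts map columns to strings.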