Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package filetypes

import (
csvFile "github.com/cloudquery/filetypes/v3/csv"
jsonFile "github.com/cloudquery/filetypes/v3/json"
"github.com/cloudquery/filetypes/v3/parquet"
"github.com/cloudquery/filetypes/v3/types"
csvFile "github.com/cloudquery/filetypes/v4/csv"
jsonFile "github.com/cloudquery/filetypes/v4/json"
"github.com/cloudquery/filetypes/v4/parquet"
"github.com/cloudquery/filetypes/v4/types"
)

type Client struct {
Expand Down
4 changes: 2 additions & 2 deletions csv/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/csv"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

func (cl *Client) Read(r io.Reader, table *schema.Table, _ string, res chan<- arrow.Record) error {
func (cl *Client) Read(r io.Reader, table *schema.Table, res chan<- arrow.Record) error {
arrowSchema := table.ToArrowSchema()
newSchema := convertSchema(arrowSchema)
reader := csv.NewReader(r, newSchema,
Expand Down
23 changes: 18 additions & 5 deletions csv/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
// "encoding/csv"
"fmt"
"io"
"strings"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/csv"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cloudquery/filetypes/v3/types"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

type Handle struct {
Expand Down Expand Up @@ -39,9 +40,10 @@ func (h *Handle) WriteContent(records []arrow.Record) error {
if err := h.w.Write(castRec); err != nil {
return fmt.Errorf("failed to write record to csv: %w", err)
}
if err := h.w.Flush(); err != nil {
return fmt.Errorf("failed to flush csv writer: %w", err)
}
}

if err := h.w.Flush(); err != nil {
return fmt.Errorf("failed to flush csv writer: %w", err)
}
return nil
}
Expand All @@ -58,6 +60,7 @@ func convertSchema(sch *arrow.Schema) *arrow.Schema {
if !isTypeSupported(f.Type) {
fields[i].Type = arrow.BinaryTypes.String
}
fields[i].Metadata = stripCQExtensionMetadata(fields[i].Metadata)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call is required for the csv writer to actually write the records. On write, it compares the schema given to csv.NewWriter with the record schema, which (in our case) turns out to be slightly different and is missing the cq:extension:* metadata. So we strip that metadata here from both places, since it doesn't hold any actionable info for csv writing.

}

md := sch.Metadata()
Expand Down Expand Up @@ -108,3 +111,13 @@ func castToString(rec arrow.Record) arrow.Record {
}
return array.NewRecord(newSchema, cols, rec.NumRows())
}

// stripCQExtensionMetadata returns metadata built from md's entries, leaving
// out every entry whose key begins with "cq:extension:". It works on the map
// produced by md.ToMap and builds the result with arrow.MetadataFrom.
func stripCQExtensionMetadata(md arrow.Metadata) arrow.Metadata {
	all := md.ToMap()
	kept := make(map[string]string, len(all))
	for key, value := range all {
		if strings.HasPrefix(key, "cq:extension:") {
			// Extension bookkeeping entry: leave it out of the result.
			continue
		}
		kept[key] = value
	}
	return arrow.MetadataFrom(kept)
}
10 changes: 5 additions & 5 deletions csv/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/bradleyjkemp/cupaloy/v2"
"github.com/cloudquery/filetypes/v3/types"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/plugin"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/google/uuid"
)

Expand Down Expand Up @@ -69,12 +69,12 @@ func TestWriteRead(t *testing.T) {
ch := make(chan arrow.Record)
var readErr error
go func() {
readErr = cl.Read(byteReader, table, "test-source", ch)
readErr = cl.Read(byteReader, table, ch)
close(ch)
}()
totalCount := 0
for got := range ch {
if diff := destination.RecordDiff(records[totalCount], got); diff != "" {
if diff := plugin.RecordDiff(records[totalCount], got); diff != "" {
t.Errorf("got diff: %s", diff)
}
totalCount++
Expand Down
7 changes: 2 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
module github.com/cloudquery/filetypes/v3
module github.com/cloudquery/filetypes/v4

go 1.19

require (
github.com/apache/arrow/go/v13 v13.0.0-20230622042343-ec413b7763fe
github.com/bradleyjkemp/cupaloy/v2 v2.8.0
github.com/cloudquery/plugin-sdk/v3 v3.10.6
github.com/cloudquery/plugin-sdk/v4 v4.2.0-rc1
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.8.4
Expand All @@ -18,9 +18,7 @@ require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/thrift v0.18.1 // indirect
github.com/cloudquery/plugin-pb-go v1.4.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v23.1.21+incompatible // indirect
Expand All @@ -47,6 +45,5 @@ require (
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/grpc v1.55.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,12 @@ github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oM
github.com/bradleyjkemp/cupaloy/v2 v2.8.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0=
github.com/cloudquery/arrow/go/v13 v13.0.0-20230626001500-065602842c3a h1:O/FNq1+8YlWzHYNj2tokFQyja6GXsQBdkuvLMdpuaSw=
github.com/cloudquery/arrow/go/v13 v13.0.0-20230626001500-065602842c3a/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
github.com/cloudquery/plugin-pb-go v1.4.0 h1:sfy0oWSFac2JCJQJuKoR+8flZGKkEoUVORwZDNM3aiI=
github.com/cloudquery/plugin-pb-go v1.4.0/go.mod h1:NbWAtT2BzJQ9+XUWwh3IKBg3MOeV9ZEpHoHNAQ/YDV8=
github.com/cloudquery/plugin-sdk/v3 v3.10.6 h1:KqTsLZ6OA1h8BUMeMcU6BAD6TBW6ojgQaC4zDZMgvu0=
github.com/cloudquery/plugin-sdk/v3 v3.10.6/go.mod h1:QhBaVgiNyQ3P6uAzJWOYpYykHXL+WDZffwg1riTwv60=
github.com/cloudquery/plugin-sdk/v4 v4.2.0-rc1 h1:sRjZ/Lb/yjLw92HzvgPiyVynbocbtaa13fEgS9MN/DQ=
github.com/cloudquery/plugin-sdk/v4 v4.2.0-rc1/go.mod h1:gn2ANihFC5SUMPCcYnVD+Gt3Cgn8OeXJW2/0lRUoB68=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down Expand Up @@ -103,8 +99,6 @@ google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
4 changes: 2 additions & 2 deletions json/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

const maxJSONSize = 1024 * 1024 * 20

func (*Client) Read(r io.Reader, table *schema.Table, _ string, res chan<- arrow.Record) error {
func (*Client) Read(r io.Reader, table *schema.Table, res chan<- arrow.Record) error {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, maxJSONSize), maxJSONSize)
rb := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
Expand Down
4 changes: 2 additions & 2 deletions json/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/cloudquery/filetypes/v3/types"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/goccy/go-json"
)

Expand Down
31 changes: 5 additions & 26 deletions json/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,12 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/bradleyjkemp/cupaloy/v2"
"github.com/cloudquery/filetypes/v3/types"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/plugin"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/google/uuid"
)

func TestWrite(t *testing.T) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case was removed because TestWriteRead already exercises the write functionality (and t.Log can be added there for debugging if necessary).

var b bytes.Buffer
table := schema.TestTable("test", schema.TestSourceOptions{})
sourceName := "test-source"
syncTime := time.Now().UTC().Round(time.Second)
opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 1,
}
records := schema.GenTestData(table, opts)
cl, err := NewClient()
if err != nil {
t.Fatal(err)
}
if err := types.WriteAll(cl, &b, table, records); err != nil {
t.Fatal(err)
}
t.Log(b.String())
}

func TestWriteRead(t *testing.T) {
table := schema.TestTable("test", schema.TestSourceOptions{})
sourceName := "test-source"
Expand Down Expand Up @@ -77,12 +56,12 @@ func TestWriteRead(t *testing.T) {
ch := make(chan arrow.Record)
var readErr error
go func() {
readErr = cl.Read(byteReader, table, "test-source", ch)
readErr = cl.Read(byteReader, table, ch)
close(ch)
}()
totalCount := 0
for got := range ch {
if diff := destination.RecordDiff(records[totalCount], got); diff != "" {
if diff := plugin.RecordDiff(records[totalCount], got); diff != "" {
t.Fatalf("got diff: %s", diff)
}
totalCount++
Expand Down
4 changes: 2 additions & 2 deletions parquet/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"github.com/apache/arrow/go/v13/parquet"
"github.com/apache/arrow/go/v13/parquet/file"
"github.com/apache/arrow/go/v13/parquet/pqarrow"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

func (*Client) Read(f parquet.ReaderAtSeeker, table *schema.Table, _ string, res chan<- arrow.Record) error {
func (*Client) Read(f parquet.ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error {
ctx := context.Background()
rdr, err := file.NewParquetReader(f)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions parquet/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"github.com/apache/arrow/go/v13/parquet"
"github.com/apache/arrow/go/v13/parquet/compress"
"github.com/apache/arrow/go/v13/parquet/pqarrow"
ftypes "github.com/cloudquery/filetypes/v3/types"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v3/types"
ftypes "github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/cloudquery/plugin-sdk/v4/types"
)

type Handle struct {
Expand Down
10 changes: 5 additions & 5 deletions parquet/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/cloudquery/filetypes/v3/types"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/plugin"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

func TestWriteRead(t *testing.T) {
Expand Down Expand Up @@ -48,14 +48,14 @@ func TestWriteRead(t *testing.T) {
ch := make(chan arrow.Record)
var readErr error
go func() {
readErr = cl.Read(byteReader, table, "test-source", ch)
readErr = cl.Read(byteReader, table, ch)
close(ch)
}()
totalCount := 0
for got := range ch {
curr := records[totalCount]
if !array.RecordApproxEqual(curr, got) {
t.Fatalf("got diff (record %d): %s\n", totalCount, destination.RecordDiff(records[totalCount], got))
t.Fatalf("got diff (record %d): %s\n", totalCount, plugin.RecordDiff(records[totalCount], got))
}
totalCount++
}
Expand Down
10 changes: 5 additions & 5 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"io"

"github.com/apache/arrow/go/v13/arrow"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

type ReaderAtSeeker interface {
Expand All @@ -13,18 +13,18 @@ type ReaderAtSeeker interface {
io.Seeker
}

func (cl *Client) Read(f ReaderAtSeeker, table *schema.Table, sourceName string, res chan<- arrow.Record) error {
func (cl *Client) Read(f ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error {
switch cl.spec.Format {
case FormatTypeCSV:
if err := cl.csv.Read(f, table, sourceName, res); err != nil {
if err := cl.csv.Read(f, table, res); err != nil {
return err
}
case FormatTypeJSON:
if err := cl.json.Read(f, table, sourceName, res); err != nil {
if err := cl.json.Read(f, table, res); err != nil {
return err
}
case FormatTypeParquet:
if err := cl.parquet.Read(f, table, sourceName, res); err != nil {
if err := cl.parquet.Read(f, table, res); err != nil {
return err
}
default:
Expand Down
6 changes: 3 additions & 3 deletions spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"encoding/json"
"fmt"

"github.com/cloudquery/filetypes/v3/csv"
jsonFile "github.com/cloudquery/filetypes/v3/json"
"github.com/cloudquery/filetypes/v3/parquet"
"github.com/cloudquery/filetypes/v4/csv"
jsonFile "github.com/cloudquery/filetypes/v4/json"
"github.com/cloudquery/filetypes/v4/parquet"
)

type FormatType string
Expand Down
4 changes: 2 additions & 2 deletions spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package filetypes
import (
"testing"

"github.com/cloudquery/filetypes/v3/csv"
"github.com/cloudquery/filetypes/v3/json"
"github.com/cloudquery/filetypes/v4/csv"
"github.com/cloudquery/filetypes/v4/json"
"github.com/stretchr/testify/assert"
)

Expand Down
Loading