diff --git a/client.go b/client.go index ad828750..2b075b20 100644 --- a/client.go +++ b/client.go @@ -1,10 +1,10 @@ package filetypes import ( - csvFile "github.com/cloudquery/filetypes/v3/csv" - jsonFile "github.com/cloudquery/filetypes/v3/json" - "github.com/cloudquery/filetypes/v3/parquet" - "github.com/cloudquery/filetypes/v3/types" + csvFile "github.com/cloudquery/filetypes/v4/csv" + jsonFile "github.com/cloudquery/filetypes/v4/json" + "github.com/cloudquery/filetypes/v4/parquet" + "github.com/cloudquery/filetypes/v4/types" ) type Client struct { diff --git a/csv/read.go b/csv/read.go index 0aaec2d3..ecc44d9f 100644 --- a/csv/read.go +++ b/csv/read.go @@ -8,10 +8,10 @@ import ( "github.com/apache/arrow/go/v13/arrow/array" "github.com/apache/arrow/go/v13/arrow/csv" "github.com/apache/arrow/go/v13/arrow/memory" - "github.com/cloudquery/plugin-sdk/v3/schema" + "github.com/cloudquery/plugin-sdk/v4/schema" ) -func (cl *Client) Read(r io.Reader, table *schema.Table, _ string, res chan<- arrow.Record) error { +func (cl *Client) Read(r io.Reader, table *schema.Table, res chan<- arrow.Record) error { arrowSchema := table.ToArrowSchema() newSchema := convertSchema(arrowSchema) reader := csv.NewReader(r, newSchema, diff --git a/csv/write.go b/csv/write.go index 1d0f14c1..b4cf7911 100644 --- a/csv/write.go +++ b/csv/write.go @@ -4,13 +4,14 @@ import ( // "encoding/csv" "fmt" "io" + "strings" "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" "github.com/apache/arrow/go/v13/arrow/csv" "github.com/apache/arrow/go/v13/arrow/memory" - "github.com/cloudquery/filetypes/v3/types" - "github.com/cloudquery/plugin-sdk/v3/schema" + "github.com/cloudquery/filetypes/v4/types" + "github.com/cloudquery/plugin-sdk/v4/schema" ) type Handle struct { @@ -39,9 +40,10 @@ func (h *Handle) WriteContent(records []arrow.Record) error { if err := h.w.Write(castRec); err != nil { return fmt.Errorf("failed to write record to csv: %w", err) } - if err := h.w.Flush(); err != nil { - return fmt.Errorf("failed to flush csv writer: %w", err) - } + } + + if err := h.w.Flush(); err != nil { + return fmt.Errorf("failed to flush csv writer: %w", err) } return nil } @@ -58,6 +60,7 @@ func convertSchema(sch *arrow.Schema) *arrow.Schema { if !isTypeSupported(f.Type) { fields[i].Type = arrow.BinaryTypes.String } + fields[i].Metadata = stripCQExtensionMetadata(fields[i].Metadata) } md := sch.Metadata() @@ -108,3 +111,13 @@ func castToString(rec arrow.Record) arrow.Record { } return array.NewRecord(newSchema, cols, rec.NumRows()) } + +func stripCQExtensionMetadata(md arrow.Metadata) arrow.Metadata { + m := md.ToMap() + for k := range m { + if strings.HasPrefix(k, "cq:extension:") { + delete(m, k) + } + } + return arrow.MetadataFrom(m) +} diff --git a/csv/write_read_test.go b/csv/write_read_test.go index d5abfc76..6ba53bb0 100644 --- a/csv/write_read_test.go +++ b/csv/write_read_test.go @@ -9,9 +9,9 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/bradleyjkemp/cupaloy/v2" - "github.com/cloudquery/filetypes/v3/types" - "github.com/cloudquery/plugin-sdk/v3/plugins/destination" - "github.com/cloudquery/plugin-sdk/v3/schema" + "github.com/cloudquery/filetypes/v4/types" + "github.com/cloudquery/plugin-sdk/v4/plugin" + "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/google/uuid" ) @@ -69,12 +69,12 @@ func TestWriteRead(t *testing.T) { ch := make(chan arrow.Record) var readErr error go func() { - readErr = cl.Read(byteReader, table, "test-source", ch) + readErr = cl.Read(byteReader, table, ch) close(ch) }() totalCount := 0 for got := range ch { - if diff := destination.RecordDiff(records[totalCount], got); diff != "" { + if diff := plugin.RecordDiff(records[totalCount], got); diff != "" { t.Errorf("got diff: %s", diff) } totalCount++ diff --git a/go.mod b/go.mod index dd031527..e8409d78 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,11 @@ -module github.com/cloudquery/filetypes/v3 +module github.com/cloudquery/filetypes/v4 go 1.19 require ( github.com/apache/arrow/go/v13 v13.0.0-20230622042343-ec413b7763fe github.com/bradleyjkemp/cupaloy/v2 v2.8.0 - github.com/cloudquery/plugin-sdk/v3 v3.10.6 + github.com/cloudquery/plugin-sdk/v4 v4.2.0-rc1 github.com/goccy/go-json v0.10.2 github.com/google/uuid v1.3.0 github.com/stretchr/testify v1.8.4 @@ -18,9 +18,7 @@ require ( github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/apache/thrift v0.18.1 // indirect - github.com/cloudquery/plugin-pb-go v1.4.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/ghodss/yaml v1.0.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v23.1.21+incompatible // indirect @@ -47,6 +45,5 @@ require ( google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect google.golang.org/grpc v1.55.0 // indirect google.golang.org/protobuf v1.30.0 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 10b20d9c..8a538eb8 100644 --- a/go.sum +++ b/go.sum @@ -8,16 +8,12 @@ github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oM github.com/bradleyjkemp/cupaloy/v2 v2.8.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0= github.com/cloudquery/arrow/go/v13 v13.0.0-20230626001500-065602842c3a h1:O/FNq1+8YlWzHYNj2tokFQyja6GXsQBdkuvLMdpuaSw= github.com/cloudquery/arrow/go/v13 v13.0.0-20230626001500-065602842c3a/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc= -github.com/cloudquery/plugin-pb-go v1.4.0 h1:sfy0oWSFac2JCJQJuKoR+8flZGKkEoUVORwZDNM3aiI= -github.com/cloudquery/plugin-pb-go v1.4.0/go.mod h1:NbWAtT2BzJQ9+XUWwh3IKBg3MOeV9ZEpHoHNAQ/YDV8= -github.com/cloudquery/plugin-sdk/v3 v3.10.6 h1:KqTsLZ6OA1h8BUMeMcU6BAD6TBW6ojgQaC4zDZMgvu0= -github.com/cloudquery/plugin-sdk/v3 v3.10.6/go.mod h1:QhBaVgiNyQ3P6uAzJWOYpYykHXL+WDZffwg1riTwv60= +github.com/cloudquery/plugin-sdk/v4 v4.2.0-rc1 h1:sRjZ/Lb/yjLw92HzvgPiyVynbocbtaa13fEgS9MN/DQ= +github.com/cloudquery/plugin-sdk/v4 v4.2.0-rc1/go.mod h1:gn2ANihFC5SUMPCcYnVD+Gt3Cgn8OeXJW2/0lRUoB68= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= -github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -103,8 +99,6 @@ google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/json/read.go b/json/read.go index d3b5b0a8..90d5e692 100644 --- a/json/read.go +++ b/json/read.go @@ -7,12 +7,12 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" "github.com/apache/arrow/go/v13/arrow/memory" - "github.com/cloudquery/plugin-sdk/v3/schema" + "github.com/cloudquery/plugin-sdk/v4/schema" ) const maxJSONSize = 1024 * 1024 * 20 -func (*Client) Read(r io.Reader, table *schema.Table, _ string, res chan<- arrow.Record) error { +func (*Client) Read(r io.Reader, table *schema.Table, res chan<- arrow.Record) error { scanner := bufio.NewScanner(r) scanner.Buffer(make([]byte, maxJSONSize), maxJSONSize) rb := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) diff --git a/json/write.go b/json/write.go index d5dfc2d8..ff6c5b2e 100644 --- a/json/write.go +++ b/json/write.go @@ -5,8 +5,8 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" - "github.com/cloudquery/filetypes/v3/types" - "github.com/cloudquery/plugin-sdk/v3/schema" + "github.com/cloudquery/filetypes/v4/types" + "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/goccy/go-json" ) diff --git a/json/write_read_test.go b/json/write_read_test.go index 6d78c078..e1064fde 100644 --- a/json/write_read_test.go +++ b/json/write_read_test.go @@ -9,33 +9,12 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/bradleyjkemp/cupaloy/v2" - "github.com/cloudquery/filetypes/v3/types" - "github.com/cloudquery/plugin-sdk/v3/plugins/destination" - "github.com/cloudquery/plugin-sdk/v3/schema" + "github.com/cloudquery/filetypes/v4/types" + "github.com/cloudquery/plugin-sdk/v4/plugin" + "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/google/uuid" ) -func TestWrite(t *testing.T) { - var b bytes.Buffer - table := schema.TestTable("test", schema.TestSourceOptions{}) - sourceName := "test-source" - syncTime := time.Now().UTC().Round(time.Second) - opts := schema.GenTestDataOptions{ - SourceName: sourceName, - SyncTime: syncTime, - MaxRows: 1, - } - records := schema.GenTestData(table, opts) - cl, err := NewClient() - if err != nil { - t.Fatal(err) - } - if err := types.WriteAll(cl, &b, table, records); err != nil { - t.Fatal(err) - } - t.Log(b.String()) -} - func TestWriteRead(t *testing.T) { table := schema.TestTable("test", schema.TestSourceOptions{}) sourceName := "test-source" @@ -77,12 +56,12 @@ func TestWriteRead(t *testing.T) { ch := make(chan arrow.Record) var readErr error go func() { - readErr = cl.Read(byteReader, table, "test-source", ch) + readErr = cl.Read(byteReader, table, ch) close(ch) }() totalCount := 0 for got := range ch { - if diff := destination.RecordDiff(records[totalCount], got); diff != "" { + if diff := plugin.RecordDiff(records[totalCount], got); diff != "" { t.Fatalf("got diff: %s", diff) } totalCount++ diff --git a/parquet/read.go b/parquet/read.go index 793d6020..8128ba1f 100644 --- a/parquet/read.go +++ b/parquet/read.go @@ -11,10 +11,10 @@ import ( "github.com/apache/arrow/go/v13/parquet" "github.com/apache/arrow/go/v13/parquet/file" "github.com/apache/arrow/go/v13/parquet/pqarrow" - "github.com/cloudquery/plugin-sdk/v3/schema" + "github.com/cloudquery/plugin-sdk/v4/schema" ) -func (*Client) Read(f parquet.ReaderAtSeeker, table *schema.Table, _ string, res chan<- arrow.Record) error { +func (*Client) Read(f parquet.ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error { ctx := context.Background() rdr, err := file.NewParquetReader(f) if err != nil { diff --git a/parquet/write.go b/parquet/write.go index e71b7c5b..20f17b4b 100644 --- a/parquet/write.go +++ b/parquet/write.go @@ -9,9 +9,9 @@ import ( "github.com/apache/arrow/go/v13/parquet" "github.com/apache/arrow/go/v13/parquet/compress" "github.com/apache/arrow/go/v13/parquet/pqarrow" - ftypes "github.com/cloudquery/filetypes/v3/types" - "github.com/cloudquery/plugin-sdk/v3/schema" - "github.com/cloudquery/plugin-sdk/v3/types" + ftypes "github.com/cloudquery/filetypes/v4/types" + "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/cloudquery/plugin-sdk/v4/types" ) type Handle struct { diff --git a/parquet/write_read_test.go b/parquet/write_read_test.go index 0a5aa535..5f0dc5f1 100644 --- a/parquet/write_read_test.go +++ b/parquet/write_read_test.go @@ -9,9 +9,9 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" - "github.com/cloudquery/filetypes/v3/types" - "github.com/cloudquery/plugin-sdk/v3/plugins/destination" - "github.com/cloudquery/plugin-sdk/v3/schema" + "github.com/cloudquery/filetypes/v4/types" + "github.com/cloudquery/plugin-sdk/v4/plugin" + "github.com/cloudquery/plugin-sdk/v4/schema" ) func TestWriteRead(t *testing.T) { @@ -48,14 +48,14 @@ func TestWriteRead(t *testing.T) { ch := make(chan arrow.Record) var readErr error go func() { - readErr = cl.Read(byteReader, table, "test-source", ch) + readErr = cl.Read(byteReader, table, ch) close(ch) }() totalCount := 0 for got := range ch { curr := records[totalCount] if !array.RecordApproxEqual(curr, got) { - t.Fatalf("got diff (record %d): %s\n", totalCount, destination.RecordDiff(records[totalCount], got)) + t.Fatalf("got diff (record %d): %s\n", totalCount, plugin.RecordDiff(records[totalCount], got)) } totalCount++ } diff --git a/read.go b/read.go index 517f60ab..7992319b 100644 --- a/read.go +++ b/read.go @@ -4,7 +4,7 @@ import ( "io" "github.com/apache/arrow/go/v13/arrow" - "github.com/cloudquery/plugin-sdk/v3/schema" + "github.com/cloudquery/plugin-sdk/v4/schema" ) type ReaderAtSeeker interface { @@ -13,18 +13,18 @@ type ReaderAtSeeker interface { io.Seeker } -func (cl *Client) Read(f ReaderAtSeeker, table *schema.Table, sourceName string, res chan<- arrow.Record) error { +func (cl *Client) Read(f ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error { switch cl.spec.Format { case FormatTypeCSV: - if err := cl.csv.Read(f, table, sourceName, res); err != nil { + if err := cl.csv.Read(f, table, res); err != nil { return err } case FormatTypeJSON: - if err := cl.json.Read(f, table, sourceName, res); err != nil { + if err := cl.json.Read(f, table, res); err != nil { return err } case FormatTypeParquet: - if err := cl.parquet.Read(f, table, sourceName, res); err != nil { + if err := cl.parquet.Read(f, table, res); err != nil { return err } default: diff --git a/spec.go b/spec.go index c9fe75ec..6f922dd1 100644 --- a/spec.go +++ b/spec.go @@ -5,9 +5,9 @@ import ( "encoding/json" "fmt" - "github.com/cloudquery/filetypes/v3/csv" - jsonFile "github.com/cloudquery/filetypes/v3/json" - "github.com/cloudquery/filetypes/v3/parquet" + "github.com/cloudquery/filetypes/v4/csv" + jsonFile "github.com/cloudquery/filetypes/v4/json" + "github.com/cloudquery/filetypes/v4/parquet" ) type FormatType string diff --git a/spec_test.go b/spec_test.go index 8baefc47..54e593ca 100644 --- a/spec_test.go +++ b/spec_test.go @@ -3,8 +3,8 @@ package filetypes import ( "testing" - "github.com/cloudquery/filetypes/v3/csv" - "github.com/cloudquery/filetypes/v3/json" + "github.com/cloudquery/filetypes/v4/csv" + "github.com/cloudquery/filetypes/v4/json" "github.com/stretchr/testify/assert" ) diff --git a/stream.go b/stream.go new file mode 100644 index 00000000..e6a3d230 --- /dev/null +++ b/stream.go @@ -0,0 +1,92 @@ +package filetypes + +import ( + "fmt" + "io" + + "github.com/apache/arrow/go/v13/arrow" + "github.com/cloudquery/filetypes/v4/types" + "github.com/cloudquery/plugin-sdk/v4/schema" +) + +// Stream helps with streaming uploads by handling header/footer and uploader logic. Use StartStream to start a stream and then Write to it. +type Stream struct { + h types.Handle + wc *writeCloser + done chan error +} + +type writeCloser struct { + *io.PipeWriter + closed bool +} + +func (w *writeCloser) Close() error { + w.closed = true + return w.PipeWriter.Close() +} + +// StartStream starts a streaming upload using the provided uploadFunc. +func (c *Client) StartStream(table *schema.Table, uploadFunc func(io.Reader) error) (*Stream, error) { + pr, pw := io.Pipe() + doneCh := make(chan error) + + go func() { + err := uploadFunc(pr) + _ = pr.CloseWithError(err) + doneCh <- err + close(doneCh) + }() + + wc := &writeCloser{PipeWriter: pw} + h, err := c.WriteHeader(wc, table) + if err != nil { + _ = pw.CloseWithError(err) + <-doneCh + return nil, err + } + + return &Stream{ + h: h, + wc: wc, + done: doneCh, + }, nil +} + +// Write to the stream opened with StartStream. +func (s *Stream) Write(records []arrow.Record) error { + if len(records) == 0 { + return nil + } + + return s.h.WriteContent(records) +} + +// Finish writing to the stream. +func (s *Stream) Finish() error { + return s.FinishWithError(nil) +} + +// FinishWithError aborts writing to the stream by closing the writer with the provided error and waiting for the uploader to finish. +func (s *Stream) FinishWithError(finishError error) error { + if finishError != nil { + _ = s.wc.CloseWithError(finishError) + return <-s.done + } + + if err := s.h.WriteFooter(); err != nil { + if !s.wc.closed { + _ = s.wc.CloseWithError(err) + } + return fmt.Errorf("failed to write footer: %w", <-s.done) + } + + // ParquetWriter likes to close the underlying writer, so we need to check if it's already closed + if !s.wc.closed { + if err := s.wc.Close(); err != nil { + return err + } + } + + return <-s.done +} diff --git a/stream_test.go b/stream_test.go new file mode 100644 index 00000000..896e7bb2 --- /dev/null +++ b/stream_test.go @@ -0,0 +1,129 @@ +package filetypes_test + +import ( + "bufio" + fmt "fmt" + "io" + "testing" + + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/cloudquery/filetypes/v4" + "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type uploadHelper struct { + t *testing.T + failAfter int + expect []byte + expectAt int +} + +var errTest = fmt.Errorf("test error") + +func (u *uploadHelper) Upload(r io.Reader) error { + s := bufio.NewScanner(r) + i := 0 + for s.Scan() { + if u.failAfter > 0 && i == u.failAfter { + return errTest + } + if u.expect != nil && i == u.expectAt { + if !assert.Equal(u.t, u.expect, s.Bytes()) { + return fmt.Errorf("assertion failed") + } + } + i++ + } + return s.Err() +} + +func TestHappyPath(t *testing.T) { + cl, err := filetypes.NewClient(&filetypes.FileSpec{ + Format: filetypes.FormatTypeJSON, + }) + require.NoError(t, err) + + table := &schema.Table{ + Name: "test", + Columns: []schema.Column{ + {Name: "name", Type: arrow.BinaryTypes.String}, + }, + } + + u := &uploadHelper{ + t: t, + expect: []byte(`{"name":"bar"}`), + expectAt: 1, + } + s, err := cl.StartStream(table, u.Upload) + require.NoError(t, err) + + bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) + bldr.Field(0).(*array.StringBuilder).Append("foo") + bldr.Field(0).(*array.StringBuilder).Append("bar") + record := bldr.NewRecord() + + require.NoError(t, s.Write([]arrow.Record{record})) + require.NoError(t, s.Finish()) +} + +func TestWriteError(t *testing.T) { + cl, err := filetypes.NewClient(&filetypes.FileSpec{ + Format: filetypes.FormatTypeJSON, + }) + require.NoError(t, err) + + table := &schema.Table{ + Name: "test", + Columns: []schema.Column{ + {Name: "name", Type: arrow.BinaryTypes.String}, + }, + } + + u := &uploadHelper{ + t: t, + failAfter: 1, + } + s, err := cl.StartStream(table, u.Upload) + require.NoError(t, err) + + bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) + bldr.Field(0).(*array.StringBuilder).Append("foo") + bldr.Field(0).(*array.StringBuilder).Append("bar") + record := bldr.NewRecord() + + require.NoError(t, s.Write([]arrow.Record{record})) + require.ErrorIs(t, s.Finish(), errTest) +} + +func TestCloseError(t *testing.T) { + cl, err := filetypes.NewClient(&filetypes.FileSpec{ + Format: filetypes.FormatTypeJSON, + }) + require.NoError(t, err) + + table := &schema.Table{ + Name: "test", + Columns: []schema.Column{ + {Name: "name", Type: arrow.BinaryTypes.String}, + }, + } + + u := &uploadHelper{ + t: t, + } + s, err := cl.StartStream(table, u.Upload) + require.NoError(t, err) + + bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) + bldr.Field(0).(*array.StringBuilder).Append("foo") + bldr.Field(0).(*array.StringBuilder).Append("bar") + record := bldr.NewRecord() + + require.NoError(t, s.Write([]arrow.Record{record})) + require.ErrorIs(t, s.FinishWithError(errTest), errTest) +} diff --git a/types/types.go b/types/types.go index 77ae7a7c..022d5ac6 100644 --- a/types/types.go +++ b/types/types.go @@ -4,7 +4,7 @@ import ( "io" "github.com/apache/arrow/go/v13/arrow" - "github.com/cloudquery/plugin-sdk/v3/schema" + "github.com/cloudquery/plugin-sdk/v4/schema" ) type FileType interface { diff --git a/write.go b/write.go index fbb2eb9f..2633591e 100644 --- a/write.go +++ b/write.go @@ -4,8 +4,8 @@ import ( "io" "github.com/apache/arrow/go/v13/arrow" - "github.com/cloudquery/filetypes/v3/types" - "github.com/cloudquery/plugin-sdk/v3/schema" + "github.com/cloudquery/filetypes/v4/types" + "github.com/cloudquery/plugin-sdk/v4/schema" ) func (cl *Client) WriteTableBatchFile(w io.Writer, table *schema.Table, records []arrow.Record) error {