From 8879a5d2cfe07aedeb0f6861dc99ffa3d039383c Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Fri, 18 Aug 2023 15:55:13 +0300 Subject: [PATCH 1/2] change signature --- plugin/nulls_test.go | 13 +++++++------ plugin/testing_upsert.go | 4 ++-- plugin/testing_write_insert.go | 4 ++-- plugin/testing_write_migrate.go | 6 +++--- schema/testdata.go | 23 ++++++++++++++++++++--- 5 files changed, 34 insertions(+), 16 deletions(-) diff --git a/plugin/nulls_test.go b/plugin/nulls_test.go index 38c1bef341..ba77fe7281 100644 --- a/plugin/nulls_test.go +++ b/plugin/nulls_test.go @@ -1,13 +1,14 @@ package plugin import ( + "testing" + "time" + "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "testing" - "time" ) func TestWithTestIgnoreNullsInLists(t *testing.T) { @@ -20,7 +21,7 @@ func TestWithTestIgnoreNullsInLists(t *testing.T) { SyncTime: time.Now(), MaxRows: 100, NullRows: false, - })[0]) + })) for _, c := range resource.Columns() { assertNoNullsInLists(t, c) } @@ -30,7 +31,7 @@ func TestWithTestIgnoreNullsInLists(t *testing.T) { SyncTime: time.Now(), MaxRows: 100, NullRows: true, - })[0]) + })) for _, c := range resource.Columns() { assertNoNullsInLists(t, c) } @@ -65,7 +66,7 @@ func TestWithTestSourceAllowNull(t *testing.T) { SyncTime: time.Now(), MaxRows: 100, NullRows: false, - })[0]) + })) for _, c := range resource.Columns() { assertNoNulls(t, s.allowNull, c) } @@ -75,7 +76,7 @@ func TestWithTestSourceAllowNull(t *testing.T) { SyncTime: time.Now(), MaxRows: 100, NullRows: true, - })[0]) + })) for _, c := range resource.Columns() { assertNoNulls(t, s.allowNull, c) } diff --git a/plugin/testing_upsert.go b/plugin/testing_upsert.go index 2b795b84ae..696b0ce957 100644 --- a/plugin/testing_upsert.go +++ b/plugin/testing_upsert.go @@ -78,7 +78,7 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error { } tg := schema.NewTestDataGenerator() - normalRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, TimePrecision: s.genDatOptions.TimePrecision})[0] + normalRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 10, TimePrecision: s.genDatOptions.TimePrecision}) if err := s.plugin.writeOne(ctx, &message.WriteInsert{ Record: normalRecord, }); err != nil { @@ -99,7 +99,7 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error { return fmt.Errorf("record differs after insert: %s", diff) } - nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, TimePrecision: s.genDatOptions.TimePrecision, NullRows: true})[0] + nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 10, TimePrecision: s.genDatOptions.TimePrecision, NullRows: true}) if err := s.plugin.writeOne(ctx, &message.WriteInsert{ Record: nullRecord, }); err != nil { diff --git a/plugin/testing_write_insert.go b/plugin/testing_write_insert.go index 07603b2488..a1309ef93d 100644 --- a/plugin/testing_write_insert.go +++ b/plugin/testing_write_insert.go @@ -92,7 +92,7 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error { return fmt.Errorf("failed to create table: %w", err) } tg := schema.NewTestDataGenerator() - normalRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, TimePrecision: s.genDatOptions.TimePrecision})[0] + normalRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 10, TimePrecision: s.genDatOptions.TimePrecision}) if err := s.plugin.writeOne(ctx, &message.WriteInsert{ Record: normalRecord, }); err != nil { @@ -110,7 +110,7 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error { return fmt.Errorf("expected 1 item, got %d", totalItems) } - nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, TimePrecision: s.genDatOptions.TimePrecision, NullRows: true})[0] + nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 10, TimePrecision: s.genDatOptions.TimePrecision, NullRows: true}) if err := s.plugin.writeOne(ctx, &message.WriteInsert{ Record: nullRecord, }); err != nil { diff --git a/plugin/testing_write_migrate.go b/plugin/testing_write_migrate.go index 0e6b58dbd4..ad4f4d253a 100644 --- a/plugin/testing_write_migrate.go +++ b/plugin/testing_write_migrate.go @@ -33,11 +33,11 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou opts := schema.GenTestDataOptions{ SourceName: sourceName, SyncTime: syncTime, - MaxRows: 1, + MaxRows: 10, TimePrecision: s.genDatOptions.TimePrecision, } tg := schema.NewTestDataGenerator() - resource1 := tg.Generate(source, opts)[0] + resource1 := tg.Generate(source, opts) if err := s.plugin.writeOne(ctx, &message.WriteInsert{ Record: resource1, }); err != nil { @@ -64,7 +64,7 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou return fmt.Errorf("failed to create table: %w", err) } - resource2 := tg.Generate(target, opts)[0] + resource2 := tg.Generate(target, opts) if err := s.plugin.writeOne(ctx, &message.WriteInsert{ Record: resource2, }); err != nil { diff --git a/schema/testdata.go b/schema/testdata.go index 4ad147ab99..b23db6cbb4 100644 --- a/schema/testdata.go +++ b/schema/testdata.go @@ -212,8 +212,8 @@ func NewTestDataGenerator() *TestDataGenerator { } } -// GenTestData generates a slice of arrow.Records with the given schema and options. -func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) []arrow.Record { +// Generate will produce a single arrow.Record with the given schema and options. +func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) arrow.Record { var records []arrow.Record sc := table.ToArrowSchema() for j := 0; j < opts.MaxRows; j++ { @@ -245,7 +245,24 @@ func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) []a return strings.Compare(firstUUID, secondUUID) < 0 }) } - return records + + // now we have sorted 1-row-records. Transform them into a single record with opts.MaxRows rows + + columns := make([]arrow.Array, sc.NumFields()) + for n := 0; n < sc.NumFields(); n++ { + arrs := make([]arrow.Array, len(records)) + for i := range arrs { + arrs[i] = records[i].Column(n) + } + + concatenated, err := array.Concatenate(arrs, memory.DefaultAllocator) + if err != nil { + panic(fmt.Sprintf("failed to concatenate arrays: %v", err)) + } + columns[n] = concatenated + } + + return array.NewRecord(sc, columns, -1) } func (tg TestDataGenerator) getExampleJSON(colName string, dataType arrow.DataType, opts GenTestDataOptions) string { From 1ce3ff6f5b00af90ff3c8e99a1223ce65ed256df Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Fri, 18 Aug 2023 16:59:39 +0300 Subject: [PATCH 2/2] fix tests --- plugin/diff.go | 27 +++++++++++++++++++++------ plugin/testing_upsert.go | 20 ++++++++++++-------- plugin/testing_write_delete.go | 3 ++- plugin/testing_write_insert.go | 30 ++++++++++++++++-------------- plugin/testing_write_migrate.go | 24 ++++++++++++++---------- schema/testdata.go | 10 ++++++++-- 6 files changed, 73 insertions(+), 41 deletions(-) diff --git a/plugin/diff.go b/plugin/diff.go index de78e97013..3b5b63aa2d 100644 --- a/plugin/diff.go +++ b/plugin/diff.go @@ -6,25 +6,40 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/memory" ) -func RecordDiff(l, r arrow.Record) string { - if array.RecordApproxEqual(l, r, array.WithUnorderedMapKeys(true)) { +func RecordsDiff(sc *arrow.Schema, l, r []arrow.Record) string { + return TableDiff(array.NewTableFromRecords(sc, l), array.NewTableFromRecords(sc, r)) +} + +func TableDiff(l, r arrow.Table) string { + if array.TableApproxEqual(l, r, array.WithUnorderedMapKeys(true)) { return "" } - var sb strings.Builder + if l.NumCols() != r.NumCols() { return fmt.Sprintf("different number of columns: %d vs %d", l.NumCols(), r.NumCols()) } if l.NumRows() != r.NumRows() { return fmt.Sprintf("different number of rows: %d vs %d", l.NumRows(), r.NumRows()) } + + var sb strings.Builder for i := 0; i < int(l.NumCols()); i++ { - edits, err := array.Diff(l.Column(i), r.Column(i)) + lCol, err := array.Concatenate(l.Column(i).Data().Chunks(), memory.DefaultAllocator) + if err != nil { + panic(fmt.Errorf("failed to concat left columns at idx %d: %w", i, err)) + } + rCol, err := array.Concatenate(r.Column(i).Data().Chunks(), memory.DefaultAllocator) + if err != nil { + panic(fmt.Errorf("failed to concat right columns at idx %d: %w", i, err)) + } + edits, err := array.Diff(lCol, rCol) if err != nil { - panic(fmt.Sprintf("left: %v, right: %v, error: %v", l.Column(i).DataType(), r.Column(i).DataType(), err)) + panic(fmt.Errorf("left: %v, right: %v, error: %w", lCol.DataType(), rCol.DataType(), err)) } - diff := edits.UnifiedDiff(l.Column(i), r.Column(i)) + diff := edits.UnifiedDiff(lCol, rCol) if diff != "" { sb.WriteString(l.Schema().Field(i).Name) sb.WriteString(": ") diff --git a/plugin/testing_upsert.go b/plugin/testing_upsert.go index 696b0ce957..e92240563b 100644 --- a/plugin/testing_upsert.go +++ b/plugin/testing_upsert.go @@ -61,13 +61,14 @@ func (s *WriterTestSuite) testUpsertBasic(ctx context.Context) error { if totalItems != 1 { return fmt.Errorf("expected 1 item, got %d", totalItems) } - if diff := RecordDiff(records[0], record); diff != "" { + if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record}); diff != "" { return fmt.Errorf("record differs: %s", diff) } return nil } func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error { + const rowsPerRecord = 10 tableName := s.tableNameForTest("upsert_all") table := schema.TestTable(tableName, s.genDatOptions) table.Columns = append(table.Columns, schema.Column{Name: "name", Type: arrow.BinaryTypes.String, PrimaryKey: true}) @@ -78,7 +79,10 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error { } tg := schema.NewTestDataGenerator() - normalRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 10, TimePrecision: s.genDatOptions.TimePrecision}) + normalRecord := tg.Generate(table, schema.GenTestDataOptions{ + MaxRows: rowsPerRecord, + TimePrecision: s.genDatOptions.TimePrecision, + }) if err := s.plugin.writeOne(ctx, &message.WriteInsert{ Record: normalRecord, }); err != nil { @@ -91,11 +95,11 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error { return fmt.Errorf("failed to readAll: %w", err) } totalItems := TotalRows(records) - if totalItems != 1 { - return fmt.Errorf("expected 1 item, got %d", totalItems) + if totalItems != rowsPerRecord { + return fmt.Errorf("expected items: %d, got %d", rowsPerRecord, totalItems) } - if diff := RecordDiff(records[0], normalRecord); diff != "" { + if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{normalRecord}); diff != "" { return fmt.Errorf("record differs after insert: %s", diff) } @@ -113,11 +117,11 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error { } totalItems = TotalRows(records) - if totalItems != 1 { - return fmt.Errorf("expected 1 item, got %d", totalItems) + if totalItems != rowsPerRecord { + return fmt.Errorf("expected items: %d, got %d", rowsPerRecord, totalItems) } - if diff := RecordDiff(records[0], nullRecord); diff != "" { + if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{nullRecord}); diff != "" { return fmt.Errorf("record differs after upsert (columns should be null): %s", diff) } diff --git a/plugin/testing_write_delete.go b/plugin/testing_write_delete.go index 0892ce2784..4fc6133549 100644 --- a/plugin/testing_write_delete.go +++ b/plugin/testing_write_delete.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" "github.com/apache/arrow/go/v13/arrow/memory" "github.com/cloudquery/plugin-sdk/v4/message" @@ -70,7 +71,7 @@ func (s *WriterTestSuite) testDeleteStale(ctx context.Context) error { if totalItems != 1 { return fmt.Errorf("expected 1 item, got %d", totalItems) } - if diff := RecordDiff(records[0], record); diff != "" { + if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record}); diff != "" { return fmt.Errorf("record differs: %s", diff) } return nil diff --git a/plugin/testing_write_insert.go b/plugin/testing_write_insert.go index a1309ef93d..ebe5a94dfa 100644 --- a/plugin/testing_write_insert.go +++ b/plugin/testing_write_insert.go @@ -73,17 +73,15 @@ func (s *WriterTestSuite) testInsertBasic(ctx context.Context) error { return fmt.Errorf("expected 2 items, got %d", totalItems) } - if diff := RecordDiff(readRecords[0], record); diff != "" { + if diff := RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{record, record}); diff != "" { return fmt.Errorf("record[0] differs: %s", diff) } - if diff := RecordDiff(readRecords[1], record); diff != "" { - return fmt.Errorf("record[1] differs: %s", diff) - } return nil } func (s *WriterTestSuite) testInsertAll(ctx context.Context) error { + const rowsPerRecord = 10 tableName := s.tableNameForTest("insert_all") table := schema.TestTable(tableName, s.genDatOptions) if err := s.plugin.writeOne(ctx, &message.WriteMigrateTable{ @@ -92,7 +90,10 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error { return fmt.Errorf("failed to create table: %w", err) } tg := schema.NewTestDataGenerator() - normalRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 10, TimePrecision: s.genDatOptions.TimePrecision}) + normalRecord := tg.Generate(table, schema.GenTestDataOptions{ + MaxRows: rowsPerRecord, + TimePrecision: s.genDatOptions.TimePrecision, + }) if err := s.plugin.writeOne(ctx, &message.WriteInsert{ Record: normalRecord, }); err != nil { @@ -106,11 +107,15 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error { } totalItems := TotalRows(readRecords) - if totalItems != 1 { - return fmt.Errorf("expected 1 item, got %d", totalItems) + if totalItems != rowsPerRecord { + return fmt.Errorf("items expected: %d, got: %d", rowsPerRecord, totalItems) } - nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 10, TimePrecision: s.genDatOptions.TimePrecision, NullRows: true}) + nullRecord := tg.Generate(table, schema.GenTestDataOptions{ + MaxRows: rowsPerRecord, + TimePrecision: s.genDatOptions.TimePrecision, + NullRows: true, + }) if err := s.plugin.writeOne(ctx, &message.WriteInsert{ Record: nullRecord, }); err != nil { @@ -125,14 +130,11 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error { sortRecords(table, readRecords, "id") totalItems = TotalRows(readRecords) - if totalItems != 2 { - return fmt.Errorf("expected 2 items, got %d", totalItems) + if totalItems != 2*rowsPerRecord { + return fmt.Errorf("items expected: %d, got: %d", 2*rowsPerRecord, totalItems) } - if diff := RecordDiff(readRecords[0], normalRecord); diff != "" { + if diff := RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{normalRecord, nullRecord}); diff != "" { return fmt.Errorf("record[0] differs: %s", diff) } - if diff := RecordDiff(readRecords[1], nullRecord); diff != "" { - return fmt.Errorf("record[1] differs: %s", diff) - } return nil } diff --git a/plugin/testing_write_migrate.go b/plugin/testing_write_migrate.go index ad4f4d253a..6c57059563 100644 --- a/plugin/testing_write_migrate.go +++ b/plugin/testing_write_migrate.go @@ -21,6 +21,7 @@ func tableUUIDSuffix() string { // nolint:revive func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, source *schema.Table, supportsSafeMigrate bool, writeOptionMigrateForce bool) error { + const rowsPerRecord = 10 if err := s.plugin.writeOne(ctx, &message.WriteMigrateTable{ Table: source, MigrateForce: writeOptionMigrateForce, @@ -33,7 +34,7 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou opts := schema.GenTestDataOptions{ SourceName: sourceName, SyncTime: syncTime, - MaxRows: 10, + MaxRows: rowsPerRecord, TimePrecision: s.genDatOptions.TimePrecision, } tg := schema.NewTestDataGenerator() @@ -50,10 +51,10 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou return fmt.Errorf("failed to sync: %w", err) } totalItems := TotalRows(records) - if totalItems != 1 { - return fmt.Errorf("expected 1 item, got %d", totalItems) + if totalItems != rowsPerRecord { + return fmt.Errorf("expected items: %d, got: %d", rowsPerRecord, totalItems) } - if diff := RecordDiff(records[0], resource1); diff != "" { + if diff := RecordsDiff(source.ToArrowSchema(), records, []arrow.Record{resource1}); diff != "" { return fmt.Errorf("first record differs from expectation: %s", diff) } @@ -78,12 +79,13 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou } sortRecords(target, records, "id") + lastRow := resource2.NewSlice(resource2.NumRows()-1, resource2.NumRows()) // if force migration is not required, we don't expect any items to be dropped (so there should be 2 items) if !writeOptionMigrateForce || supportsSafeMigrate { - if err := expectRows(records, 2, resource2); err != nil { - if writeOptionMigrateForce && TotalRows(records) == 1 { + if err := expectRows(target.ToArrowSchema(), records, 2*rowsPerRecord, lastRow); err != nil { + if writeOptionMigrateForce && TotalRows(records) == rowsPerRecord { // if force migration is required, we can also expect 1 item to be dropped - return expectRows(records, 1, resource2) + return expectRows(target.ToArrowSchema(), records, rowsPerRecord, lastRow) } return err @@ -92,7 +94,7 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou return nil } - return expectRows(records, 1, resource2) + return expectRows(target.ToArrowSchema(), records, rowsPerRecord, lastRow) } // nolint:revive @@ -235,12 +237,14 @@ func (s *WriterTestSuite) testMigrate( }) } -func expectRows(records []arrow.Record, expectTotal int64, expectedLast arrow.Record) error { +func expectRows(sc *arrow.Schema, records []arrow.Record, expectTotal int64, expectedLast arrow.Record) error { totalItems := TotalRows(records) if totalItems != expectTotal { return fmt.Errorf("expected %d items, got %d", expectTotal, totalItems) } - if diff := RecordDiff(records[totalItems-1], expectedLast); diff != "" { + lastRecord := records[len(records)-1] + lastRow := lastRecord.NewSlice(lastRecord.NumRows()-1, lastRecord.NumRows()) + if diff := RecordsDiff(sc, []arrow.Record{lastRow}, []arrow.Record{expectedLast}); diff != "" { return fmt.Errorf("record #%d differs from expectation: %s", totalItems, diff) } return nil diff --git a/schema/testdata.go b/schema/testdata.go index b23db6cbb4..fa784411d6 100644 --- a/schema/testdata.go +++ b/schema/testdata.go @@ -214,8 +214,15 @@ func NewTestDataGenerator() *TestDataGenerator { // Generate will produce a single arrow.Record with the given schema and options. func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) arrow.Record { - var records []arrow.Record sc := table.ToArrowSchema() + if opts.MaxRows == 0 { + // We generate an empty record + bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) + defer bldr.Release() + return bldr.NewRecord() + } + + var records []arrow.Record for j := 0; j < opts.MaxRows; j++ { tg.counter++ bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) @@ -247,7 +254,6 @@ func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) arr } // now we have sorted 1-row-records. Transform them into a single record with opts.MaxRows rows - columns := make([]arrow.Array, sc.NumFields()) for n := 0; n < sc.NumFields(); n++ { arrs := make([]arrow.Array, len(records))