Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions plugin/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,40 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/memory"
)

func RecordDiff(l, r arrow.Record) string {
if array.RecordApproxEqual(l, r, array.WithUnorderedMapKeys(true)) {
// RecordsDiff compares two sets of records that share the schema sc and
// returns a textual description of the differences. Each slice is assembled
// into an arrow.Table and the comparison is delegated to TableDiff, which
// returns an empty string when the two tables are approximately equal.
func RecordsDiff(sc *arrow.Schema, l, r []arrow.Record) string {
	left := array.NewTableFromRecords(sc, l)
	// Tables are reference counted; drop our references once the diff string
	// has been produced so the underlying columns are not kept alive.
	defer left.Release()

	right := array.NewTableFromRecords(sc, r)
	defer right.Release()

	return TableDiff(left, right)
}

func TableDiff(l, r arrow.Table) string {
if array.TableApproxEqual(l, r, array.WithUnorderedMapKeys(true)) {
return ""
}
var sb strings.Builder

if l.NumCols() != r.NumCols() {
return fmt.Sprintf("different number of columns: %d vs %d", l.NumCols(), r.NumCols())
}
if l.NumRows() != r.NumRows() {
return fmt.Sprintf("different number of rows: %d vs %d", l.NumRows(), r.NumRows())
}

var sb strings.Builder
for i := 0; i < int(l.NumCols()); i++ {
edits, err := array.Diff(l.Column(i), r.Column(i))
lCol, err := array.Concatenate(l.Column(i).Data().Chunks(), memory.DefaultAllocator)
if err != nil {
panic(fmt.Errorf("failed to concat left columns at idx %d: %w", i, err))
}
rCol, err := array.Concatenate(r.Column(i).Data().Chunks(), memory.DefaultAllocator)
if err != nil {
panic(fmt.Errorf("failed to concat right columns at idx %d: %w", i, err))
}
edits, err := array.Diff(lCol, rCol)
if err != nil {
panic(fmt.Sprintf("left: %v, right: %v, error: %v", l.Column(i).DataType(), r.Column(i).DataType(), err))
panic(fmt.Errorf("left: %v, right: %v, error: %w", lCol.DataType(), rCol.DataType(), err))
}
diff := edits.UnifiedDiff(l.Column(i), r.Column(i))
diff := edits.UnifiedDiff(lCol, rCol)
if diff != "" {
sb.WriteString(l.Schema().Field(i).Name)
sb.WriteString(": ")
Expand Down
13 changes: 7 additions & 6 deletions plugin/nulls_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package plugin

import (
"testing"
"time"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)

func TestWithTestIgnoreNullsInLists(t *testing.T) {
Expand All @@ -20,7 +21,7 @@ func TestWithTestIgnoreNullsInLists(t *testing.T) {
SyncTime: time.Now(),
MaxRows: 100,
NullRows: false,
})[0])
}))
for _, c := range resource.Columns() {
assertNoNullsInLists(t, c)
}
Expand All @@ -30,7 +31,7 @@ func TestWithTestIgnoreNullsInLists(t *testing.T) {
SyncTime: time.Now(),
MaxRows: 100,
NullRows: true,
})[0])
}))
for _, c := range resource.Columns() {
assertNoNullsInLists(t, c)
}
Expand Down Expand Up @@ -65,7 +66,7 @@ func TestWithTestSourceAllowNull(t *testing.T) {
SyncTime: time.Now(),
MaxRows: 100,
NullRows: false,
})[0])
}))
for _, c := range resource.Columns() {
assertNoNulls(t, s.allowNull, c)
}
Expand All @@ -75,7 +76,7 @@ func TestWithTestSourceAllowNull(t *testing.T) {
SyncTime: time.Now(),
MaxRows: 100,
NullRows: true,
})[0])
}))
for _, c := range resource.Columns() {
assertNoNulls(t, s.allowNull, c)
}
Expand Down
22 changes: 13 additions & 9 deletions plugin/testing_upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,14 @@ func (s *WriterTestSuite) testUpsertBasic(ctx context.Context) error {
if totalItems != 1 {
return fmt.Errorf("expected 1 item, got %d", totalItems)
}
if diff := RecordDiff(records[0], record); diff != "" {
if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record}); diff != "" {
return fmt.Errorf("record differs: %s", diff)
}
return nil
}

func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error {
const rowsPerRecord = 10
tableName := s.tableNameForTest("upsert_all")
table := schema.TestTable(tableName, s.genDatOptions)
table.Columns = append(table.Columns, schema.Column{Name: "name", Type: arrow.BinaryTypes.String, PrimaryKey: true})
Expand All @@ -78,7 +79,10 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error {
}

tg := schema.NewTestDataGenerator()
normalRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, TimePrecision: s.genDatOptions.TimePrecision})[0]
normalRecord := tg.Generate(table, schema.GenTestDataOptions{
MaxRows: rowsPerRecord,
TimePrecision: s.genDatOptions.TimePrecision,
})
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
Record: normalRecord,
}); err != nil {
Expand All @@ -91,15 +95,15 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error {
return fmt.Errorf("failed to readAll: %w", err)
}
totalItems := TotalRows(records)
if totalItems != 1 {
return fmt.Errorf("expected 1 item, got %d", totalItems)
if totalItems != rowsPerRecord {
return fmt.Errorf("expected items: %d, got %d", rowsPerRecord, totalItems)
}

if diff := RecordDiff(records[0], normalRecord); diff != "" {
if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{normalRecord}); diff != "" {
return fmt.Errorf("record differs after insert: %s", diff)
}

nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, TimePrecision: s.genDatOptions.TimePrecision, NullRows: true})[0]
nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 10, TimePrecision: s.genDatOptions.TimePrecision, NullRows: true})
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
Record: nullRecord,
}); err != nil {
Expand All @@ -113,11 +117,11 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error {
}

totalItems = TotalRows(records)
if totalItems != 1 {
return fmt.Errorf("expected 1 item, got %d", totalItems)
if totalItems != rowsPerRecord {
return fmt.Errorf("expected items: %d, got %d", rowsPerRecord, totalItems)
}

if diff := RecordDiff(records[0], nullRecord); diff != "" {
if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{nullRecord}); diff != "" {
return fmt.Errorf("record differs after upsert (columns should be null): %s", diff)
}

Expand Down
3 changes: 2 additions & 1 deletion plugin/testing_write_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cloudquery/plugin-sdk/v4/message"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (s *WriterTestSuite) testDeleteStale(ctx context.Context) error {
if totalItems != 1 {
return fmt.Errorf("expected 1 item, got %d", totalItems)
}
if diff := RecordDiff(records[0], record); diff != "" {
if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record}); diff != "" {
return fmt.Errorf("record differs: %s", diff)
}
return nil
Expand Down
30 changes: 16 additions & 14 deletions plugin/testing_write_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,15 @@ func (s *WriterTestSuite) testInsertBasic(ctx context.Context) error {
return fmt.Errorf("expected 2 items, got %d", totalItems)
}

if diff := RecordDiff(readRecords[0], record); diff != "" {
if diff := RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{record, record}); diff != "" {
return fmt.Errorf("record[0] differs: %s", diff)
}
if diff := RecordDiff(readRecords[1], record); diff != "" {
return fmt.Errorf("record[1] differs: %s", diff)
}

return nil
}

func (s *WriterTestSuite) testInsertAll(ctx context.Context) error {
const rowsPerRecord = 10
tableName := s.tableNameForTest("insert_all")
table := schema.TestTable(tableName, s.genDatOptions)
if err := s.plugin.writeOne(ctx, &message.WriteMigrateTable{
Expand All @@ -92,7 +90,10 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error {
return fmt.Errorf("failed to create table: %w", err)
}
tg := schema.NewTestDataGenerator()
normalRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, TimePrecision: s.genDatOptions.TimePrecision})[0]
normalRecord := tg.Generate(table, schema.GenTestDataOptions{
MaxRows: rowsPerRecord,
TimePrecision: s.genDatOptions.TimePrecision,
})
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
Record: normalRecord,
}); err != nil {
Expand All @@ -106,11 +107,15 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error {
}

totalItems := TotalRows(readRecords)
if totalItems != 1 {
return fmt.Errorf("expected 1 item, got %d", totalItems)
if totalItems != rowsPerRecord {
return fmt.Errorf("items expected: %d, got: %d", rowsPerRecord, totalItems)
}

nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, TimePrecision: s.genDatOptions.TimePrecision, NullRows: true})[0]
nullRecord := tg.Generate(table, schema.GenTestDataOptions{
MaxRows: rowsPerRecord,
TimePrecision: s.genDatOptions.TimePrecision,
NullRows: true,
})
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
Record: nullRecord,
}); err != nil {
Expand All @@ -125,14 +130,11 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error {
sortRecords(table, readRecords, "id")

totalItems = TotalRows(readRecords)
if totalItems != 2 {
return fmt.Errorf("expected 2 items, got %d", totalItems)
if totalItems != 2*rowsPerRecord {
return fmt.Errorf("items expected: %d, got: %d", 2*rowsPerRecord, totalItems)
}
if diff := RecordDiff(readRecords[0], normalRecord); diff != "" {
if diff := RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{normalRecord, nullRecord}); diff != "" {
return fmt.Errorf("record[0] differs: %s", diff)
}
if diff := RecordDiff(readRecords[1], nullRecord); diff != "" {
return fmt.Errorf("record[1] differs: %s", diff)
}
return nil
}
28 changes: 16 additions & 12 deletions plugin/testing_write_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func tableUUIDSuffix() string {

// nolint:revive
func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, source *schema.Table, supportsSafeMigrate bool, writeOptionMigrateForce bool) error {
const rowsPerRecord = 10
if err := s.plugin.writeOne(ctx, &message.WriteMigrateTable{
Table: source,
MigrateForce: writeOptionMigrateForce,
Expand All @@ -33,11 +34,11 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 1,
MaxRows: rowsPerRecord,
TimePrecision: s.genDatOptions.TimePrecision,
}
tg := schema.NewTestDataGenerator()
resource1 := tg.Generate(source, opts)[0]
resource1 := tg.Generate(source, opts)
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
Record: resource1,
}); err != nil {
Expand All @@ -50,10 +51,10 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
return fmt.Errorf("failed to sync: %w", err)
}
totalItems := TotalRows(records)
if totalItems != 1 {
return fmt.Errorf("expected 1 item, got %d", totalItems)
if totalItems != rowsPerRecord {
return fmt.Errorf("expected items: %d, got: %d", rowsPerRecord, totalItems)
}
if diff := RecordDiff(records[0], resource1); diff != "" {
if diff := RecordsDiff(source.ToArrowSchema(), records, []arrow.Record{resource1}); diff != "" {
return fmt.Errorf("first record differs from expectation: %s", diff)
}

Expand All @@ -64,7 +65,7 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
return fmt.Errorf("failed to create table: %w", err)
}

resource2 := tg.Generate(target, opts)[0]
resource2 := tg.Generate(target, opts)
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
Record: resource2,
}); err != nil {
Expand All @@ -78,12 +79,13 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
}
sortRecords(target, records, "id")

lastRow := resource2.NewSlice(resource2.NumRows()-1, resource2.NumRows())
// if force migration is not required, we don't expect any items to be dropped (so both inserted records, 2*rowsPerRecord rows in total, should be present)
if !writeOptionMigrateForce || supportsSafeMigrate {
if err := expectRows(records, 2, resource2); err != nil {
if writeOptionMigrateForce && TotalRows(records) == 1 {
if err := expectRows(target.ToArrowSchema(), records, 2*rowsPerRecord, lastRow); err != nil {
if writeOptionMigrateForce && TotalRows(records) == rowsPerRecord {
// if force migration is required, we can also expect the rows written before the migration to be dropped, leaving rowsPerRecord rows
return expectRows(records, 1, resource2)
return expectRows(target.ToArrowSchema(), records, rowsPerRecord, lastRow)
}

return err
Expand All @@ -92,7 +94,7 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
return nil
}

return expectRows(records, 1, resource2)
return expectRows(target.ToArrowSchema(), records, rowsPerRecord, lastRow)
}

// nolint:revive
Expand Down Expand Up @@ -235,12 +237,14 @@ func (s *WriterTestSuite) testMigrate(
})
}

func expectRows(records []arrow.Record, expectTotal int64, expectedLast arrow.Record) error {
func expectRows(sc *arrow.Schema, records []arrow.Record, expectTotal int64, expectedLast arrow.Record) error {
totalItems := TotalRows(records)
if totalItems != expectTotal {
return fmt.Errorf("expected %d items, got %d", expectTotal, totalItems)
}
if diff := RecordDiff(records[totalItems-1], expectedLast); diff != "" {
lastRecord := records[len(records)-1]
lastRow := lastRecord.NewSlice(lastRecord.NumRows()-1, lastRecord.NumRows())
if diff := RecordsDiff(sc, []arrow.Record{lastRow}, []arrow.Record{expectedLast}); diff != "" {
return fmt.Errorf("record #%d differs from expectation: %s", totalItems, diff)
}
return nil
Expand Down
31 changes: 27 additions & 4 deletions schema/testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,17 @@ func NewTestDataGenerator() *TestDataGenerator {
}
}

// GenTestData generates a slice of arrow.Records with the given schema and options.
func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) []arrow.Record {
var records []arrow.Record
// Generate will produce a single arrow.Record with the given schema and options.
func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) arrow.Record {
sc := table.ToArrowSchema()
if opts.MaxRows == 0 {
// We generate an empty record
bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
defer bldr.Release()
return bldr.NewRecord()
}

var records []arrow.Record
for j := 0; j < opts.MaxRows; j++ {
tg.counter++
bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
Expand Down Expand Up @@ -245,7 +252,23 @@ func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) []a
return strings.Compare(firstUUID, secondUUID) < 0
})
}
return records

// now we have sorted 1-row-records. Transform them into a single record with opts.MaxRows rows
columns := make([]arrow.Array, sc.NumFields())
for n := 0; n < sc.NumFields(); n++ {
arrs := make([]arrow.Array, len(records))
for i := range arrs {
arrs[i] = records[i].Column(n)
}

concatenated, err := array.Concatenate(arrs, memory.DefaultAllocator)
if err != nil {
panic(fmt.Sprintf("failed to concatenate arrays: %v", err))
}
columns[n] = concatenated
}

return array.NewRecord(sc, columns, -1)
}

func (tg TestDataGenerator) getExampleJSON(colName string, dataType arrow.DataType, opts GenTestDataOptions) string {
Expand Down