From d9d7d0ae76887c29cad47481dea31e051ee24c16 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Thu, 20 Apr 2023 12:21:38 +0300 Subject: [PATCH] fix(testing): Add sorting for testing dest migrations --- plugins/destination/plugin_testing.go | 6 +++--- plugins/destination/plugin_testing_migrate.go | 2 ++ .../destination/plugin_testing_overwrite.go | 13 ++++++------ .../plugin_testing_overwrite_delete_stale.go | 21 +++++++++---------- .../plugin_testing_write_append.go | 8 +++---- testdata/testdata.go | 16 ++++++++++++++ 6 files changed, 41 insertions(+), 25 deletions(-) diff --git a/plugins/destination/plugin_testing.go b/plugins/destination/plugin_testing.go index a3230c5b5a..d0624060aa 100644 --- a/plugins/destination/plugin_testing.go +++ b/plugins/destination/plugin_testing.go @@ -198,9 +198,9 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs }) } -func sortRecordsBySyncTime(table *schema.Table, records []arrow.Record) { - syncTimeIndex := table.Columns.Index(schema.CqSyncTimeColumn.Name) - cqIDIndex := table.Columns.Index(schema.CqIDColumn.Name) +func sortRecordsBySyncTime(table *arrow.Schema, records []arrow.Record) { + syncTimeIndex := table.FieldIndices(schema.CqSyncTimeColumn.Name)[0] + cqIDIndex := table.FieldIndices(schema.CqIDColumn.Name)[0] sort.Slice(records, func(i, j int) bool { // sort by sync time, then UUID first := records[i].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond) diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go index ee1fdb0a62..a18c49f9ea 100644 --- a/plugins/destination/plugin_testing_migrate.go +++ b/plugins/destination/plugin_testing_migrate.go @@ -48,6 +48,7 @@ func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog. if err := p.Migrate(ctx, []*arrow.Schema{target}); err != nil { return fmt.Errorf("failed to migrate existing table: %w", err) } + opts.SyncTime = syncTime.Add(time.Second).UTC() resource2 := testdata.GenTestData(target, opts)[0] if err := p.writeOne(ctx, sourceSpec, syncTime, resource2); err != nil { return fmt.Errorf("failed to write one after migration: %w", err) @@ -57,6 +58,7 @@ func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog. if err != nil { return fmt.Errorf("failed to read all: %w", err) } + sortRecordsBySyncTime(target, resourcesRead) if mode == specs.MigrateModeSafe { if len(resourcesRead) != 2 { return fmt.Errorf("expected 2 resources after write, got %d", len(resourcesRead)) diff --git a/plugins/destination/plugin_testing_overwrite.go b/plugins/destination/plugin_testing_overwrite.go index 86fab268d4..07b85f02d9 100644 --- a/plugins/destination/plugin_testing_overwrite.go +++ b/plugins/destination/plugin_testing_overwrite.go @@ -7,7 +7,6 @@ import ( "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" - "github.com/cloudquery/plugin-sdk/v2/schema" "github.com/cloudquery/plugin-sdk/v2/specs" "github.com/cloudquery/plugin-sdk/v2/testdata" "github.com/cloudquery/plugin-sdk/v2/types" @@ -21,10 +20,10 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, return fmt.Errorf("failed to init plugin: %w", err) } tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix()) - table := testdata.TestTable(tableName) + table := testdata.TestTable(tableName).ToArrowSchema() syncTime := time.Now().UTC().Round(1 * time.Second) tables := []*arrow.Schema{ - table.ToArrowSchema(), + table, } if err := p.Migrate(ctx, tables); err != nil { return fmt.Errorf("failed to migrate tables: %w", err) @@ -40,13 +39,13 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, SyncTime: syncTime, MaxRows: 2, } - resources := testdata.GenTestData(schema.CQSchemaToArrow(table), opts) + resources := testdata.GenTestData(table, opts) if err := p.writeAll(ctx, sourceSpec, syncTime, resources); err != nil { return fmt.Errorf("failed to write all: %w", err) } sortRecordsBySyncTime(table, resources) - resourcesRead, err := p.readAll(ctx, table.ToArrowSchema(), sourceName) + resourcesRead, err := p.readAll(ctx, table, sourceName) if err != nil { return fmt.Errorf("failed to read all: %w", err) } @@ -76,13 +75,13 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, MaxRows: 1, StableUUID: *u, } - updatedResource := testdata.GenTestData(schema.CQSchemaToArrow(table), opts)[0] + updatedResource := testdata.GenTestData(table, opts)[0] // write second time if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil { return fmt.Errorf("failed to write one second time: %w", err) } - resourcesRead, err = p.readAll(ctx, table.ToArrowSchema(), sourceName) + resourcesRead, err = p.readAll(ctx, table, sourceName) if err != nil { return fmt.Errorf("failed to read all second time: %w", err) } diff --git a/plugins/destination/plugin_testing_overwrite_delete_stale.go b/plugins/destination/plugin_testing_overwrite_delete_stale.go index b1f0918f7d..991168c2d4 100644 --- a/plugins/destination/plugin_testing_overwrite_delete_stale.go +++ b/plugins/destination/plugin_testing_overwrite_delete_stale.go @@ -20,13 +20,12 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte return fmt.Errorf("failed to init plugin: %w", err) } tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix()) - table := testdata.TestTable(tableName) - incTable := testdata.TestTable(tableName + "_incremental") - incTable.IsIncremental = true + table := testdata.TestTable(tableName).ToArrowSchema() + incTable := testdata.TestTableIncremental(tableName + "_incremental").ToArrowSchema() syncTime := time.Now().UTC().Round(1 * time.Second) tables := []*arrow.Schema{ - table.ToArrowSchema(), - incTable.ToArrowSchema(), + table, + incTable, } if err := p.Migrate(ctx, tables); err != nil { return fmt.Errorf("failed to migrate tables: %w", err) @@ -43,8 +42,8 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte SyncTime: syncTime, MaxRows: 2, } - resources := testdata.GenTestData(table.ToArrowSchema(), opts) - incResources := testdata.GenTestData(incTable.ToArrowSchema(), opts) + resources := testdata.GenTestData(table, opts) + incResources := testdata.GenTestData(incTable, opts) allResources := resources allResources = append(allResources, incResources...) if err := p.writeAll(ctx, sourceSpec, syncTime, allResources); err != nil { @@ -52,7 +51,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte } sortRecordsBySyncTime(table, resources) - resourcesRead, err := p.readAll(ctx, table.ToArrowSchema(), sourceName) + resourcesRead, err := p.readAll(ctx, table, sourceName) if err != nil { return fmt.Errorf("failed to read all: %w", err) } @@ -72,7 +71,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte } // read from incremental table - resourcesRead, err = p.readAll(ctx, incTable.ToArrowSchema(), sourceName) + resourcesRead, err = p.readAll(ctx, incTable, sourceName) if err != nil { return fmt.Errorf("failed to read all: %w", err) } @@ -89,13 +88,13 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte StableUUID: *u, MaxRows: 1, } - updatedResources := testdata.GenTestData(table.ToArrowSchema(), opts)[0] + updatedResources := testdata.GenTestData(table, opts)[0] if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResources); err != nil { return fmt.Errorf("failed to write one second time: %w", err) } - resourcesRead, err = p.readAll(ctx, table.ToArrowSchema(), sourceName) + resourcesRead, err = p.readAll(ctx, table, sourceName) if err != nil { return fmt.Errorf("failed to read all second time: %w", err) } diff --git a/plugins/destination/plugin_testing_write_append.go b/plugins/destination/plugin_testing_write_append.go index 84eb0b5784..677a9645b8 100644 --- a/plugins/destination/plugin_testing_write_append.go +++ b/plugins/destination/plugin_testing_write_append.go @@ -19,10 +19,10 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, return fmt.Errorf("failed to init plugin: %w", err) } tableName := spec.Name - table := testdata.TestTable(tableName) + table := testdata.TestTable(tableName).ToArrowSchema() syncTime := time.Now().UTC().Round(1 * time.Second) tables := []*arrow.Schema{ - table.ToArrowSchema(), + table, } if err := p.Migrate(ctx, tables); err != nil { return fmt.Errorf("failed to migrate tables: %w", err) @@ -38,14 +38,14 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, SyncTime: syncTime, MaxRows: 1, } - record1 := testdata.GenTestData(table.ToArrowSchema(), opts)[0] + record1 := testdata.GenTestData(table, opts)[0] if err := p.writeOne(ctx, specSource, syncTime, record1); err != nil { return fmt.Errorf("failed to write one second time: %w", err) } secondSyncTime := syncTime.Add(10 * time.Second).UTC() opts.SyncTime = secondSyncTime - record2 := testdata.GenTestData(table.ToArrowSchema(), opts)[0] + record2 := testdata.GenTestData(table, opts)[0] if !s.tests.SkipSecondAppend { // write second time diff --git a/testdata/testdata.go b/testdata/testdata.go index 9fce828a3c..e220e670d5 100644 --- a/testdata/testdata.go +++ b/testdata/testdata.go @@ -2,6 +2,8 @@ package testdata import ( "net" + "sort" + "strings" "time" "github.com/apache/arrow/go/v12/arrow" @@ -109,6 +111,12 @@ func TestSourceTable(name string) *schema.Table { } } +func TestTableIncremental(name string) *schema.Table { + t := TestTable(name) + t.IsIncremental = true + return t +} + // TestTable returns a table with columns of all type. useful for destination testing purposes func TestTable(name string) *schema.Table { sourceTable := TestSourceTable(name) @@ -243,6 +251,14 @@ func GenTestData(sc *arrow.Schema, opts GenTestDataOptions) []arrow.Record { records = append(records, bldr.NewRecord()) bldr.Release() } + if indices := sc.FieldIndices(schema.CqIDColumn.Name); len(indices) > 0 { + cqIDIndex := indices[0] + sort.Slice(records, func(i, j int) bool { + firstUUID := records[i].Column(cqIDIndex).(*types.UUIDArray).Value(0).String() + secondUUID := records[j].Column(cqIDIndex).(*types.UUIDArray).Value(0).String() + return strings.Compare(firstUUID, secondUUID) < 0 + }) + } return records }