Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions plugins/destination/plugin_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
})
}

func sortRecordsBySyncTime(table *schema.Table, records []arrow.Record) {
syncTimeIndex := table.Columns.Index(schema.CqSyncTimeColumn.Name)
cqIDIndex := table.Columns.Index(schema.CqIDColumn.Name)
func sortRecordsBySyncTime(table *arrow.Schema, records []arrow.Record) {
syncTimeIndex := table.FieldIndices(schema.CqSyncTimeColumn.Name)[0]
cqIDIndex := table.FieldIndices(schema.CqIDColumn.Name)[0]
sort.Slice(records, func(i, j int) bool {
// sort by sync time, then UUID
first := records[i].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond)
Expand Down
2 changes: 2 additions & 0 deletions plugins/destination/plugin_testing_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.
if err := p.Migrate(ctx, []*arrow.Schema{target}); err != nil {
return fmt.Errorf("failed to migrate existing table: %w", err)
}
opts.SyncTime = syncTime.Add(time.Second).UTC()
resource2 := testdata.GenTestData(target, opts)[0]
if err := p.writeOne(ctx, sourceSpec, syncTime, resource2); err != nil {
return fmt.Errorf("failed to write one after migration: %w", err)
Expand All @@ -57,6 +58,7 @@ func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.
if err != nil {
return fmt.Errorf("failed to read all: %w", err)
}
sortRecordsBySyncTime(target, resourcesRead)
if mode == specs.MigrateModeSafe {
if len(resourcesRead) != 2 {
return fmt.Errorf("expected 2 resources after write, got %d", len(resourcesRead))
Expand Down
13 changes: 6 additions & 7 deletions plugins/destination/plugin_testing_overwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/cloudquery/plugin-sdk/v2/specs"
"github.com/cloudquery/plugin-sdk/v2/testdata"
"github.com/cloudquery/plugin-sdk/v2/types"
Expand All @@ -21,10 +20,10 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
return fmt.Errorf("failed to init plugin: %w", err)
}
tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix())
table := testdata.TestTable(tableName)
table := testdata.TestTable(tableName).ToArrowSchema()
syncTime := time.Now().UTC().Round(1 * time.Second)
tables := []*arrow.Schema{
table.ToArrowSchema(),
table,
}
if err := p.Migrate(ctx, tables); err != nil {
return fmt.Errorf("failed to migrate tables: %w", err)
Expand All @@ -40,13 +39,13 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
SyncTime: syncTime,
MaxRows: 2,
}
resources := testdata.GenTestData(schema.CQSchemaToArrow(table), opts)
resources := testdata.GenTestData(table, opts)
if err := p.writeAll(ctx, sourceSpec, syncTime, resources); err != nil {
return fmt.Errorf("failed to write all: %w", err)
}
sortRecordsBySyncTime(table, resources)

resourcesRead, err := p.readAll(ctx, table.ToArrowSchema(), sourceName)
resourcesRead, err := p.readAll(ctx, table, sourceName)
if err != nil {
return fmt.Errorf("failed to read all: %w", err)
}
Expand Down Expand Up @@ -76,13 +75,13 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
MaxRows: 1,
StableUUID: *u,
}
updatedResource := testdata.GenTestData(schema.CQSchemaToArrow(table), opts)[0]
updatedResource := testdata.GenTestData(table, opts)[0]
// write second time
if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil {
return fmt.Errorf("failed to write one second time: %w", err)
}

resourcesRead, err = p.readAll(ctx, table.ToArrowSchema(), sourceName)
resourcesRead, err = p.readAll(ctx, table, sourceName)
if err != nil {
return fmt.Errorf("failed to read all second time: %w", err)
}
Expand Down
21 changes: 10 additions & 11 deletions plugins/destination/plugin_testing_overwrite_delete_stale.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
return fmt.Errorf("failed to init plugin: %w", err)
}
tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix())
table := testdata.TestTable(tableName)
incTable := testdata.TestTable(tableName + "_incremental")
incTable.IsIncremental = true
table := testdata.TestTable(tableName).ToArrowSchema()
incTable := testdata.TestTableIncremental(tableName + "_incremental").ToArrowSchema()
syncTime := time.Now().UTC().Round(1 * time.Second)
tables := []*arrow.Schema{
table.ToArrowSchema(),
incTable.ToArrowSchema(),
table,
incTable,
}
if err := p.Migrate(ctx, tables); err != nil {
return fmt.Errorf("failed to migrate tables: %w", err)
Expand All @@ -43,16 +42,16 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
SyncTime: syncTime,
MaxRows: 2,
}
resources := testdata.GenTestData(table.ToArrowSchema(), opts)
incResources := testdata.GenTestData(incTable.ToArrowSchema(), opts)
resources := testdata.GenTestData(table, opts)
incResources := testdata.GenTestData(incTable, opts)
allResources := resources
allResources = append(allResources, incResources...)
if err := p.writeAll(ctx, sourceSpec, syncTime, allResources); err != nil {
return fmt.Errorf("failed to write all: %w", err)
}
sortRecordsBySyncTime(table, resources)

resourcesRead, err := p.readAll(ctx, table.ToArrowSchema(), sourceName)
resourcesRead, err := p.readAll(ctx, table, sourceName)
if err != nil {
return fmt.Errorf("failed to read all: %w", err)
}
Expand All @@ -72,7 +71,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
}

// read from incremental table
resourcesRead, err = p.readAll(ctx, incTable.ToArrowSchema(), sourceName)
resourcesRead, err = p.readAll(ctx, incTable, sourceName)
if err != nil {
return fmt.Errorf("failed to read all: %w", err)
}
Expand All @@ -89,13 +88,13 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
StableUUID: *u,
MaxRows: 1,
}
updatedResources := testdata.GenTestData(table.ToArrowSchema(), opts)[0]
updatedResources := testdata.GenTestData(table, opts)[0]

if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResources); err != nil {
return fmt.Errorf("failed to write one second time: %w", err)
}

resourcesRead, err = p.readAll(ctx, table.ToArrowSchema(), sourceName)
resourcesRead, err = p.readAll(ctx, table, sourceName)
if err != nil {
return fmt.Errorf("failed to read all second time: %w", err)
}
Expand Down
8 changes: 4 additions & 4 deletions plugins/destination/plugin_testing_write_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context,
return fmt.Errorf("failed to init plugin: %w", err)
}
tableName := spec.Name
table := testdata.TestTable(tableName)
table := testdata.TestTable(tableName).ToArrowSchema()
syncTime := time.Now().UTC().Round(1 * time.Second)
tables := []*arrow.Schema{
table.ToArrowSchema(),
table,
}
if err := p.Migrate(ctx, tables); err != nil {
return fmt.Errorf("failed to migrate tables: %w", err)
Expand All @@ -38,14 +38,14 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context,
SyncTime: syncTime,
MaxRows: 1,
}
record1 := testdata.GenTestData(table.ToArrowSchema(), opts)[0]
record1 := testdata.GenTestData(table, opts)[0]
if err := p.writeOne(ctx, specSource, syncTime, record1); err != nil {
return fmt.Errorf("failed to write one second time: %w", err)
}

secondSyncTime := syncTime.Add(10 * time.Second).UTC()
opts.SyncTime = secondSyncTime
record2 := testdata.GenTestData(table.ToArrowSchema(), opts)[0]
record2 := testdata.GenTestData(table, opts)[0]

if !s.tests.SkipSecondAppend {
// write second time
Expand Down
16 changes: 16 additions & 0 deletions testdata/testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package testdata

import (
"net"
"sort"
"strings"
"time"

"github.com/apache/arrow/go/v12/arrow"
Expand Down Expand Up @@ -109,6 +111,12 @@ func TestSourceTable(name string) *schema.Table {
}
}

func TestTableIncremental(name string) *schema.Table {
t := TestTable(name)
t.IsIncremental = true
return t
}

// TestTable returns a table with columns of all type. useful for destination testing purposes
func TestTable(name string) *schema.Table {
sourceTable := TestSourceTable(name)
Expand Down Expand Up @@ -243,6 +251,14 @@ func GenTestData(sc *arrow.Schema, opts GenTestDataOptions) []arrow.Record {
records = append(records, bldr.NewRecord())
bldr.Release()
}
if indices := sc.FieldIndices(schema.CqIDColumn.Name); len(indices) > 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed?

cqIDIndex := indices[0]
sort.Slice(records, func(i, j int) bool {
firstUUID := records[i].Column(cqIDIndex).(*types.UUIDArray).Value(0).String()
secondUUID := records[j].Column(cqIDIndex).(*types.UUIDArray).Value(0).String()
return strings.Compare(firstUUID, secondUUID) < 0
})
}
return records
}

Expand Down