Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/memdb/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestOnWriteError(t *testing.T) {
if err := p.Init(ctx, getTestLogger(t), specs.Destination{}); err != nil {
t.Fatal(err)
}
table := schema.TestTable("test")
table := schema.TestTable("test", schema.TestSourceOptions{})
tables := schema.Tables{
table,
}
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestOnWriteCtxCancelled(t *testing.T) {
if err := p.Init(ctx, getTestLogger(t), specs.Destination{}); err != nil {
t.Fatal(err)
}
table := schema.TestTable("test")
table := schema.TestTable("test", schema.TestSourceOptions{})
tables := schema.Tables{
table,
}
Expand Down
87 changes: 79 additions & 8 deletions plugins/destination/plugin_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,84 @@ func getTestLogger(t *testing.T) zerolog.Logger {

type NewPluginFunc func() *Plugin

func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs.Destination, tests PluginTestSuiteTests, testSourceOptions ...func(o *schema.TestSourceOptions)) {
type PluginTestSuiteRunnerOptions struct {
IgnoreNullsInLists bool // strip nulls from lists before checking equality. Destination setups that don't support nulls in lists should set this to true.
schema.TestSourceOptions
}

func WithTestIgnoreNullsInLists() func(o *PluginTestSuiteRunnerOptions) {
return func(o *PluginTestSuiteRunnerOptions) {
o.IgnoreNullsInLists = true
}
}

func WithTestSourceSkipLists() func(o *PluginTestSuiteRunnerOptions) {
return func(o *PluginTestSuiteRunnerOptions) {
o.SkipLists = true
}
}

func WithTestSourceSkipTimestamps() func(o *PluginTestSuiteRunnerOptions) {
return func(o *PluginTestSuiteRunnerOptions) {
o.SkipTimestamps = true
}
}

func WithTestSourceSkipDates() func(o *PluginTestSuiteRunnerOptions) {
return func(o *PluginTestSuiteRunnerOptions) {
o.SkipDates = true
}
}

func WithTestSourceSkipMaps() func(o *PluginTestSuiteRunnerOptions) {
return func(o *PluginTestSuiteRunnerOptions) {
o.SkipMaps = true
}
}

func WithTestSourceSkipStructs() func(o *PluginTestSuiteRunnerOptions) {
return func(o *PluginTestSuiteRunnerOptions) {
o.SkipStructs = true
}
}

func WithTestSourceSkipIntervals() func(o *PluginTestSuiteRunnerOptions) {
return func(o *PluginTestSuiteRunnerOptions) {
o.SkipIntervals = true
}
}

func WithTestSourceSkipDurations() func(o *PluginTestSuiteRunnerOptions) {
return func(o *PluginTestSuiteRunnerOptions) {
o.SkipDurations = true
}
}

func WithTestSourceSkipTimes() func(o *PluginTestSuiteRunnerOptions) {
return func(o *PluginTestSuiteRunnerOptions) {
o.SkipTimes = true
}
}

func WithTestSourceSkipLargeTypes() func(o *PluginTestSuiteRunnerOptions) {
return func(o *PluginTestSuiteRunnerOptions) {
o.SkipLargeTypes = true
}
}

func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs.Destination, tests PluginTestSuiteTests, testOptions ...func(o *PluginTestSuiteRunnerOptions)) {
t.Helper()
destSpec.Name = "testsuite"

suite := &PluginTestSuite{
tests: tests,
}

opts := PluginTestSuiteRunnerOptions{}
for _, o := range testOptions {
o(&opts)
}

ctx := context.Background()
logger := getTestLogger(t)

Expand All @@ -117,7 +188,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
}
destSpec.Name = "test_write_overwrite"
p := newPlugin()
if err := suite.destinationPluginTestWriteOverwrite(ctx, p, logger, destSpec, testSourceOptions...); err != nil {
if err := suite.destinationPluginTestWriteOverwrite(ctx, p, logger, destSpec, opts); err != nil {
t.Fatal(err)
}
if err := p.Close(ctx); err != nil {
Expand All @@ -132,7 +203,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
}
destSpec.Name = "test_write_overwrite_delete_stale"
p := newPlugin()
if err := suite.destinationPluginTestWriteOverwriteDeleteStale(ctx, p, logger, destSpec, testSourceOptions...); err != nil {
if err := suite.destinationPluginTestWriteOverwriteDeleteStale(ctx, p, logger, destSpec, opts); err != nil {
t.Fatal(err)
}
if err := p.Close(ctx); err != nil {
Expand All @@ -148,7 +219,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
destSpec.WriteMode = specs.WriteModeOverwrite
destSpec.MigrateMode = specs.MigrateModeSafe
destSpec.Name = "test_migrate_overwrite"
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite, testSourceOptions...)
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite, opts)
})

t.Run("TestMigrateOverwriteForce", func(t *testing.T) {
Expand All @@ -159,7 +230,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
destSpec.WriteMode = specs.WriteModeOverwrite
destSpec.MigrateMode = specs.MigrateModeForced
destSpec.Name = "test_migrate_overwrite_force"
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite, testSourceOptions...)
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite, opts)
})

t.Run("TestWriteAppend", func(t *testing.T) {
Expand All @@ -169,7 +240,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
}
destSpec.Name = "test_write_append"
p := newPlugin()
if err := suite.destinationPluginTestWriteAppend(ctx, p, logger, destSpec, testSourceOptions...); err != nil {
if err := suite.destinationPluginTestWriteAppend(ctx, p, logger, destSpec, opts); err != nil {
t.Fatal(err)
}
if err := p.Close(ctx); err != nil {
Expand All @@ -185,7 +256,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
destSpec.WriteMode = specs.WriteModeAppend
destSpec.MigrateMode = specs.MigrateModeSafe
destSpec.Name = "test_migrate_append"
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend, testSourceOptions...)
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend, opts)
})

t.Run("TestMigrateAppendForce", func(t *testing.T) {
Expand All @@ -196,7 +267,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
destSpec.WriteMode = specs.WriteModeAppend
destSpec.MigrateMode = specs.MigrateModeForced
destSpec.Name = "test_migrate_append_force"
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend, testSourceOptions...)
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend, opts)
})
}

Expand Down
31 changes: 17 additions & 14 deletions plugins/destination/plugin_testing_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func tableUUIDSuffix() string {
return strings.ReplaceAll(uuid.NewString(), "-", "_")
}

func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.Logger, spec specs.Destination, target *schema.Table, source *schema.Table, mode specs.MigrateMode) error {
func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.Logger, spec specs.Destination, target *schema.Table, source *schema.Table, mode specs.MigrateMode, testOpts PluginTestSuiteRunnerOptions) error {
if err := p.Init(ctx, logger, spec); err != nil {
return fmt.Errorf("failed to init plugin: %w", err)
}
Expand Down Expand Up @@ -49,10 +49,13 @@ func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.
return fmt.Errorf("failed to migrate existing table: %w", err)
}
opts.SyncTime = syncTime.Add(time.Second).UTC()
resource2 := schema.GenTestData(target, opts)[0]
if err := p.writeOne(ctx, sourceSpec, syncTime, resource2); err != nil {
resource2 := schema.GenTestData(target, opts)
if err := p.writeAll(ctx, sourceSpec, syncTime, resource2); err != nil {
return fmt.Errorf("failed to write one after migration: %w", err)
}
if testOpts.IgnoreNullsInLists {
stripNullsFromLists(resource2)
}

resourcesRead, err := p.readAll(ctx, target, sourceName)
if err != nil {
Expand All @@ -63,16 +66,16 @@ func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.
if len(resourcesRead) != 2 {
return fmt.Errorf("expected 2 resources after write, got %d", len(resourcesRead))
}
if !array.RecordApproxEqual(resourcesRead[1], resource2) {
diff := RecordDiff(resourcesRead[1], resource2)
if !array.RecordApproxEqual(resourcesRead[1], resource2[0]) {
diff := RecordDiff(resourcesRead[1], resource2[0])
return fmt.Errorf("resource1 and resource2 are not equal. diff: %s", diff)
}
} else {
if len(resourcesRead) != 1 {
return fmt.Errorf("expected 1 resource after write, got %d", len(resourcesRead))
}
if !array.RecordApproxEqual(resourcesRead[0], resource2) {
diff := RecordDiff(resourcesRead[0], resource2)
if !array.RecordApproxEqual(resourcesRead[0], resource2[0]) {
diff := RecordDiff(resourcesRead[0], resource2[0])
return fmt.Errorf("resource1 and resource2 are not equal. diff: %s", diff)
}
}
Expand All @@ -87,7 +90,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
logger zerolog.Logger,
spec specs.Destination,
strategy MigrateStrategy,
testSourceOptions ...func(o *schema.TestSourceOptions),
testOpts PluginTestSuiteRunnerOptions,
) {
spec.BatchSize = 1

Expand Down Expand Up @@ -119,7 +122,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
}

p := newPlugin()
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumn); err != nil {
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumn, testOpts); err != nil {
t.Fatalf("failed to migrate %s: %v", tableName, err)
}
if err := p.Close(ctx); err != nil {
Expand Down Expand Up @@ -153,7 +156,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, NotNull: true},
}}
p := newPlugin()
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumnNotNull); err != nil {
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumnNotNull, testOpts); err != nil {
t.Fatalf("failed to migrate add_column_not_null: %v", err)
}
if err := p.Close(ctx); err != nil {
Expand Down Expand Up @@ -186,7 +189,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
}}

p := newPlugin()
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.RemoveColumn); err != nil {
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.RemoveColumn, testOpts); err != nil {
t.Fatalf("failed to migrate remove_column: %v", err)
}
if err := p.Close(ctx); err != nil {
Expand Down Expand Up @@ -220,7 +223,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
}}

p := newPlugin()
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.RemoveColumnNotNull); err != nil {
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.RemoveColumnNotNull, testOpts); err != nil {
t.Fatalf("failed to migrate remove_column_not_null: %v", err)
}
if err := p.Close(ctx); err != nil {
Expand Down Expand Up @@ -254,7 +257,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
}}

p := newPlugin()
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.ChangeColumn); err != nil {
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.ChangeColumn, testOpts); err != nil {
t.Fatalf("failed to migrate change_column: %v", err)
}
if err := p.Close(ctx); err != nil {
Expand All @@ -264,7 +267,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(

t.Run("double_migration", func(t *testing.T) {
tableName := "double_migration_" + tableUUIDSuffix()
table := schema.TestTable(tableName, testSourceOptions...)
table := schema.TestTable(tableName, testOpts.TestSourceOptions)

p := newPlugin()
require.NoError(t, p.Init(ctx, logger, spec))
Expand Down
19 changes: 12 additions & 7 deletions plugins/destination/plugin_testing_overwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (
"github.com/rs/zerolog"
)

func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testSourceOptions ...func(o *schema.TestSourceOptions)) error {
func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testOpts PluginTestSuiteRunnerOptions) error {
spec.WriteMode = specs.WriteModeOverwrite
if err := p.Init(ctx, logger, spec); err != nil {
return fmt.Errorf("failed to init plugin: %w", err)
}
tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix())
table := schema.TestTable(tableName, testSourceOptions...)
table := schema.TestTable(tableName, testOpts.TestSourceOptions)
syncTime := time.Now().UTC().Round(1 * time.Second)
tables := schema.Tables{
table,
Expand All @@ -43,7 +43,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
return fmt.Errorf("failed to write all: %w", err)
}
sortRecordsBySyncTime(table, resources)

if testOpts.IgnoreNullsInLists {
stripNullsFromLists(resources)
}
resourcesRead, err := p.readAll(ctx, table, sourceName)
if err != nil {
return fmt.Errorf("failed to read all: %w", err)
Expand Down Expand Up @@ -75,12 +77,15 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
MaxRows: 1,
StableUUID: u,
}
updatedResource := schema.GenTestData(table, opts)[0]
updatedResource := schema.GenTestData(table, opts)
// write second time
if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil {
if err := p.writeAll(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil {
return fmt.Errorf("failed to write one second time: %w", err)
}

if testOpts.IgnoreNullsInLists {
stripNullsFromLists(updatedResource)
}
resourcesRead, err = p.readAll(ctx, table, sourceName)
if err != nil {
return fmt.Errorf("failed to read all second time: %w", err)
Expand All @@ -94,8 +99,8 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
diff := RecordDiff(resources[1], resourcesRead[0])
return fmt.Errorf("after overwrite expected first resource to be equal. diff=%s", diff)
}
if !array.RecordApproxEqual(updatedResource, resourcesRead[1]) {
diff := RecordDiff(updatedResource, resourcesRead[1])
if !array.RecordApproxEqual(updatedResource[0], resourcesRead[1]) {
diff := RecordDiff(updatedResource[0], resourcesRead[1])
return fmt.Errorf("after overwrite expected second resource to be equal. diff=%s", diff)
}

Expand Down
15 changes: 12 additions & 3 deletions plugins/destination/plugin_testing_overwrite_delete_stale.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
"github.com/rs/zerolog"
)

func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testSourceOptions ...func(o *schema.TestSourceOptions)) error {
func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testOpts PluginTestSuiteRunnerOptions) error {
spec.WriteMode = specs.WriteModeOverwriteDeleteStale
if err := p.Init(ctx, logger, spec); err != nil {
return fmt.Errorf("failed to init plugin: %w", err)
}
tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix())
table := schema.TestTable(tableName, testSourceOptions...)
incTable := schema.TestTable(tableName+"_incremental", testSourceOptions...)
table := schema.TestTable(tableName, testOpts.TestSourceOptions)
incTable := schema.TestTable(tableName+"_incremental", testOpts.TestSourceOptions)
incTable.IsIncremental = true
syncTime := time.Now().UTC().Round(1 * time.Second)
tables := schema.Tables{
Expand Down Expand Up @@ -60,6 +60,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
if len(resourcesRead) != 2 {
return fmt.Errorf("expected 2 resources, got %d", len(resourcesRead))
}
if testOpts.IgnoreNullsInLists {
stripNullsFromLists(resources)
}
if !array.RecordApproxEqual(resources[0], resourcesRead[0]) {
diff := RecordDiff(resources[0], resourcesRead[0])
return fmt.Errorf("expected first resource to be equal. diff: %s", diff)
Expand Down Expand Up @@ -106,6 +109,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
if len(resourcesRead) != 1 {
return fmt.Errorf("after overwrite expected 1 resource, got %d", len(resourcesRead))
}
if testOpts.IgnoreNullsInLists {
stripNullsFromLists(resources)
}
if array.RecordApproxEqual(resources[0], resourcesRead[0]) {
diff := RecordDiff(resources[0], resourcesRead[0])
return fmt.Errorf("after overwrite expected first resource to be different. diff: %s", diff)
Expand All @@ -120,6 +126,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
}

// we expect the only resource returned to match the updated resource we wrote
if testOpts.IgnoreNullsInLists {
stripNullsFromLists(updatedResources)
}
if !array.RecordApproxEqual(updatedResources[0], resourcesRead[0]) {
diff := RecordDiff(updatedResources[0], resourcesRead[0])
return fmt.Errorf("after delete stale expected resource to be equal. diff: %s", diff)
Expand Down
Loading