From 93fe71ba99779792475e6bf7b2cb8c3252eac2f9 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Fri, 19 May 2023 15:35:30 +0100 Subject: [PATCH 1/4] Tighter Arrow test cases --- schema/testdata.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/schema/testdata.go b/schema/testdata.go index 57e98c2b9b..e10ca2b9d7 100644 --- a/schema/testdata.go +++ b/schema/testdata.go @@ -160,6 +160,7 @@ func TestSourceColumns(testOpts ...func(o *TestSourceOptions)) []Column { // add JSON later, we don't want to include it as a list or map right now (it causes complications with JSON unmarshalling) basicColumns = append(basicColumns, Column{Name: "json", Type: types.NewJSONType()}) + basicColumns = append(basicColumns, Column{Name: "json_array", Type: types.NewJSONType()}) // GenTestData knows to populate this with a JSON array if !opts.SkipStructs { // struct with all the types @@ -363,7 +364,7 @@ func getExampleJSON(colName string, dataType arrow.DataType, opts GenTestDataOpt return fmt.Sprintf(`[{"key": %s,"value": %s}]`, k, v) } inner := dataType.(*arrow.ListType).Elem() - return `[` + getExampleJSON(colName, inner, opts) + `]` + return `[` + getExampleJSON(colName, inner, opts) + `,null,` + getExampleJSON(colName, inner, opts) + `]` } // handle extension types if arrow.TypeEqual(dataType, types.ExtensionTypes.UUID) { @@ -374,7 +375,10 @@ func getExampleJSON(colName string, dataType arrow.DataType, opts GenTestDataOpt return `"` + u.String() + `"` } if arrow.TypeEqual(dataType, types.ExtensionTypes.JSON) { - return `"{\"test\":\"test\"}"` + if strings.HasSuffix(colName, "_array") { + return `"[{\"test\":\"test\"},123,{\"test_number\":456}]"` + } + return `"{\"test\":[\"a\",\"b\",3]}"` } if arrow.TypeEqual(dataType, types.ExtensionTypes.Inet) { return `"192.0.2.0/24"` From aba30b56f7c8ecb4c7aee648c1995b8628d9cf55 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Fri, 19 May 2023 17:44:07 +0100 Subject: [PATCH 2/4] Refactor test case options and allow stripping of null values from lists --- internal/memdb/memdb_test.go | 4 +- plugins/destination/plugin_testing.go | 87 +++++++++++++++++-- plugins/destination/plugin_testing_migrate.go | 4 +- .../destination/plugin_testing_overwrite.go | 4 +- .../plugin_testing_overwrite_delete_stale.go | 6 +- .../plugin_testing_write_append.go | 7 +- plugins/destination/strip_nulls.go | 40 +++++++++ schema/testdata.go | 81 +++-------------- schema/testdata_test.go | 21 +++-- serve/destination_v1_test.go | 2 +- 10 files changed, 157 insertions(+), 99 deletions(-) create mode 100644 plugins/destination/strip_nulls.go diff --git a/internal/memdb/memdb_test.go b/internal/memdb/memdb_test.go index 8db5b7a620..7f9e8a5759 100644 --- a/internal/memdb/memdb_test.go +++ b/internal/memdb/memdb_test.go @@ -112,7 +112,7 @@ func TestOnWriteError(t *testing.T) { if err := p.Init(ctx, getTestLogger(t), specs.Destination{}); err != nil { t.Fatal(err) } - table := schema.TestTable("test") + table := schema.TestTable("test", schema.TestSourceOptions{}) tables := schema.Tables{ table, } @@ -147,7 +147,7 @@ func TestOnWriteCtxCancelled(t *testing.T) { if err := p.Init(ctx, getTestLogger(t), specs.Destination{}); err != nil { t.Fatal(err) } - table := schema.TestTable("test") + table := schema.TestTable("test", schema.TestSourceOptions{}) tables := schema.Tables{ table, } diff --git a/plugins/destination/plugin_testing.go b/plugins/destination/plugin_testing.go index f36cbe53e8..486314b472 100644 --- a/plugins/destination/plugin_testing.go +++ b/plugins/destination/plugin_testing.go @@ -100,13 +100,84 @@ func getTestLogger(t *testing.T) zerolog.Logger { type NewPluginFunc func() *Plugin -func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs.Destination, tests PluginTestSuiteTests, testSourceOptions ...func(o *schema.TestSourceOptions)) { +type PluginTestSuiteRunnerOptions struct { + IgnoreNullsInLists bool // strip nulls from lists before checking equality. Destination setups that don't support nulls in lists should set this to true. + schema.TestSourceOptions +} + +func WithTestIgnoreNullsInLists() func(o *PluginTestSuiteRunnerOptions) { + return func(o *PluginTestSuiteRunnerOptions) { + o.IgnoreNullsInLists = true + } +} + +func WithTestSourceSkipLists() func(o *PluginTestSuiteRunnerOptions) { + return func(o *PluginTestSuiteRunnerOptions) { + o.SkipLists = true + } +} + +func WithTestSourceSkipTimestamps() func(o *PluginTestSuiteRunnerOptions) { + return func(o *PluginTestSuiteRunnerOptions) { + o.SkipTimestamps = true + } +} + +func WithTestSourceSkipDates() func(o *PluginTestSuiteRunnerOptions) { + return func(o *PluginTestSuiteRunnerOptions) { + o.SkipDates = true + } +} + +func WithTestSourceSkipMaps() func(o *PluginTestSuiteRunnerOptions) { + return func(o *PluginTestSuiteRunnerOptions) { + o.SkipMaps = true + } +} + +func WithTestSourceSkipStructs() func(o *PluginTestSuiteRunnerOptions) { + return func(o *PluginTestSuiteRunnerOptions) { + o.SkipStructs = true + } +} + +func WithTestSourceSkipIntervals() func(o *PluginTestSuiteRunnerOptions) { + return func(o *PluginTestSuiteRunnerOptions) { + o.SkipIntervals = true + } +} + +func WithTestSourceSkipDurations() func(o *PluginTestSuiteRunnerOptions) { + return func(o *PluginTestSuiteRunnerOptions) { + o.SkipDurations = true + } +} + +func WithTestSourceSkipTimes() func(o *PluginTestSuiteRunnerOptions) { + return func(o *PluginTestSuiteRunnerOptions) { + o.SkipTimes = true + } +} + +func WithTestSourceSkipLargeTypes() func(o *PluginTestSuiteRunnerOptions) { + return func(o *PluginTestSuiteRunnerOptions) { + o.SkipLargeTypes = true + } +} + +func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs.Destination, tests PluginTestSuiteTests, testOptions ...func(o *PluginTestSuiteRunnerOptions)) { t.Helper() destSpec.Name = "testsuite" suite := &PluginTestSuite{ tests: tests, } + + opts := PluginTestSuiteRunnerOptions{} + for _, o := range testOptions { + o(&opts) + } + ctx := context.Background() logger := getTestLogger(t) @@ -117,7 +188,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs } destSpec.Name = "test_write_overwrite" p := newPlugin() - if err := suite.destinationPluginTestWriteOverwrite(ctx, p, logger, destSpec, testSourceOptions...); err != nil { + if err := suite.destinationPluginTestWriteOverwrite(ctx, p, logger, destSpec, opts); err != nil { t.Fatal(err) } if err := p.Close(ctx); err != nil { @@ -132,7 +203,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs } destSpec.Name = "test_write_overwrite_delete_stale" p := newPlugin() - if err := suite.destinationPluginTestWriteOverwriteDeleteStale(ctx, p, logger, destSpec, testSourceOptions...); err != nil { + if err := suite.destinationPluginTestWriteOverwriteDeleteStale(ctx, p, logger, destSpec, opts); err != nil { t.Fatal(err) } if err := p.Close(ctx); err != nil { @@ -148,7 +219,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs destSpec.WriteMode = specs.WriteModeOverwrite destSpec.MigrateMode = specs.MigrateModeSafe destSpec.Name = "test_migrate_overwrite" - suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite, testSourceOptions...) + suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite, opts) }) t.Run("TestMigrateOverwriteForce", func(t *testing.T) { @@ -159,7 +230,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs destSpec.WriteMode = specs.WriteModeOverwrite destSpec.MigrateMode = specs.MigrateModeForced destSpec.Name = "test_migrate_overwrite_force" - suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite, testSourceOptions...) + suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite, opts) }) t.Run("TestWriteAppend", func(t *testing.T) { @@ -169,7 +240,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs } destSpec.Name = "test_write_append" p := newPlugin() - if err := suite.destinationPluginTestWriteAppend(ctx, p, logger, destSpec, testSourceOptions...); err != nil { + if err := suite.destinationPluginTestWriteAppend(ctx, p, logger, destSpec, opts); err != nil { t.Fatal(err) } if err := p.Close(ctx); err != nil { @@ -185,7 +256,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs destSpec.WriteMode = specs.WriteModeAppend destSpec.MigrateMode = specs.MigrateModeSafe destSpec.Name = "test_migrate_append" - suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend, testSourceOptions...) + suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend, opts) }) t.Run("TestMigrateAppendForce", func(t *testing.T) { @@ -196,7 +267,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs destSpec.WriteMode = specs.WriteModeAppend destSpec.MigrateMode = specs.MigrateModeForced destSpec.Name = "test_migrate_append_force" - suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend, testSourceOptions...) + suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend, opts) }) } diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go index 70ac7a0cc5..67e89cf5a1 100644 --- a/plugins/destination/plugin_testing_migrate.go +++ b/plugins/destination/plugin_testing_migrate.go @@ -87,7 +87,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( logger zerolog.Logger, spec specs.Destination, strategy MigrateStrategy, - testSourceOptions ...func(o *schema.TestSourceOptions), + testOpts PluginTestSuiteRunnerOptions, ) { spec.BatchSize = 1 @@ -264,7 +264,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( t.Run("double_migration", func(t *testing.T) { tableName := "double_migration_" + tableUUIDSuffix() - table := schema.TestTable(tableName, testSourceOptions...) + table := schema.TestTable(tableName, testOpts.TestSourceOptions) p := newPlugin() require.NoError(t, p.Init(ctx, logger, spec)) diff --git a/plugins/destination/plugin_testing_overwrite.go b/plugins/destination/plugin_testing_overwrite.go index 29c1bdd7b5..9432b619f4 100644 --- a/plugins/destination/plugin_testing_overwrite.go +++ b/plugins/destination/plugin_testing_overwrite.go @@ -13,13 +13,13 @@ import ( "github.com/rs/zerolog" ) -func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testSourceOptions ...func(o *schema.TestSourceOptions)) error { +func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testOpts PluginTestSuiteRunnerOptions) error { spec.WriteMode = specs.WriteModeOverwrite if err := p.Init(ctx, logger, spec); err != nil { return fmt.Errorf("failed to init plugin: %w", err) } tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix()) - table := schema.TestTable(tableName, testSourceOptions...) + table := schema.TestTable(tableName, testOpts.TestSourceOptions) syncTime := time.Now().UTC().Round(1 * time.Second) tables := schema.Tables{ table, diff --git a/plugins/destination/plugin_testing_overwrite_delete_stale.go b/plugins/destination/plugin_testing_overwrite_delete_stale.go index 41b1dad121..96ffeb9002 100644 --- a/plugins/destination/plugin_testing_overwrite_delete_stale.go +++ b/plugins/destination/plugin_testing_overwrite_delete_stale.go @@ -13,14 +13,14 @@ import ( "github.com/rs/zerolog" ) -func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testSourceOptions ...func(o *schema.TestSourceOptions)) error { +func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testOpts PluginTestSuiteRunnerOptions) error { spec.WriteMode = specs.WriteModeOverwriteDeleteStale if err := p.Init(ctx, logger, spec); err != nil { return fmt.Errorf("failed to init plugin: %w", err) } tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix()) - table := schema.TestTable(tableName, testSourceOptions...) - incTable := schema.TestTable(tableName+"_incremental", testSourceOptions...) + table := schema.TestTable(tableName, testOpts.TestSourceOptions) + incTable := schema.TestTable(tableName+"_incremental", testOpts.TestSourceOptions) incTable.IsIncremental = true syncTime := time.Now().UTC().Round(1 * time.Second) tables := schema.Tables{ diff --git a/plugins/destination/plugin_testing_write_append.go b/plugins/destination/plugin_testing_write_append.go index 578d20c35f..87174bc31b 100644 --- a/plugins/destination/plugin_testing_write_append.go +++ b/plugins/destination/plugin_testing_write_append.go @@ -12,13 +12,13 @@ import ( "github.com/rs/zerolog" ) -func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testSourceOptions ...func(o *schema.TestSourceOptions)) error { +func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testOpts PluginTestSuiteRunnerOptions) error { spec.WriteMode = specs.WriteModeAppend if err := p.Init(ctx, logger, spec); err != nil { return fmt.Errorf("failed to init plugin: %w", err) } tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix()) - table := schema.TestTable(tableName, testSourceOptions...) + table := schema.TestTable(tableName, testOpts.TestSourceOptions) syncTime := time.Now().UTC().Round(1 * time.Second) tables := schema.Tables{ table, @@ -68,6 +68,9 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, return fmt.Errorf("expected %d resources, got %d", expectedResource, len(resourcesRead)) } + if testOpts.IgnoreNullsInLists { + stripNullsFromLists(record1) + } if !array.RecordApproxEqual(record1[0], resourcesRead[0]) { diff := RecordDiff(record1[0], resourcesRead[0]) return fmt.Errorf("first expected resource diff at row 0: %s", diff) diff --git a/plugins/destination/strip_nulls.go b/plugins/destination/strip_nulls.go new file mode 100644 index 0000000000..1767b1a7bf --- /dev/null +++ b/plugins/destination/strip_nulls.go @@ -0,0 +1,40 @@ +package destination + +import ( + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/memory" +) + +func stripNullsFromLists(records []arrow.Record) { + for i := range records { + cols := make([]arrow.Array, records[i].NumCols()) + for c := range records[i].Columns() { + if records[i].Column(c).DataType().ID() == arrow.LIST { + list := records[i].Column(c).(*array.List) + bldr := array.NewListBuilder(memory.DefaultAllocator, list.DataType().(*arrow.ListType).Elem()) + for j := 0; j < list.Len(); j++ { + if list.IsNull(j) { + bldr.AppendNull() + continue + } + bldr.Append(true) + vBldr := bldr.ValueBuilder() + from, to := list.ValueOffsets(j) + slc := array.NewSlice(list.ListValues(), from, to) + for k := 0; k < int(to-from); k++ { + if slc.IsNull(k) { + continue + } + vBldr.AppendValueFromString(slc.ValueStr(k)) + } + + } + cols[c] = bldr.NewArray() + continue + } + cols[c] = records[i].Column(c) + } + records[i] = array.NewRecord(records[i].Schema(), cols, records[i].NumRows()) + } +} diff --git a/schema/testdata.go b/schema/testdata.go index e10ca2b9d7..81b48db9e9 100644 --- a/schema/testdata.go +++ b/schema/testdata.go @@ -29,68 +29,9 @@ type TestSourceOptions struct { SkipLargeTypes bool // e.g. large binary, large string } -func WithTestSourceSkipLists() func(o *TestSourceOptions) { - return func(o *TestSourceOptions) { - o.SkipLists = true - } -} - -func WithTestSourceSkipTimestamps() func(o *TestSourceOptions) { - return func(o *TestSourceOptions) { - o.SkipTimestamps = true - } -} - -func WithTestSourceSkipDates() func(o *TestSourceOptions) { - return func(o *TestSourceOptions) { - o.SkipDates = true - } -} - -func WithTestSourceSkipMaps() func(o *TestSourceOptions) { - return func(o *TestSourceOptions) { - o.SkipMaps = true - } -} - -func WithTestSourceSkipStructs() func(o *TestSourceOptions) { - return func(o *TestSourceOptions) { - o.SkipStructs = true - } -} - -func WithTestSourceSkipIntervals() func(o *TestSourceOptions) { - return func(o *TestSourceOptions) { - o.SkipIntervals = true - } -} - -func WithTestSourceSkipDurations() func(o *TestSourceOptions) { - return func(o *TestSourceOptions) { - o.SkipDurations = true - } -} - -func WithTestSourceSkipTimes() func(o *TestSourceOptions) { - return func(o *TestSourceOptions) { - o.SkipTimes = true - } -} - -func WithTestSourceSkipLargeTypes() func(o *TestSourceOptions) { - return func(o *TestSourceOptions) { - o.SkipLargeTypes = true - } -} - // TestSourceColumns returns columns for all Arrow types and composites thereof. TestSourceOptions controls // which types are included. -func TestSourceColumns(testOpts ...func(o *TestSourceOptions)) []Column { - var opts TestSourceOptions - for _, opt := range testOpts { - opt(&opts) - } - +func TestSourceColumns(testOpts TestSourceOptions) []Column { // cq columns var cqColumns []Column cqColumns = append(cqColumns, Column{Name: CqIDColumn.Name, Type: types.NewUUIDType(), NotNull: true, Unique: true, PrimaryKey: true}) @@ -115,25 +56,25 @@ func TestSourceColumns(testOpts ...func(o *TestSourceOptions)) []Column { // we don't support float16 right now basicColumns = removeColumnsByType(basicColumns, arrow.FLOAT16) - if opts.SkipTimestamps { + if testOpts.SkipTimestamps { // for backwards-compatibility, microsecond timestamps are not removed here basicColumns = removeColumnsByDataType(basicColumns, &arrow.TimestampType{Unit: arrow.Second, TimeZone: "UTC"}) basicColumns = removeColumnsByDataType(basicColumns, &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "UTC"}) basicColumns = removeColumnsByDataType(basicColumns, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"}) } - if opts.SkipDates { + if testOpts.SkipDates { basicColumns = removeColumnsByType(basicColumns, arrow.DATE32, arrow.DATE64) } - if opts.SkipTimes { + if testOpts.SkipTimes { basicColumns = removeColumnsByType(basicColumns, arrow.TIME32, arrow.TIME64) } - if opts.SkipIntervals { + if testOpts.SkipIntervals { basicColumns = removeColumnsByType(basicColumns, arrow.INTERVAL_DAY_TIME, arrow.INTERVAL_MONTHS, arrow.INTERVAL_MONTH_DAY_NANO) } - if opts.SkipDurations { + if testOpts.SkipDurations { basicColumns = removeColumnsByType(basicColumns, arrow.DURATION) } - if opts.SkipLargeTypes { + if testOpts.SkipLargeTypes { basicColumns = removeColumnsByType(basicColumns, arrow.LARGE_BINARY, arrow.LARGE_STRING) } @@ -141,7 +82,7 @@ func TestSourceColumns(testOpts ...func(o *TestSourceOptions)) []Column { // we don't need to include lists of binary or large binary right now; probably no destinations or sources need to support that basicColumnsWithExclusions := removeColumnsByType(basicColumns, arrow.BINARY, arrow.LARGE_BINARY) - if opts.SkipLists { + if testOpts.SkipLists { // only include lists that were originally supported by CQTypes cqListColumns := []Column{ {Name: "string", Type: arrow.BinaryTypes.String}, @@ -162,7 +103,7 @@ func TestSourceColumns(testOpts ...func(o *TestSourceOptions)) []Column { basicColumns = append(basicColumns, Column{Name: "json", Type: types.NewJSONType()}) basicColumns = append(basicColumns, Column{Name: "json_array", Type: types.NewJSONType()}) // GenTestData knows to populate this with a JSON array - if !opts.SkipStructs { + if !testOpts.SkipStructs { // struct with all the types compositeColumns = append(compositeColumns, Column{Name: "struct", Type: arrow.StructOf(columnsToFields(basicColumns...)...)}) @@ -293,13 +234,13 @@ func columnsToFields(columns ...Column) []arrow.Field { // var PKColumnNames = []string{"uuid_pk"} // TestTable returns a table with columns of all types. Useful for destination testing purposes -func TestTable(name string, opts ...func(o *TestSourceOptions)) *Table { +func TestTable(name string, testOpts TestSourceOptions) *Table { var columns []Column // columns = append(columns, Column{Name: "uuid", Type: types.NewUUIDType()}) // columns = append(columns, Column{Name: "string_pk", Type: arrow.BinaryTypes.String}) columns = append(columns, Column{Name: CqSourceNameColumn.Name, Type: arrow.BinaryTypes.String}) columns = append(columns, Column{Name: CqSyncTimeColumn.Name, Type: arrow.FixedWidthTypes.Timestamp_us}) - columns = append(columns, TestSourceColumns(opts...)...) + columns = append(columns, TestSourceColumns(testOpts)...) return &Table{Name: name, Columns: columns} } diff --git a/schema/testdata_test.go b/schema/testdata_test.go index 53903a3cf9..1dea1aae04 100644 --- a/schema/testdata_test.go +++ b/schema/testdata_test.go @@ -4,7 +4,7 @@ import "testing" func TestTestSourceColumns_Default(t *testing.T) { // basic sanity check for tested columns - defaults := TestSourceColumns() + defaults := TestSourceColumns(TestSourceOptions{}) if len(defaults) < 73 { t.Fatalf("expected at least 73 columns by default got: %d ", len(defaults)) } @@ -14,14 +14,17 @@ func TestTestSourceColumns_Default(t *testing.T) { func TestTestSourceColumns_SkipAll(t *testing.T) { skipAll := ColumnList(TestSourceColumns( - WithTestSourceSkipStructs(), - WithTestSourceSkipMaps(), - WithTestSourceSkipDates(), - WithTestSourceSkipTimes(), - WithTestSourceSkipTimestamps(), - WithTestSourceSkipDurations(), - WithTestSourceSkipIntervals(), - WithTestSourceSkipLargeTypes(), + TestSourceOptions{ + SkipLists: true, + SkipTimestamps: true, + SkipDates: true, + SkipMaps: true, + SkipStructs: true, + SkipIntervals: true, + SkipDurations: true, + SkipTimes: true, + SkipLargeTypes: true, + }, )) // test some specific columns checkColumnsExist(t, skipAll, []string{"int64", "timestamp_us", "string", "string_list"}) diff --git a/serve/destination_v1_test.go b/serve/destination_v1_test.go index 24d403785f..e5172106ad 100644 --- a/serve/destination_v1_test.go +++ b/serve/destination_v1_test.go @@ -90,7 +90,7 @@ func TestDestinationV1(t *testing.T) { tableName := "test_destination_serve" sourceName := "test_destination_serve_source" syncTime := time.Now() - table := schema.TestTable(tableName) + table := schema.TestTable(tableName, schema.TestSourceOptions{}) tables := schema.Tables{table} sourceSpec := specs.Source{ Name: sourceName, From 5be6c695c403590741101d504fac0daca6ec89d7 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Fri, 19 May 2023 17:50:33 +0100 Subject: [PATCH 3/4] Fix linting --- plugins/destination/strip_nulls.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/plugins/destination/strip_nulls.go b/plugins/destination/strip_nulls.go index 1767b1a7bf..434bc1d7b7 100644 --- a/plugins/destination/strip_nulls.go +++ b/plugins/destination/strip_nulls.go @@ -26,9 +26,11 @@ func stripNullsFromLists(records []arrow.Record) { if slc.IsNull(k) { continue } - vBldr.AppendValueFromString(slc.ValueStr(k)) + err := vBldr.AppendValueFromString(slc.ValueStr(k)) + if err != nil { + panic(err) + } } - } cols[c] = bldr.NewArray() continue From 1286b5a61a41a0b53710016d16446ca433504094 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Fri, 19 May 2023 17:58:41 +0100 Subject: [PATCH 4/4] Use options everywhere --- plugins/destination/plugin_testing_migrate.go | 27 ++++++++++--------- .../destination/plugin_testing_overwrite.go | 15 +++++++---- .../plugin_testing_overwrite_delete_stale.go | 9 +++++++ .../plugin_testing_write_append.go | 10 ++++--- 4 files changed, 40 insertions(+), 21 deletions(-) diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go index 67e89cf5a1..69f669916a 100644 --- a/plugins/destination/plugin_testing_migrate.go +++ b/plugins/destination/plugin_testing_migrate.go @@ -21,7 +21,7 @@ func tableUUIDSuffix() string { return strings.ReplaceAll(uuid.NewString(), "-", "_") } -func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.Logger, spec specs.Destination, target *schema.Table, source *schema.Table, mode specs.MigrateMode) error { +func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.Logger, spec specs.Destination, target *schema.Table, source *schema.Table, mode specs.MigrateMode, testOpts PluginTestSuiteRunnerOptions) error { if err := p.Init(ctx, logger, spec); err != nil { return fmt.Errorf("failed to init plugin: %w", err) } @@ -49,10 +49,13 @@ func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog. return fmt.Errorf("failed to migrate existing table: %w", err) } opts.SyncTime = syncTime.Add(time.Second).UTC() - resource2 := schema.GenTestData(target, opts)[0] - if err := p.writeOne(ctx, sourceSpec, syncTime, resource2); err != nil { + resource2 := schema.GenTestData(target, opts) + if err := p.writeAll(ctx, sourceSpec, syncTime, resource2); err != nil { return fmt.Errorf("failed to write one after migration: %w", err) } + if testOpts.IgnoreNullsInLists { + stripNullsFromLists(resource2) + } resourcesRead, err := p.readAll(ctx, target, sourceName) if err != nil { @@ -63,16 +66,16 @@ func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog. if len(resourcesRead) != 2 { return fmt.Errorf("expected 2 resources after write, got %d", len(resourcesRead)) } - if !array.RecordApproxEqual(resourcesRead[1], resource2) { - diff := RecordDiff(resourcesRead[1], resource2) + if !array.RecordApproxEqual(resourcesRead[1], resource2[0]) { + diff := RecordDiff(resourcesRead[1], resource2[0]) return fmt.Errorf("resource1 and resource2 are not equal. diff: %s", diff) } } else { if len(resourcesRead) != 1 { return fmt.Errorf("expected 1 resource after write, got %d", len(resourcesRead)) } - if !array.RecordApproxEqual(resourcesRead[0], resource2) { - diff := RecordDiff(resourcesRead[0], resource2) + if !array.RecordApproxEqual(resourcesRead[0], resource2[0]) { + diff := RecordDiff(resourcesRead[0], resource2[0]) return fmt.Errorf("resource1 and resource2 are not equal. diff: %s", diff) } } @@ -119,7 +122,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( } p := newPlugin() - if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumn); err != nil { + if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumn, testOpts); err != nil { t.Fatalf("failed to migrate %s: %v", tableName, err) } if err := p.Close(ctx); err != nil { @@ -153,7 +156,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( {Name: "bool", Type: arrow.FixedWidthTypes.Boolean, NotNull: true}, }} p := newPlugin() - if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumnNotNull); err != nil { + if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumnNotNull, testOpts); err != nil { t.Fatalf("failed to migrate add_column_not_null: %v", err) } if err := p.Close(ctx); err != nil { @@ -186,7 +189,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( }} p := newPlugin() - if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.RemoveColumn); err != nil { + if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.RemoveColumn, testOpts); err != nil { t.Fatalf("failed to migrate remove_column: %v", err) } if err := p.Close(ctx); err != nil { @@ -220,7 +223,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( }} p := newPlugin() - if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.RemoveColumnNotNull); err != nil { + if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.RemoveColumnNotNull, testOpts); err != nil { t.Fatalf("failed to migrate remove_column_not_null: %v", err) } if err := p.Close(ctx); err != nil { @@ -254,7 +257,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( }} p := newPlugin() - if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.ChangeColumn); err != nil { + if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.ChangeColumn, testOpts); err != nil { t.Fatalf("failed to migrate change_column: %v", err) } if err := p.Close(ctx); err != nil { diff --git a/plugins/destination/plugin_testing_overwrite.go b/plugins/destination/plugin_testing_overwrite.go index 9432b619f4..778aaa5f6c 100644 --- a/plugins/destination/plugin_testing_overwrite.go +++ b/plugins/destination/plugin_testing_overwrite.go @@ -43,7 +43,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, return fmt.Errorf("failed to write all: %w", err) } sortRecordsBySyncTime(table, resources) - + if testOpts.IgnoreNullsInLists { + stripNullsFromLists(resources) + } resourcesRead, err := p.readAll(ctx, table, sourceName) if err != nil { return fmt.Errorf("failed to read all: %w", err) @@ -75,12 +77,15 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, MaxRows: 1, StableUUID: u, } - updatedResource := schema.GenTestData(table, opts)[0] + updatedResource := schema.GenTestData(table, opts) // write second time - if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil { + if err := p.writeAll(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil { return fmt.Errorf("failed to write one second time: %w", err) } + if testOpts.IgnoreNullsInLists { + stripNullsFromLists(updatedResource) + } resourcesRead, err = p.readAll(ctx, table, sourceName) if err != nil { return fmt.Errorf("failed to read all second time: %w", err) @@ -94,8 +99,8 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, diff := RecordDiff(resources[1], resourcesRead[0]) return fmt.Errorf("after overwrite expected first resource to be equal. diff=%s", diff) } - if !array.RecordApproxEqual(updatedResource, resourcesRead[1]) { - diff := RecordDiff(updatedResource, resourcesRead[1]) + if !array.RecordApproxEqual(updatedResource[0], resourcesRead[1]) { + diff := RecordDiff(updatedResource[0], resourcesRead[1]) return fmt.Errorf("after overwrite expected second resource to be equal. diff=%s", diff) } diff --git a/plugins/destination/plugin_testing_overwrite_delete_stale.go b/plugins/destination/plugin_testing_overwrite_delete_stale.go index 96ffeb9002..bcb07fadc4 100644 --- a/plugins/destination/plugin_testing_overwrite_delete_stale.go +++ b/plugins/destination/plugin_testing_overwrite_delete_stale.go @@ -60,6 +60,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte if len(resourcesRead) != 2 { return fmt.Errorf("expected 2 resources, got %d", len(resourcesRead)) } + if testOpts.IgnoreNullsInLists { + stripNullsFromLists(resources) + } if !array.RecordApproxEqual(resources[0], resourcesRead[0]) { diff := RecordDiff(resources[0], resourcesRead[0]) return fmt.Errorf("expected first resource to be equal. diff: %s", diff) @@ -106,6 +109,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte if len(resourcesRead) != 1 { return fmt.Errorf("after overwrite expected 1 resource, got %d", len(resourcesRead)) } + if testOpts.IgnoreNullsInLists { + stripNullsFromLists(resources) + } if array.RecordApproxEqual(resources[0], resourcesRead[0]) { diff := RecordDiff(resources[0], resourcesRead[0]) return fmt.Errorf("after overwrite expected first resource to be different. diff: %s", diff) @@ -120,6 +126,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte } // we expect the only resource returned to match the updated resource we wrote + if testOpts.IgnoreNullsInLists { + stripNullsFromLists(updatedResources) + } if !array.RecordApproxEqual(updatedResources[0], resourcesRead[0]) { diff := RecordDiff(updatedResources[0], resourcesRead[0]) return fmt.Errorf("after delete stale expected resource to be equal. diff: %s", diff) diff --git a/plugins/destination/plugin_testing_write_append.go b/plugins/destination/plugin_testing_write_append.go index 87174bc31b..b83e8aa63e 100644 --- a/plugins/destination/plugin_testing_write_append.go +++ b/plugins/destination/plugin_testing_write_append.go @@ -44,11 +44,12 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, secondSyncTime := syncTime.Add(10 * time.Second).UTC() opts.SyncTime = secondSyncTime - record2 := schema.GenTestData(table, opts)[0] + opts.MaxRows = 1 + record2 := schema.GenTestData(table, opts) if !s.tests.SkipSecondAppend { // write second time - if err := p.writeOne(ctx, specSource, secondSyncTime, record2); err != nil { + if err := p.writeAll(ctx, specSource, secondSyncTime, record2); err != nil { return fmt.Errorf("failed to write one second time: %w", err) } } @@ -70,6 +71,7 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, if testOpts.IgnoreNullsInLists { stripNullsFromLists(record1) + stripNullsFromLists(record2) } if !array.RecordApproxEqual(record1[0], resourcesRead[0]) { diff := RecordDiff(record1[0], resourcesRead[0]) @@ -81,8 +83,8 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, } if !s.tests.SkipSecondAppend { - if !array.RecordApproxEqual(record2, resourcesRead[2]) { - diff := RecordDiff(record2, resourcesRead[2]) + if !array.RecordApproxEqual(record2[0], resourcesRead[2]) { + diff := RecordDiff(record2[0], resourcesRead[2]) return fmt.Errorf("second expected resource diff: %s", diff) } }