diff --git a/plugins/destination/plugin_testing.go b/plugins/destination/plugin_testing.go index 21ab53281f..d4849f0400 100644 --- a/plugins/destination/plugin_testing.go +++ b/plugins/destination/plugin_testing.go @@ -36,6 +36,11 @@ type PluginTestSuiteTests struct { // This is useful in cases like cloud storage where you can't append to an // existing object after the file has been closed. SkipSecondAppend bool + + // destinationPluginTestMigrateAppend skips a test for the migrate function where a column is added, + // data is appended, then the column is removed and more data appended, checking that the migrations handle + // this correctly. + SkipMigrateAppend bool } func (s *PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error { @@ -197,6 +202,99 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, return nil } +func (*PluginTestSuite) destinationPluginTestMigrateAppend(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error { + spec.WriteMode = specs.WriteModeAppend + spec.BatchSize = 1 + if err := p.Init(ctx, logger, spec); err != nil { + return fmt.Errorf("failed to init plugin: %w", err) + } + tableName := "cq_test_migrate_append" + table := testdata.TestTable(tableName) + if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { + return fmt.Errorf("failed to migrate tables: %w", err) + } + sourceName := "testMigrateAppendSource" + uuid.NewString() + syncTime := time.Now().UTC().Round(1 * time.Second) + resource1 := createTestResources(table, sourceName, syncTime, 1)[0] + if err := p.writeOne(ctx, []*schema.Table{table}, "test_source", syncTime, resource1); err != nil { + return fmt.Errorf("failed to write one: %w", err) + } + + // check that migrations and writes still succeed when column ordering is changed + a := table.Columns.Index("uuid") + b := table.Columns.Index("float") + table.Columns[a], table.Columns[b] = table.Columns[b], table.Columns[a] + if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { + return fmt.Errorf("failed to migrate table with changed column ordering: %w", err) + } + resource2 := createTestResources(table, sourceName, syncTime, 1)[0] + if err := p.writeOne(ctx, []*schema.Table{table}, "test_source", syncTime, resource2); err != nil { + return fmt.Errorf("failed to write one after column order change: %w", err) + } + + resourcesRead, err := p.readAll(ctx, table, sourceName) + if err != nil { + return fmt.Errorf("failed to read all: %w", err) + } + if len(resourcesRead) != 2 { + return fmt.Errorf("expected 2 resources after second write, got %d", len(resourcesRead)) + } + + // check that migrations succeed when a new column is added + table.Columns = append(table.Columns, schema.Column{ + Name: "new_column", + Type: schema.TypeInt, + }) + if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { + return fmt.Errorf("failed to migrate table with new column: %w", err) + } + resource3 := createTestResources(table, sourceName, syncTime, 1)[0] + if err := p.writeOne(ctx, []*schema.Table{table}, "test_source", syncTime, resource3); err != nil { + return fmt.Errorf("failed to write one after column order change: %w", err) + } + resourcesRead, err = p.readAll(ctx, table, sourceName) + if err != nil { + return fmt.Errorf("failed to read all: %w", err) + } + if len(resourcesRead) != 3 { + return fmt.Errorf("expected 3 resources after third write, got %d", len(resourcesRead)) + } + + // check that migration still succeeds when there is an extra column in the destination table, + // which should be ignored + oldTable := testdata.TestTable(tableName) + if err := p.Migrate(ctx, []*schema.Table{oldTable}); err != nil { + return fmt.Errorf("failed to migrate table with extra column in destination: %w", err) + } + resource4 := createTestResources(oldTable, sourceName, syncTime, 1)[0] + if err := p.writeOne(ctx, []*schema.Table{oldTable}, "test_source", syncTime, resource4); err != nil { + return fmt.Errorf("failed to write one after column order change: %w", err) + } + resourcesRead, err = p.readAll(ctx, oldTable, sourceName) + if err != nil { + return fmt.Errorf("failed to read all: %w", err) + } + if len(resourcesRead) != 4 { + return fmt.Errorf("expected 4 resources after fourth write, got %d", len(resourcesRead)) + } + cqIDIndex := table.Columns.Index(schema.CqIDColumn.Name) + found := false + for _, r := range resourcesRead { + if !r[cqIDIndex].Equal(resource4.Data[cqIDIndex]) { + continue + } + found = true + if !r.Equal(resource4.Data) { + return fmt.Errorf("expected resource to be equal to original resource, but got diff: %s", r.Diff(resource4.Data)) + } + } + if !found { + return fmt.Errorf("expected to find resource with cq_id %s, but none matched", resource4.Data[cqIDIndex]) + } + + return nil +} + func getTestLogger(t *testing.T) zerolog.Logger { t.Helper() zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs @@ -238,6 +336,17 @@ func PluginTestSuiteRunner(t *testing.T, p *Plugin, spec any, tests PluginTestSu t.Fatal(err) } }) + + t.Run("TestMigrateAppend", func(t *testing.T) { + t.Helper() + if suite.tests.SkipMigrateAppend { + t.Skip("skipping TestMigrateAppend") + return + } + if err := suite.destinationPluginTestMigrateAppend(ctx, p, logger, destSpec); err != nil { + t.Fatal(err) + } + }) } func createTestResources(table *schema.Table, sourceName string, syncTime time.Time, count int) []schema.DestinationResource {