109 changes: 109 additions & 0 deletions plugins/destination/plugin_testing.go
@@ -36,6 +36,11 @@ type PluginTestSuiteTests struct {
// This is useful in cases like cloud storage where you can't append to an
// existing object after the file has been closed.
SkipSecondAppend bool

// SkipMigrateAppend skips the append-mode migrate test, in which a column is added and
// data is appended, the table is then migrated back to a schema without that column and
// more data is appended, checking that the migrations handle this correctly.
SkipMigrateAppend bool
}
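// A minimal sketch of intended usage (assumed, not part of this diff): a destination that
// cannot support this flow can opt out of the new test when invoking the suite runner
// added further below, e.g.
//
//	PluginTestSuiteRunner(t, p, spec, PluginTestSuiteTests{
//		SkipMigrateAppend: true,
//	})
//
// where t, p and spec are the destination's own *testing.T, *Plugin and spec value.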

func (s *PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
@@ -197,6 +202,99 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context,
return nil
}

func (*PluginTestSuite) destinationPluginTestMigrateAppend(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
spec.WriteMode = specs.WriteModeAppend
spec.BatchSize = 1
if err := p.Init(ctx, logger, spec); err != nil {
return fmt.Errorf("failed to init plugin: %w", err)
}
tableName := "cq_test_migrate_append"
table := testdata.TestTable(tableName)
if err := p.Migrate(ctx, []*schema.Table{table}); err != nil {
return fmt.Errorf("failed to migrate tables: %w", err)
}
sourceName := "testMigrateAppendSource" + uuid.NewString()
syncTime := time.Now().UTC().Round(1 * time.Second)
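// rounding the sync time to whole seconds presumably avoids precision mismatches with
// destinations that only store second-level timestamps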
resource1 := createTestResources(table, sourceName, syncTime, 1)[0]
if err := p.writeOne(ctx, []*schema.Table{table}, sourceName, syncTime, resource1); err != nil {
return fmt.Errorf("failed to write one: %w", err)
}

// check that migrations and writes still succeed when column ordering is changed
a := table.Columns.Index("uuid")
b := table.Columns.Index("float")
table.Columns[a], table.Columns[b] = table.Columns[b], table.Columns[a]
if err := p.Migrate(ctx, []*schema.Table{table}); err != nil {
return fmt.Errorf("failed to migrate table with changed column ordering: %w", err)
}
resource2 := createTestResources(table, sourceName, syncTime, 1)[0]
if err := p.writeOne(ctx, []*schema.Table{table}, sourceName, syncTime, resource2); err != nil {
return fmt.Errorf("failed to write one after column order change: %w", err)
}

resourcesRead, err := p.readAll(ctx, table, sourceName)
if err != nil {
return fmt.Errorf("failed to read all: %w", err)
}
if len(resourcesRead) != 2 {
return fmt.Errorf("expected 2 resources after second write, got %d", len(resourcesRead))
}

// check that migrations succeed when a new column is added
table.Columns = append(table.Columns, schema.Column{
Name: "new_column",
Type: schema.TypeInt,
})
if err := p.Migrate(ctx, []*schema.Table{table}); err != nil {
return fmt.Errorf("failed to migrate table with new column: %w", err)
}
resource3 := createTestResources(table, sourceName, syncTime, 1)[0]
if err := p.writeOne(ctx, []*schema.Table{table}, sourceName, syncTime, resource3); err != nil {
return fmt.Errorf("failed to write one after column order change: %w", err)
}
resourcesRead, err = p.readAll(ctx, table, sourceName)
if err != nil {
return fmt.Errorf("failed to read all: %w", err)
}
if len(resourcesRead) != 3 {
return fmt.Errorf("expected 3 resources after third write, got %d", len(resourcesRead))
}

// check that migration still succeeds when there is an extra column in the destination table,
// which should be ignored
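// (oldTable is the original test table and does not include the new_column added above)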
oldTable := testdata.TestTable(tableName)
if err := p.Migrate(ctx, []*schema.Table{oldTable}); err != nil {
return fmt.Errorf("failed to migrate table with extra column in destination: %w", err)
}
resource4 := createTestResources(oldTable, sourceName, syncTime, 1)[0]
if err := p.writeOne(ctx, []*schema.Table{oldTable}, sourceName, syncTime, resource4); err != nil {
return fmt.Errorf("failed to write one after column order change: %w", err)
}
resourcesRead, err = p.readAll(ctx, oldTable, sourceName)
if err != nil {
return fmt.Errorf("failed to read all: %w", err)
}
if len(resourcesRead) != 4 {
return fmt.Errorf("expected 4 resources after fourth write, got %d", len(resourcesRead))
}
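// verify that resource4 can still be located by its _cq_id and that its data round-tripped unchanged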
cqIDIndex := table.Columns.Index(schema.CqIDColumn.Name)
found := false
for _, r := range resourcesRead {
if !r[cqIDIndex].Equal(resource4.Data[cqIDIndex]) {
continue
}
found = true
if !r.Equal(resource4.Data) {
return fmt.Errorf("expected resource to be equal to original resource, but got diff: %s", r.Diff(resource4.Data))
}
}
if !found {
return fmt.Errorf("expected to find resource with cq_id %s, but none matched", resource4.Data[cqIDIndex])
}

return nil
}

func getTestLogger(t *testing.T) zerolog.Logger {
t.Helper()
zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs
@@ -238,6 +336,17 @@ func PluginTestSuiteRunner(t *testing.T, p *Plugin, spec any, tests PluginTestSu
t.Fatal(err)
}
})

t.Run("TestMigrateAppend", func(t *testing.T) {
t.Helper()
if suite.tests.SkipMigrateAppend {
t.Skip("skipping TestMigrateAppend")
return
}
if err := suite.destinationPluginTestMigrateAppend(ctx, p, logger, destSpec); err != nil {
t.Fatal(err)
}
})
}

func createTestResources(table *schema.Table, sourceName string, syncTime time.Time, count int) []schema.DestinationResource {