From 08b0f457e0f31fa89932f03965a132b4187d8961 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 15 Feb 2023 21:16:38 +0200 Subject: [PATCH 01/15] feat: Add better migration detection APIs --- internal/memdb/memdb.go | 14 +- internal/memdb/memdb_test.go | 37 +- plugins/destination/plugin_testing.go | 46 +-- plugins/destination/plugin_testing_migrate.go | 337 +++++++++++++----- schema/column.go | 2 +- schema/table.go | 108 +++--- schema/table_test.go | 145 ++++---- 7 files changed, 467 insertions(+), 222 deletions(-) diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index f6db98134f..2555ce1f64 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -19,6 +19,7 @@ type client struct { schema.DefaultTransformer spec specs.Destination memoryDB map[string][][]any + tables map[string]*schema.Table memoryDBLock sync.RWMutex errOnWrite bool blockingWrite bool @@ -62,6 +63,7 @@ func getTestLogger(t *testing.T) zerolog.Logger { func NewClient(_ context.Context, _ zerolog.Logger, spec specs.Destination) (destination.Client, error) { return &client{ memoryDB: make(map[string][][]any), + tables: make(map[string]*schema.Table), spec: spec, }, nil } @@ -101,9 +103,19 @@ func (c *client) overwrite(table *schema.Table, data []any) { func (c *client) Migrate(_ context.Context, tables schema.Tables) error { for _, table := range tables { - if c.memoryDB[table.Name] == nil { + memTable := c.memoryDB[table.Name] + if memTable == nil { c.memoryDB[table.Name] = make([][]any, 0) + c.tables[table.Name] = table + continue } + changes := table.GetChanges(c.tables[table.Name]) + // memdb doesn't support any auto-migrate + if changes == nil { + continue + } + c.memoryDB[table.Name] = make([][]any, 0) + c.tables[table.Name] = table } return nil } diff --git a/internal/memdb/memdb_test.go b/internal/memdb/memdb_test.go index 5b501736b6..77c8001cf7 100644 --- a/internal/memdb/memdb_test.go +++ b/internal/memdb/memdb_test.go @@ -13,6 +13,22 @@ import ( "github.com/stretchr/testify/require" ) +var migrateStrategyOverwrite = destination.MigrateStrategy{ + AddColumn: specs.MigrateModeForced, + AddColumnNotNull: specs.MigrateModeForced, + RemoveColumn: specs.MigrateModeForced, + RemoveColumnNotNull: specs.MigrateModeForced, + ChangeColumn: specs.MigrateModeForced, +} + +var migrateStrategyAppend = destination.MigrateStrategy{ + AddColumn: specs.MigrateModeForced, + AddColumnNotNull: specs.MigrateModeForced, + RemoveColumn: specs.MigrateModeForced, + RemoveColumnNotNull: specs.MigrateModeForced, + ChangeColumn: specs.MigrateModeForced, +} + func TestPluginUnmanagedClient(t *testing.T) { destination.PluginTestSuiteRunner( t, @@ -20,7 +36,11 @@ func TestPluginUnmanagedClient(t *testing.T) { return destination.NewPlugin("test", "development", NewClient) }, nil, - destination.PluginTestSuiteTests{}) + destination.PluginTestSuiteTests{ + MigrateStrategyOverwrite: migrateStrategyOverwrite, + MigrateStrategyAppend: migrateStrategyOverwrite, + }, + ) } func TestPluginManagedClient(t *testing.T) { @@ -29,7 +49,10 @@ func TestPluginManagedClient(t *testing.T) { return destination.NewPlugin("test", "development", NewClient, destination.WithManagedWriter()) }, nil, - destination.PluginTestSuiteTests{}) + destination.PluginTestSuiteTests{ + MigrateStrategyOverwrite: migrateStrategyOverwrite, + MigrateStrategyAppend: migrateStrategyOverwrite, + }) } func TestPluginManagedClientWithSmallBatchSize(t *testing.T) { @@ -39,7 +62,10 @@ func TestPluginManagedClientWithSmallBatchSize(t *testing.T) { destination.WithDefaultBatchSize(1), destination.WithDefaultBatchSizeBytes(1)) }, nil, - destination.PluginTestSuiteTests{}) + destination.PluginTestSuiteTests{ + MigrateStrategyOverwrite: migrateStrategyOverwrite, + MigrateStrategyAppend: migrateStrategyOverwrite, + }) } func TestPluginManagedClientWithLargeBatchSize(t *testing.T) { @@ -50,7 +76,10 @@ func TestPluginManagedClientWithLargeBatchSize(t *testing.T) { destination.WithDefaultBatchSizeBytes(100000000)) }, nil, - destination.PluginTestSuiteTests{}) + destination.PluginTestSuiteTests{ + MigrateStrategyOverwrite: migrateStrategyOverwrite, + MigrateStrategyAppend: migrateStrategyOverwrite, + }) } func TestPluginOnNewError(t *testing.T) { diff --git a/plugins/destination/plugin_testing.go b/plugins/destination/plugin_testing.go index b3a994227c..3bd6e3db6c 100644 --- a/plugins/destination/plugin_testing.go +++ b/plugins/destination/plugin_testing.go @@ -17,9 +17,26 @@ type PluginTestSuite struct { tests PluginTestSuiteTests } +// type MigrateResult int + +// const ( +// MigrateResultSuccess MigrateResult = iota +// MigrateResultDropTable +// ) + +// MigrateStrategy defines which tests we should include +// true means test should succeed for either automigrate +type MigrateStrategy struct { + AddColumn specs.MigrateMode + AddColumnNotNull specs.MigrateMode + RemoveColumn specs.MigrateMode + RemoveColumnNotNull specs.MigrateMode + ChangeColumn specs.MigrateMode +} + type PluginTestSuiteTests struct { // SkipOverwrite skips testing for "overwrite" mode. Use if the destination - // // plugin doesn't support this feature. + // plugin doesn't support this feature. SkipOverwrite bool // SkipDeleteStale skips testing "delete-stale" mode. Use if the destination @@ -48,6 +65,9 @@ type PluginTestSuiteTests struct { SkipMigrateOverwrite bool // SkipMigrateOverwriteForce skips a test for the migrate function where a column is changed in force mode SkipMigrateOverwriteForce bool + + MigrateStrategyOverwrite MigrateStrategy + MigrateStrategyAppend MigrateStrategy } func getTestLogger(t *testing.T) zerolog.Logger { @@ -109,11 +129,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, spec any, test } destSpec.WriteMode = specs.WriteModeOverwrite destSpec.Name = "test_migrate_overwrite" - p := newPlugin() - if err := suite.destinationPluginTestMigrate(ctx, p, logger, destSpec); err != nil { - t.Fatal(err) - } - if err := p.Close(ctx); err != nil { + if err := suite.destinationPluginTestMigrate(t, ctx, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite); err != nil { t.Fatal(err) } }) @@ -126,11 +142,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, spec any, test destSpec.WriteMode = specs.WriteModeOverwrite destSpec.MigrateMode = specs.MigrateModeForced destSpec.Name = "test_migrate_overwrite_force" - p := newPlugin() - if err := suite.destinationPluginTestMigrate(ctx, p, logger, destSpec); err != nil { - t.Fatal(err) - } - if err := p.Close(ctx); err != nil { + if err := suite.destinationPluginTestMigrate(t, ctx, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite); err != nil { t.Fatal(err) } }) @@ -157,11 +169,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, spec any, test } destSpec.WriteMode = specs.WriteModeAppend destSpec.Name = "test_migrate_append" - p := newPlugin() - if err := suite.destinationPluginTestMigrate(ctx, p, logger, destSpec); err != nil { - t.Fatal(err) - } - if err := p.Close(ctx); err != nil { + if err := suite.destinationPluginTestMigrate(t, ctx, newPlugin, logger, destSpec, tests.MigrateStrategyAppend); err != nil { t.Fatal(err) } }) @@ -174,11 +182,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, spec any, test destSpec.WriteMode = specs.WriteModeAppend destSpec.MigrateMode = specs.MigrateModeForced destSpec.Name = "test_migrate_append_force" - p := newPlugin() - if err := suite.destinationPluginTestMigrate(ctx, p, logger, destSpec); err != nil { - t.Fatal(err) - } - if err := p.Close(ctx); err != nil { + if err := suite.destinationPluginTestMigrate(t, ctx, newPlugin, logger, destSpec, tests.MigrateStrategyAppend); err != nil { t.Fatal(err) } }) diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go index 3584fb37c2..dec9074c58 100644 --- a/plugins/destination/plugin_testing_migrate.go +++ b/plugins/destination/plugin_testing_migrate.go @@ -3,124 +3,295 @@ package destination import ( "context" "fmt" + "testing" "time" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" - "github.com/cloudquery/plugin-sdk/testdata" "github.com/rs/zerolog" ) -func (*PluginTestSuite) destinationPluginTestMigrate( - ctx context.Context, - p *Plugin, - logger zerolog.Logger, - spec specs.Destination, -) error { - spec.BatchSize = 1 +var tableTest1 = &schema.Table{ + Name: "test1", + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeUUID, + CreationOptions: schema.ColumnCreationOptions{ + PrimaryKey: true, + }, + }, + }, +} + +var tableTest2 = &schema.Table{ + Name: "test1", + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeUUID, + CreationOptions: schema.ColumnCreationOptions{ + PrimaryKey: true, + }, + }, + { + Name: "bool", + Type: schema.TypeBool, + }, + }, +} + +func testMigration(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, target *schema.Table, source *schema.Table, mode specs.MigrateMode) error { if err := p.Init(ctx, logger, spec); err != nil { return fmt.Errorf("failed to init plugin: %w", err) } - tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix()) - table := testdata.TestTable(tableName) - if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { + if err := p.Migrate(ctx, []*schema.Table{source}); err != nil { return fmt.Errorf("failed to migrate tables: %w", err) } - sourceName := tableName + sourceName := target.Name sourceSpec := specs.Source{ Name: sourceName, } syncTime := time.Now().UTC().Round(1 * time.Second) - resource1 := createTestResources(table, sourceName, syncTime, 1)[0] - if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource1); err != nil { + resource1 := createTestResources(source, sourceName, syncTime, 1)[0] + if err := p.writeOne(ctx, sourceSpec, []*schema.Table{source}, syncTime, resource1); err != nil { return fmt.Errorf("failed to write one: %w", err) } - // check that migrations and writes still succeed when column ordering is changed - a := table.Columns.Index("uuid") - b := table.Columns.Index("float") - table.Columns[a], table.Columns[b] = table.Columns[b], table.Columns[a] - if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { - return fmt.Errorf("failed to migrate table with changed column ordering: %w", err) + if err := p.Migrate(ctx, []*schema.Table{target}); err != nil { + return fmt.Errorf("failed to migrate existing table: %w", err) } - resource2 := createTestResources(table, sourceName, syncTime, 1)[0] - if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource2); err != nil { - return fmt.Errorf("failed to write one after column order change: %w", err) + resource2 := createTestResources(target, sourceName, syncTime, 1)[0] + if err := p.writeOne(ctx, sourceSpec, []*schema.Table{target}, syncTime, resource2); err != nil { + return fmt.Errorf("failed to write one after migration: %w", err) } - resourcesRead, err := p.readAll(ctx, table, sourceName) + resourcesRead, err := p.readAll(ctx, target, sourceName) if err != nil { return fmt.Errorf("failed to read all: %w", err) } - if len(resourcesRead) != 2 { - return fmt.Errorf("expected 2 resources after second write, got %d", len(resourcesRead)) + if mode == specs.MigrateModeSafe { + if len(resourcesRead) != 2 { + return fmt.Errorf("expected 2 resources after write, got %d", len(resourcesRead)) + } + if diff := resourcesRead[0].Diff(resource1.Data); diff != "" { + return fmt.Errorf("resource1 diff: %s", diff) + } + if diff := resourcesRead[1].Diff(resource2.Data); diff != "" { + return fmt.Errorf("resource2 diff: %s", diff) + } + } else { + if len(resourcesRead) != 1 { + return fmt.Errorf("expected 1 resources after write, got %d", len(resourcesRead)) + } + if diff := resourcesRead[0].Diff(resource2.Data); diff != "" { + return fmt.Errorf("resource1 diff: %s", diff) + } } - // check that migrations succeed when a new column is added - table.Columns = append(table.Columns, schema.Column{ - Name: "new_column", - Type: schema.TypeInt, + return nil +} + +func (*PluginTestSuite) destinationPluginTestMigrate( + t *testing.T, + ctx context.Context, + newPlugin NewPluginFunc, + logger zerolog.Logger, + spec specs.Destination, + strategy MigrateStrategy, +) error { + spec.BatchSize = 1 + + t.Run("add_column", func(t *testing.T) { + if strategy.AddColumn == specs.MigrateModeForced && spec.MigrateMode == specs.MigrateModeSafe { + t.Skip("skipping as migrate mode is safe") + return + } + source := &schema.Table{ + Name: "add_column", + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeUUID, + }, + }, + } + target := &schema.Table{ + Name: "add_column", + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeUUID, + }, + { + Name: "bool", + Type: schema.TypeBool, + }, + }, + } + p := newPlugin() + if err := testMigration(ctx, p, logger, spec, target, source, strategy.AddColumn); err != nil { + t.Fatalf("failed to migrate add_column: %v", err) + } + if err := p.Close(ctx); err != nil { + t.Fatal(err) + } }) - if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { - return fmt.Errorf("failed to migrate table with new column: %w", err) - } - resource3 := createTestResources(table, sourceName, syncTime, 1)[0] - if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource3); err != nil { - return fmt.Errorf("failed to write one after column order change: %w", err) - } - resourcesRead, err = p.readAll(ctx, table, sourceName) - if err != nil { - return fmt.Errorf("failed to read all: %w", err) - } - if len(resourcesRead) != 3 { - return fmt.Errorf("expected 3 resources after third write, got %d", len(resourcesRead)) - } - // check that migration still succeeds when there is an extra column in the destination table, - // which should be ignored - oldTable := testdata.TestTable(tableName) - if err := p.Migrate(ctx, []*schema.Table{oldTable}); err != nil { - return fmt.Errorf("failed to migrate table with extra column in destination: %w", err) - } - resource4 := createTestResources(oldTable, sourceName, syncTime, 1)[0] - if err := p.writeOne(ctx, sourceSpec, []*schema.Table{oldTable}, syncTime, resource4); err != nil { - return fmt.Errorf("failed to write one after column order change: %w", err) - } - totalExpectedResources := 4 - if spec.MigrateMode == specs.MigrateModeForced { - table.Columns[len(table.Columns)-1].Type = schema.TypeString - if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { - return fmt.Errorf("failed to migrate table with changed column type: %w", err) + t.Run("add_column_not_null", func(t *testing.T) { + if strategy.AddColumnNotNull == specs.MigrateModeForced && spec.MigrateMode == specs.MigrateModeSafe { + t.Skip("skipping as migrate mode is safe") + return } - resource5 := createTestResources(table, sourceName, syncTime, 1)[0] - if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource5); err != nil { - return fmt.Errorf("failed to write one after column type change: %w", err) + source := &schema.Table{ + Name: "add_column_not_null", + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeUUID, + }, + }, } - totalExpectedResources++ - } + target := &schema.Table{ + Name: "add_column_not_null", + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeUUID, + }, + { + Name: "bool", + Type: schema.TypeBool, + CreationOptions: schema.ColumnCreationOptions{ + NotNull: true, + }, + }, + }, + } + p := newPlugin() + if err := testMigration(ctx, p, logger, spec, target, source, strategy.AddColumnNotNull); err != nil { + t.Fatalf("failed to migrate add_column_not_null: %v", err) + } + if err := p.Close(ctx); err != nil { + t.Fatal(err) + } + }) - resourcesRead, err = p.readAll(ctx, oldTable, sourceName) - if err != nil { - return fmt.Errorf("failed to read all: %w", err) - } - if len(resourcesRead) != totalExpectedResources { - return fmt.Errorf("expected %d resources after fourth write, got %d", totalExpectedResources, len(resourcesRead)) - } - cqIDIndex := table.Columns.Index(schema.CqIDColumn.Name) - found := false - for _, r := range resourcesRead { - if !r[cqIDIndex].Equal(resource4.Data[cqIDIndex]) { - continue + t.Run("remove_column", func(t *testing.T) { + if strategy.RemoveColumn == specs.MigrateModeForced && spec.MigrateMode == specs.MigrateModeSafe { + t.Skip("skipping as migrate mode is safe") + return } - found = true - if !r.Equal(resource4.Data) { - return fmt.Errorf("expected resource to be equal to original resource, but got diff: %s", r.Diff(resource4.Data)) + source := &schema.Table{ + Name: "remove_column", + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeUUID, + }, + { + Name: "bool", + Type: schema.TypeBool, + }, + }, } - } - if !found { - return fmt.Errorf("expected to find resource with cq_id %s, but none matched", resource4.Data[cqIDIndex]) - } + target := &schema.Table{ + Name: "remove_column", + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeUUID, + }, + }, + } + p := newPlugin() + if err := testMigration(ctx, p, logger, spec, target, source, strategy.RemoveColumn); err != nil { + t.Fatalf("failed to migrate remove_column: %v", err) + } + if err := p.Close(ctx); err != nil { + t.Fatal(err) + } + }) + + t.Run("remove_column_not_null", func(t *testing.T) { + if strategy.RemoveColumnNotNull == specs.MigrateModeForced && spec.MigrateMode == specs.MigrateModeSafe { + t.Skip("skipping as migrate mode is safe") + return + } + source := &schema.Table{ + Name: "remove_column_not_null", + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeUUID, + }, + { + Name: "bool", + Type: schema.TypeBool, + CreationOptions: schema.ColumnCreationOptions{ + NotNull: true, + }, + }, + }, + } + target := &schema.Table{ + Name: "remove_column_not_null", + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeUUID, + }, + }, + } + p := newPlugin() + if err := testMigration(ctx, p, logger, spec, target, source, strategy.RemoveColumn); err != nil { + t.Fatalf("failed to migrate add_column: %v", err) + } + if err := p.Close(ctx); err != nil { + t.Fatal(err) + } + }) + + t.Run("change_column", func(t *testing.T) { + if strategy.ChangeColumn == specs.MigrateModeForced && spec.MigrateMode == specs.MigrateModeSafe { + t.Skip("skipping as migrate mode is safe") + return + } + source := &schema.Table{ + Name: "change_column", + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeUUID, + }, + { + Name: "bool", + Type: schema.TypeBool, + }, + }, + } + target := &schema.Table{ + Name: "change_column", + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeUUID, + }, + { + Name: "bool", + Type: schema.TypeString, + }, + }, + } + p := newPlugin() + if err := testMigration(ctx, p, logger, spec, target, source, strategy.RemoveColumn); err != nil { + t.Fatalf("failed to migrate add_column: %v", err) + } + if err := p.Close(ctx); err != nil { + t.Fatal(err) + } + }) return nil } diff --git a/schema/column.go b/schema/column.go index 424111708a..ef1ed1da51 100644 --- a/schema/column.go +++ b/schema/column.go @@ -44,7 +44,7 @@ type Column struct { IgnoreInTests bool `json:"-"` } -func (c *Column) String() string { +func (c Column) String() string { var sb strings.Builder sb.WriteString(c.Name) sb.WriteString(":") diff --git a/schema/table.go b/schema/table.go index 30346d9b53..407f209146 100644 --- a/schema/table.go +++ b/schema/table.go @@ -31,6 +31,22 @@ type SyncSummary struct { Panics uint64 } +type TableColumnChangeType int + +const ( + TableColumnChangeTypeUnknown TableColumnChangeType = iota + TableColumnChangeTypeAdd + TableColumnChangeTypeUpdate + TableColumnChangeTypeRemove +) + +type TableColumnChange struct { + Type TableColumnChangeType + ColumnName string + Current Column + Previous Column +} + type Table struct { // Name of table Name string `json:"name"` @@ -74,6 +90,23 @@ var ( reValidColumnName = regexp.MustCompile(`^[a-z_][a-z\d_]*$`) ) +func (t TableColumnChangeType) String() string { + switch t { + case TableColumnChangeTypeAdd: + return "add" + case TableColumnChangeTypeUpdate: + return "update" + case TableColumnChangeTypeRemove: + return "remove" + default: + return "unknown" + } +} + +func (t TableColumnChange) String() string { + return fmt.Sprintf("column: %s, type: %s, current: %s, previous: %s", t.ColumnName, t.Type, t.Current, t.Previous) +} + func (tt Tables) FilterDfsFunc(include, exclude func(*Table) bool, skipDependentTables bool) Tables { filteredTables := make(Tables, 0, len(tt)) for _, t := range tt { @@ -248,55 +281,50 @@ func (t *Table) ValidateName() error { return nil } -// GetAddedColumns returns a list of columns that are in this table but not in the other table. -func (t *Table) GetAddedColumns(other *Table) []Column { - var added []Column - for _, c := range t.Columns { - if other.Columns.Get(c.Name) == nil { - added = append(added, c) - } - } - return added -} +// added columns with no constraints - migrate // table.GetAddedColumns(other) +// added columns with not null - drop table // table.GetAddedConstraintColumns(other) +// added columns with unique - drop table -// GetChangedColumns returns a list of columns that are in this table but have different type in the other table. -// returns got, want -func (t *Table) GetChangedColumns(other *Table) (got ColumnList, want ColumnList) { +// changed columns: change type - drop table or drop/add column? // table.GetChangedColumns(other) +// changed columns: added constraint not null or unique - drop table // table.GetChangedColumnsWithAddedConstraints +// changed columns: removed constraint: not null, removed unique - drop constraint? do nothing? what if a user is adding a unique constraint? we will drop it? // table.GetChangedColumnsWithRemovedConstraints + +// removed columns: with no constraints - do nothing? +// removed columns: with constraints not null or unique - drop column? drop table ? // table.RemovedColumnsWithConstraints(other) + +// changed pks: drop table // table.IsPrimaryKeyEqual(other) + +func (t *Table) GetChanges(other *Table) []TableColumnChange { + var changes []TableColumnChange for _, c := range t.Columns { - otherCol := other.Columns.Get(c.Name) - if otherCol == nil { + otherColumn := other.Columns.Get(c.Name) + if otherColumn == nil { + changes = append(changes, TableColumnChange{ + Type: TableColumnChangeTypeAdd, + ColumnName: c.Name, + Current: c, + }) continue } - if c.Type != otherCol.Type { - got = append(got, c) - want = append(want, *otherCol) - } - if c.CreationOptions.NotNull != otherCol.CreationOptions.NotNull { - got = append(got, c) - want = append(want, *otherCol) - } - } - return got, want -} - -func (t *Table) IsPrimaryKeyEqual(other *Table) bool { - for _, c := range t.Columns { - if c.CreationOptions.PrimaryKey { - otherCol := other.Columns.Get(c.Name) - if otherCol == nil || !otherCol.CreationOptions.PrimaryKey { - return false - } + if c.Type != otherColumn.Type || c.CreationOptions.NotNull != otherColumn.CreationOptions.NotNull || c.CreationOptions.PrimaryKey != otherColumn.CreationOptions.PrimaryKey { + changes = append(changes, TableColumnChange{ + Type: TableColumnChangeTypeUpdate, + ColumnName: c.Name, + Current: c, + Previous: *otherColumn, + }) } } for _, c := range other.Columns { - if c.CreationOptions.PrimaryKey { - otherCol := t.Columns.Get(c.Name) - if otherCol == nil || !otherCol.CreationOptions.PrimaryKey { - return false - } + if t.Columns.Get(c.Name) == nil { + changes = append(changes, TableColumnChange{ + Type: TableColumnChangeTypeRemove, + ColumnName: c.Name, + Previous: c, + }) } } - return true + return changes } func (t *Table) ValidateDuplicateColumns() error { diff --git a/schema/table_test.go b/schema/table_test.go index b4e7b059da..8dabe5073d 100644 --- a/schema/table_test.go +++ b/schema/table_test.go @@ -244,84 +244,85 @@ func TestTablesFilterDFS(t *testing.T) { } } -var testTable1 = &Table{ - Name: "test", - Columns: []Column{ - {Name: "bool", Type: TypeBool}, - }, +type testTableGetChangeTestCase struct { + name string + target *Table + source *Table + expectedChanges []TableColumnChange } -var testTable2 = &Table{ - Name: "test", - Columns: []Column{ - {Name: "bool", Type: TypeBool}, - {Name: "bool1", Type: TypeBool}, +var testTableGetChangeTestCases = []testTableGetChangeTestCase{ + { + name: "no changes", + target: &Table{ + Name: "test", + Columns: []Column{ + {Name: "bool", Type: TypeBool}, + }, + }, + source: &Table{ + Name: "test", + Columns: []Column{ + {Name: "bool", Type: TypeBool}, + }, + }, + expectedChanges: nil, }, -} - -var testTable3 = &Table{ - Name: "test", - Columns: []Column{ - {Name: "bool", Type: TypeString}, + { + name: "add column", + target: &Table{ + Name: "test", + Columns: []Column{ + {Name: "bool", Type: TypeBool}, + {Name: "bool1", Type: TypeBool}, + }, + }, + source: &Table{ + Name: "test", + Columns: []Column{ + {Name: "bool", Type: TypeBool}, + }, + }, + expectedChanges: []TableColumnChange{ + { + Type: TableColumnChangeTypeAdd, + ColumnName: "bool1", + Current: Column{Name: "bool1", Type: TypeBool}, + }, + }, }, -} - -var testTable4 = &Table{ - Name: "test", - Columns: []Column{ - {Name: "bool", Type: TypeBool, CreationOptions: ColumnCreationOptions{PrimaryKey: true, NotNull: true}}, + { + name: "remove column", + target: &Table{ + Name: "test", + Columns: []Column{ + {Name: "bool", Type: TypeBool}, + }, + }, + source: &Table{ + Name: "test", + Columns: []Column{ + {Name: "bool", Type: TypeBool}, + {Name: "bool1", Type: TypeBool}, + }, + }, + expectedChanges: []TableColumnChange{ + { + Type: TableColumnChangeTypeRemove, + ColumnName: "bool1", + Previous: Column{Name: "bool1", Type: TypeBool}, + }, + }, }, } -func TestGetAddedColumns(t *testing.T) { - columns := testTable1.GetAddedColumns(testTable1) - if columns != nil { - t.Fatalf("got %v want nil", columns) - } - - columns = testTable2.GetAddedColumns(testTable1) - if len(columns) != 1 { - t.Fatalf("got %v want 1", columns) - } - if columns[0].Name != "bool1" { - t.Fatalf("got %v want bool1", columns[0].Name) - } -} - -func TestGetChangedColumns(t *testing.T) { - columns, _ := testTable1.GetChangedColumns(testTable1) - if columns != nil { - t.Fatalf("got %v want nil", columns) - } - - columns, got := testTable3.GetChangedColumns(testTable2) - if len(columns) != 1 { - t.Fatalf("got %v want 1", columns) - } - if columns[0].Name != "bool" { - t.Fatalf("got %v want bool", columns[0].Name) - } - if columns[0].Type != TypeString { - t.Fatalf("got %v want TypeString", columns[0].Type) - } - if got[0].Type != TypeBool { - t.Fatalf("got %v want TypeBool", got[0].Type) - } - - columns, _ = testTable4.GetChangedColumns(testTable2) - if len(columns) != 1 { - t.Fatalf("got %v want 1", columns) - } - if columns[0].Name != "bool" { - t.Fatalf("got %v want bool", columns[0].Name) - } -} - -func TestIsPkEqual(t *testing.T) { - if !testTable1.IsPrimaryKeyEqual(testTable1) { - t.Fatalf("got false want true") - } - if testTable4.IsPrimaryKeyEqual(testTable2) { - t.Fatalf("got true want false") +func TestTableGetChanges(t *testing.T) { + for _, tc := range testTableGetChangeTestCases { + t.Run(tc.name, func(t *testing.T) { + changes := tc.target.GetChanges(tc.source) + if diff := cmp.Diff(changes, tc.expectedChanges); diff != "" { + t.Errorf("diff (+got, -want): %v", diff) + } + }) } } From 0943a5c89a7d362c17854ea62392c77b1963d5fe Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 15 Feb 2023 21:19:30 +0200 Subject: [PATCH 02/15] fmt --- plugins/destination/plugin_testing.go | 16 +++------ plugins/destination/plugin_testing_migrate.go | 36 ++----------------- schema/table_test.go | 4 +-- 3 files changed, 8 insertions(+), 48 deletions(-) diff --git a/plugins/destination/plugin_testing.go b/plugins/destination/plugin_testing.go index 3bd6e3db6c..3ec7bb6931 100644 --- a/plugins/destination/plugin_testing.go +++ b/plugins/destination/plugin_testing.go @@ -129,9 +129,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, spec any, test } destSpec.WriteMode = specs.WriteModeOverwrite destSpec.Name = "test_migrate_overwrite" - if err := suite.destinationPluginTestMigrate(t, ctx, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite); err != nil { - t.Fatal(err) - } + suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite) }) t.Run("TestMigrateOverwriteForce", func(t *testing.T) { @@ -142,9 +140,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, spec any, test destSpec.WriteMode = specs.WriteModeOverwrite destSpec.MigrateMode = specs.MigrateModeForced destSpec.Name = "test_migrate_overwrite_force" - if err := suite.destinationPluginTestMigrate(t, ctx, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite); err != nil { - t.Fatal(err) - } + suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite) }) t.Run("TestWriteAppend", func(t *testing.T) { @@ -169,9 +165,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, spec any, test } destSpec.WriteMode = specs.WriteModeAppend destSpec.Name = "test_migrate_append" - if err := suite.destinationPluginTestMigrate(t, ctx, newPlugin, logger, destSpec, tests.MigrateStrategyAppend); err != nil { - t.Fatal(err) - } + suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend) }) t.Run("TestMigrateAppendForce", func(t *testing.T) { @@ -182,9 +176,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, spec any, test destSpec.WriteMode = specs.WriteModeAppend destSpec.MigrateMode = specs.MigrateModeForced destSpec.Name = "test_migrate_append_force" - if err := suite.destinationPluginTestMigrate(t, ctx, newPlugin, logger, destSpec, tests.MigrateStrategyAppend); err != nil { - t.Fatal(err) - } + suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend) }) } diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go index dec9074c58..810dc951ec 100644 --- a/plugins/destination/plugin_testing_migrate.go +++ b/plugins/destination/plugin_testing_migrate.go @@ -11,36 +11,6 @@ import ( "github.com/rs/zerolog" ) -var tableTest1 = &schema.Table{ - Name: "test1", - Columns: []schema.Column{ - { - Name: "id", - Type: schema.TypeUUID, - CreationOptions: schema.ColumnCreationOptions{ - PrimaryKey: true, - }, - }, - }, -} - -var tableTest2 = &schema.Table{ - Name: "test1", - Columns: []schema.Column{ - { - Name: "id", - Type: schema.TypeUUID, - CreationOptions: schema.ColumnCreationOptions{ - PrimaryKey: true, - }, - }, - { - Name: "bool", - Type: schema.TypeBool, - }, - }, -} - func testMigration(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, target *schema.Table, source *schema.Table, mode specs.MigrateMode) error { if err := p.Init(ctx, logger, spec); err != nil { return fmt.Errorf("failed to init plugin: %w", err) @@ -94,13 +64,13 @@ func testMigration(ctx context.Context, p *Plugin, logger zerolog.Logger, spec s } func (*PluginTestSuite) destinationPluginTestMigrate( - t *testing.T, ctx context.Context, + t *testing.T, newPlugin NewPluginFunc, logger zerolog.Logger, spec specs.Destination, strategy MigrateStrategy, -) error { +) { spec.BatchSize = 1 t.Run("add_column", func(t *testing.T) { @@ -292,6 +262,4 @@ func (*PluginTestSuite) destinationPluginTestMigrate( t.Fatal(err) } }) - - return nil } diff --git a/schema/table_test.go b/schema/table_test.go index 8dabe5073d..5b5df67f59 100644 --- a/schema/table_test.go +++ b/schema/table_test.go @@ -246,8 +246,8 @@ func TestTablesFilterDFS(t *testing.T) { type testTableGetChangeTestCase struct { name string - target *Table - source *Table + target *Table + source *Table expectedChanges []TableColumnChange } From 0c21e454d227526e339cc4b75b6e72dbe28da7c2 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 15 Feb 2023 21:45:34 +0200 Subject: [PATCH 03/15] more fixes --- plugins/destination/plugin_testing_migrate.go | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go index 810dc951ec..290e81d7ba 100644 --- a/plugins/destination/plugin_testing_migrate.go +++ b/plugins/destination/plugin_testing_migrate.go @@ -3,14 +3,20 @@ package destination import ( "context" "fmt" + "strings" "testing" "time" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" + "github.com/google/uuid" "github.com/rs/zerolog" ) +func tableUUIDSuffix() string { + return strings.ReplaceAll(uuid.NewString(), "-", "_") +} + func testMigration(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, target *schema.Table, source *schema.Table, mode specs.MigrateMode) error { if err := p.Init(ctx, logger, spec); err != nil { return fmt.Errorf("failed to init plugin: %w", err) @@ -78,8 +84,9 @@ func (*PluginTestSuite) destinationPluginTestMigrate( t.Skip("skipping as migrate mode is safe") return } + tableName := "add_column_" + tableUUIDSuffix() source := &schema.Table{ - Name: "add_column", + Name: tableName, Columns: []schema.Column{ { Name: "id", @@ -88,7 +95,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( }, } target := &schema.Table{ - Name: "add_column", + Name: tableName, Columns: []schema.Column{ { Name: "id", @@ -102,7 +109,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( } p := newPlugin() if err := testMigration(ctx, p, logger, spec, target, source, strategy.AddColumn); err != nil { - t.Fatalf("failed to migrate add_column: %v", err) + t.Fatalf("failed to migrate %s: %v", tableName, err) } if err := p.Close(ctx); err != nil { t.Fatal(err) @@ -114,8 +121,9 @@ func (*PluginTestSuite) destinationPluginTestMigrate( t.Skip("skipping as migrate mode is safe") return } + tableName := "add_column_not_null_" + tableUUIDSuffix() source := &schema.Table{ - Name: "add_column_not_null", + Name: tableName, Columns: []schema.Column{ { Name: "id", @@ -124,7 +132,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( }, } target := &schema.Table{ - Name: "add_column_not_null", + Name: tableName, Columns: []schema.Column{ { Name: "id", @@ -153,8 +161,9 @@ func (*PluginTestSuite) destinationPluginTestMigrate( t.Skip("skipping as migrate mode is safe") return } + tableName := "remove_column_" + tableUUIDSuffix() source := &schema.Table{ - Name: "remove_column", + Name: tableName, Columns: []schema.Column{ { Name: "id", @@ -167,7 +176,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( }, } target := &schema.Table{ - Name: "remove_column", + Name: tableName, Columns: []schema.Column{ { Name: "id", @@ -189,8 +198,9 @@ func (*PluginTestSuite) destinationPluginTestMigrate( t.Skip("skipping as migrate mode is safe") return } + tableName := "remove_column_not_null_" + tableUUIDSuffix() source := &schema.Table{ - Name: "remove_column_not_null", + Name: tableName, Columns: []schema.Column{ { Name: "id", @@ -206,7 +216,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( }, } target := &schema.Table{ - Name: "remove_column_not_null", + Name: tableName, Columns: []schema.Column{ { Name: "id", @@ -215,7 +225,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( }, } p := newPlugin() - if err := testMigration(ctx, p, logger, spec, target, source, strategy.RemoveColumn); err != nil { + if err := testMigration(ctx, p, logger, spec, target, source, strategy.RemoveColumnNotNull); err != nil { t.Fatalf("failed to migrate add_column: %v", err) } if err := p.Close(ctx); err != nil { @@ -228,8 +238,9 @@ func (*PluginTestSuite) destinationPluginTestMigrate( t.Skip("skipping as migrate mode is safe") return } + tableName := "change_column_" + tableUUIDSuffix() source := &schema.Table{ - Name: "change_column", + Name: tableName, Columns: []schema.Column{ { Name: "id", @@ -242,7 +253,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( }, } target := &schema.Table{ - Name: "change_column", + Name: tableName, Columns: []schema.Column{ { Name: "id", @@ -255,7 +266,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( }, } p := newPlugin() - if err := testMigration(ctx, p, logger, spec, target, source, strategy.RemoveColumn); err != nil { + if err := testMigration(ctx, p, logger, spec, target, source, strategy.ChangeColumn); err != nil { t.Fatalf("failed to migrate add_column: %v", err) } if err := p.Close(ctx); err != nil { From a3326de7a0f8fecfe916f3aa5c8645a7363050a6 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 15 Feb 2023 22:45:10 +0200 Subject: [PATCH 04/15] more fixes --- plugins/destination/plugin_testing_migrate.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go index 290e81d7ba..3a62ef6cec 100644 --- a/plugins/destination/plugin_testing_migrate.go +++ b/plugins/destination/plugin_testing_migrate.go @@ -21,6 +21,16 @@ func testMigration(ctx context.Context, p *Plugin, logger zerolog.Logger, spec s if err := p.Init(ctx, logger, spec); err != nil { return fmt.Errorf("failed to init plugin: %w", err) } + + source.Columns = append(schema.ColumnList{ + schema.CqSourceNameColumn, + schema.CqSyncTimeColumn, + }, source.Columns...) + target.Columns = append(schema.ColumnList{ + schema.CqSourceNameColumn, + schema.CqSyncTimeColumn, + }, target.Columns...) + if err := p.Migrate(ctx, []*schema.Table{source}); err != nil { return fmt.Errorf("failed to migrate tables: %w", err) } @@ -51,9 +61,6 @@ func testMigration(ctx context.Context, p *Plugin, logger zerolog.Logger, spec s if len(resourcesRead) != 2 { return fmt.Errorf("expected 2 resources after write, got %d", len(resourcesRead)) } - if diff := resourcesRead[0].Diff(resource1.Data); diff != "" { - return fmt.Errorf("resource1 diff: %s", diff) - } if diff := resourcesRead[1].Diff(resource2.Data); diff != "" { return fmt.Errorf("resource2 diff: %s", diff) } From 99cc560a6eb5a561cca17ef7aee94157d7efbd2b Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 15 Feb 2023 22:47:44 +0200 Subject: [PATCH 05/15] fix lint --- plugins/destination/plugin_testing_migrate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go index 3a62ef6cec..38ad851e93 100644 --- a/plugins/destination/plugin_testing_migrate.go +++ b/plugins/destination/plugin_testing_migrate.go @@ -28,7 +28,7 @@ func testMigration(ctx context.Context, p *Plugin, logger zerolog.Logger, spec s }, source.Columns...) target.Columns = append(schema.ColumnList{ schema.CqSourceNameColumn, - schema.CqSyncTimeColumn, + schema.CqSyncTimeColumn, }, target.Columns...) if err := p.Migrate(ctx, []*schema.Table{source}); err != nil { From f5e4cec2a2a212e8e3340642f444ec805bde95c3 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Thu, 16 Feb 2023 12:54:17 +0200 Subject: [PATCH 06/15] Update internal/memdb/memdb_test.go Co-authored-by: Erez Rokah --- internal/memdb/memdb_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/memdb/memdb_test.go b/internal/memdb/memdb_test.go index 77c8001cf7..cabbda561d 100644 --- a/internal/memdb/memdb_test.go +++ b/internal/memdb/memdb_test.go @@ -38,7 +38,7 @@ func TestPluginUnmanagedClient(t *testing.T) { nil, destination.PluginTestSuiteTests{ MigrateStrategyOverwrite: migrateStrategyOverwrite, - MigrateStrategyAppend: migrateStrategyOverwrite, + MigrateStrategyAppend: migrateStrategyAppend, }, ) } From 5ee41b2937cb780560b1c8571cb73150d62ecb62 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Thu, 16 Feb 2023 12:54:25 +0200 Subject: [PATCH 07/15] Update internal/memdb/memdb_test.go Co-authored-by: Erez Rokah --- internal/memdb/memdb_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/memdb/memdb_test.go b/internal/memdb/memdb_test.go index cabbda561d..ce184f343d 100644 --- a/internal/memdb/memdb_test.go +++ b/internal/memdb/memdb_test.go @@ -51,7 +51,7 @@ func TestPluginManagedClient(t *testing.T) { nil, destination.PluginTestSuiteTests{ MigrateStrategyOverwrite: migrateStrategyOverwrite, - MigrateStrategyAppend: migrateStrategyOverwrite, + MigrateStrategyAppend: migrateStrategyAppend, }) } From f81a0be79bbed1426bdf4e77f528eae4e00db74c Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Thu, 16 Feb 2023 12:54:33 +0200 Subject: [PATCH 08/15] Update internal/memdb/memdb_test.go Co-authored-by: Erez Rokah --- internal/memdb/memdb_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/memdb/memdb_test.go b/internal/memdb/memdb_test.go index ce184f343d..21591a5fbd 100644 --- a/internal/memdb/memdb_test.go +++ b/internal/memdb/memdb_test.go @@ -64,7 +64,7 @@ func TestPluginManagedClientWithSmallBatchSize(t *testing.T) { }, nil, destination.PluginTestSuiteTests{ MigrateStrategyOverwrite: migrateStrategyOverwrite, - MigrateStrategyAppend: migrateStrategyOverwrite, + MigrateStrategyAppend: migrateStrategyAppend, }) } From 8ca2cd0b2f4b72594359eaa74cb54688b6476d48 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Thu, 16 Feb 2023 12:54:45 +0200 Subject: [PATCH 09/15] Update internal/memdb/memdb_test.go Co-authored-by: Erez Rokah --- internal/memdb/memdb_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/memdb/memdb_test.go b/internal/memdb/memdb_test.go index 21591a5fbd..8a1d66d179 100644 --- a/internal/memdb/memdb_test.go +++ b/internal/memdb/memdb_test.go @@ -78,7 +78,7 @@ func TestPluginManagedClientWithLargeBatchSize(t *testing.T) { nil, destination.PluginTestSuiteTests{ MigrateStrategyOverwrite: migrateStrategyOverwrite, - MigrateStrategyAppend: migrateStrategyOverwrite, + MigrateStrategyAppend: migrateStrategyAppend, }) } From 6ffffc807170dfb07c6285094cd83a5018ecb1ef Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Thu, 16 Feb 2023 13:00:21 +0200 Subject: [PATCH 10/15] review fix --- schema/table.go | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/schema/table.go b/schema/table.go index 407f209146..126c7c71fd 100644 --- a/schema/table.go +++ b/schema/table.go @@ -281,23 +281,11 @@ func (t *Table) ValidateName() error { return nil } -// added columns with no constraints - migrate // table.GetAddedColumns(other) -// added columns with not null - drop table // table.GetAddedConstraintColumns(other) -// added columns with unique - drop table - -// changed columns: change type - drop table or drop/add column? // table.GetChangedColumns(other) -// changed columns: added constraint not null or unique - drop table // table.GetChangedColumnsWithAddedConstraints -// changed columns: removed constraint: not null, removed unique - drop constraint? do nothing? what if a user is adding a unique constraint? we will drop it? // table.GetChangedColumnsWithRemovedConstraints - -// removed columns: with no constraints - do nothing? -// removed columns: with constraints not null or unique - drop column? drop table ? // table.RemovedColumnsWithConstraints(other) - -// changed pks: drop table // table.IsPrimaryKeyEqual(other) - -func (t *Table) GetChanges(other *Table) []TableColumnChange { +// Get Changes returns changes between two tables when t is the new one and old is the old one. +func (t *Table) GetChanges(old *Table) []TableColumnChange { var changes []TableColumnChange for _, c := range t.Columns { - otherColumn := other.Columns.Get(c.Name) + otherColumn := old.Columns.Get(c.Name) if otherColumn == nil { changes = append(changes, TableColumnChange{ Type: TableColumnChangeTypeAdd, @@ -315,7 +303,7 @@ func (t *Table) GetChanges(other *Table) []TableColumnChange { }) } } - for _, c := range other.Columns { + for _, c := range old.Columns { if t.Columns.Get(c.Name) == nil { changes = append(changes, TableColumnChange{ Type: TableColumnChangeTypeRemove, From d6ae733bd95583c01e699d128281c421370b13d1 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Thu, 16 Feb 2023 13:23:50 +0200 Subject: [PATCH 11/15] fix review --- plugins/destination/plugin_testing.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/plugins/destination/plugin_testing.go b/plugins/destination/plugin_testing.go index 3ec7bb6931..16ef711ef0 100644 --- a/plugins/destination/plugin_testing.go +++ b/plugins/destination/plugin_testing.go @@ -17,13 +17,6 @@ type PluginTestSuite struct { tests PluginTestSuiteTests } -// type MigrateResult int - -// const ( -// MigrateResultSuccess MigrateResult = iota -// MigrateResultDropTable -// ) - // MigrateStrategy defines which tests we should include // true means test should succeed for either automigrate type MigrateStrategy struct { From 8f4d92b8426e55264cc1a322b1c90f70ac6aca0f Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Thu, 16 Feb 2023 13:25:00 +0200 Subject: [PATCH 12/15] Update plugins/destination/plugin_testing.go Co-authored-by: Herman Schaaf --- plugins/destination/plugin_testing.go | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/destination/plugin_testing.go b/plugins/destination/plugin_testing.go index 16ef711ef0..43aaa1b372 100644 --- a/plugins/destination/plugin_testing.go +++ b/plugins/destination/plugin_testing.go @@ -18,7 +18,6 @@ type PluginTestSuite struct { } // MigrateStrategy defines which tests we should include -// true means test should succeed for either automigrate type MigrateStrategy struct { AddColumn specs.MigrateMode AddColumnNotNull specs.MigrateMode From 7b779bd502d74dfce1fef0a775f1bcbed474c95a Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Thu, 16 Feb 2023 13:31:05 +0200 Subject: [PATCH 13/15] Update plugins/destination/plugin_testing_migrate.go Co-authored-by: Herman Schaaf --- plugins/destination/plugin_testing_migrate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go index 38ad851e93..08e9284de4 100644 --- a/plugins/destination/plugin_testing_migrate.go +++ b/plugins/destination/plugin_testing_migrate.go @@ -66,7 +66,7 @@ func testMigration(ctx context.Context, p *Plugin, logger zerolog.Logger, spec s } } else { if len(resourcesRead) != 1 { - return fmt.Errorf("expected 1 resources after write, got %d", len(resourcesRead)) + return fmt.Errorf("expected 1 resource after write, got %d", len(resourcesRead)) } if diff := resourcesRead[0].Diff(resource2.Data); diff != "" { return fmt.Errorf("resource1 diff: %s", diff) From 5a0cc5a6e774d34bcd92c5e14918d1255c4351fe Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Thu, 16 Feb 2023 13:38:38 +0200 Subject: [PATCH 14/15] Update plugins/destination/plugin_testing_migrate.go Co-authored-by: Herman Schaaf --- plugins/destination/plugin_testing_migrate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go index 08e9284de4..2e16facd86 100644 --- a/plugins/destination/plugin_testing_migrate.go +++ b/plugins/destination/plugin_testing_migrate.go @@ -274,7 +274,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( } p := newPlugin() if err := testMigration(ctx, p, logger, spec, target, source, strategy.ChangeColumn); err != nil { - t.Fatalf("failed to migrate add_column: %v", err) + t.Fatalf("failed to migrate change_column: %v", err) } if err := p.Close(ctx); err != nil { t.Fatal(err) From 57704b84fbae7325b858bc4fe8796d7bf7f86f02 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Thu, 16 Feb 2023 13:38:46 +0200 Subject: [PATCH 15/15] Update plugins/destination/plugin_testing_migrate.go Co-authored-by: Herman Schaaf --- plugins/destination/plugin_testing_migrate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go index 2e16facd86..5f691cdde8 100644 --- a/plugins/destination/plugin_testing_migrate.go +++ b/plugins/destination/plugin_testing_migrate.go @@ -233,7 +233,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( } p := newPlugin() if err := testMigration(ctx, p, logger, spec, target, source, strategy.RemoveColumnNotNull); err != nil { - t.Fatalf("failed to migrate add_column: %v", err) + t.Fatalf("failed to migrate remove_column_not_null: %v", err) } if err := p.Close(ctx); err != nil { t.Fatal(err)