diff --git a/plugins/destination/plugin_testing.go b/plugins/destination/plugin_testing.go index 8d49de2e19..6dbeef00db 100644 --- a/plugins/destination/plugin_testing.go +++ b/plugins/destination/plugin_testing.go @@ -2,18 +2,14 @@ package destination import ( "context" - "fmt" "os" "sort" - "strings" "testing" "time" - "github.com/cloudquery/plugin-sdk/caser" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" "github.com/cloudquery/plugin-sdk/testdata" - "github.com/google/uuid" "github.com/rs/zerolog" ) @@ -43,371 +39,15 @@ type PluginTestSuiteTests struct { // data is appended, then the column is removed and more data appended, checking that the migrations handle // this correctly. SkipMigrateAppend bool + // SkipMigrateAppendForce skips a test for the migrate function where a column is changed in force mode + SkipMigrateAppendForce bool // SkipMigrateOverwrite skips a test for the migrate function where a column is added, // data is appended, then the column is removed and more data overwritten, checking that the migrations handle // this correctly. 
SkipMigrateOverwrite bool -} - -func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error { - spec.WriteMode = specs.WriteModeOverwrite - if err := p.Init(ctx, logger, spec); err != nil { - return fmt.Errorf("failed to init plugin: %w", err) - } - tableName := "cq_test_write_overwrite" - table := testdata.TestTable(tableName) - syncTime := time.Now().UTC().Round(1 * time.Second) - tables := []*schema.Table{ - table, - } - if err := p.Migrate(ctx, tables); err != nil { - return fmt.Errorf("failed to migrate tables: %w", err) - } - - sourceName := "testOverwriteSource" + uuid.NewString() - sourceSpec := specs.Source{ - Name: sourceName, - } - - resources := createTestResources(table, sourceName, syncTime, 2) - if err := p.writeAll(ctx, sourceSpec, tables, syncTime, resources); err != nil { - return fmt.Errorf("failed to write all: %w", err) - } - sortResources(table, resources) - - resourcesRead, err := p.readAll(ctx, table, sourceName) - if err != nil { - return fmt.Errorf("failed to read all: %w", err) - } - sortCQTypes(table, resourcesRead) - - if len(resourcesRead) != 2 { - return fmt.Errorf("expected 2 resources, got %d", len(resourcesRead)) - } - - if diff := resources[0].Data.Diff(resourcesRead[0]); diff != "" { - return fmt.Errorf("expected first resource diff: %s", diff) - } - - if diff := resources[1].Data.Diff(resourcesRead[1]); diff != "" { - return fmt.Errorf("expected second resource diff: %s", diff) - } - - secondSyncTime := syncTime.Add(time.Second).UTC() - - // copy first resource but update the sync time - updatedResource := schema.DestinationResource{ - TableName: table.Name, - Data: make(schema.CQTypes, len(resources[0].Data)), - } - copy(updatedResource.Data, resources[0].Data) - _ = updatedResource.Data[1].Set(secondSyncTime) - - // write second time - if err := p.writeOne(ctx, sourceSpec, tables, secondSyncTime, updatedResource); err != nil { - return 
fmt.Errorf("failed to write one second time: %w", err) - } - - resourcesRead, err = p.readAll(ctx, table, sourceName) - if err != nil { - return fmt.Errorf("failed to read all second time: %w", err) - } - sortCQTypes(table, resourcesRead) - - if len(resourcesRead) != 2 { - return fmt.Errorf("after overwrite expected 2 resources, got %d", len(resourcesRead)) - } - - if diff := resources[1].Data.Diff(resourcesRead[0]); diff != "" { - return fmt.Errorf("after overwrite expected first resource diff: %s", diff) - } - - if diff := updatedResource.Data.Diff(resourcesRead[1]); diff != "" { - return fmt.Errorf("after overwrite expected second resource diff: %s", diff) - } - - return nil -} - -func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error { - spec.WriteMode = specs.WriteModeOverwriteDeleteStale - if err := p.Init(ctx, logger, spec); err != nil { - return fmt.Errorf("failed to init plugin: %w", err) - } - tableName := "cq_test_write_overwrite_delete_stale" - table := testdata.TestTable(tableName) - incTable := testdata.TestTable(tableName + "_incremental") - incTable.IsIncremental = true - syncTime := time.Now().UTC().Round(1 * time.Second) - tables := []*schema.Table{ - table, - incTable, - } - if err := p.Migrate(ctx, tables); err != nil { - return fmt.Errorf("failed to migrate tables: %w", err) - } - - sourceName := "testOverwriteSource" + uuid.NewString() - sourceSpec := specs.Source{ - Name: sourceName, - Backend: specs.BackendLocal, - } - - resources := createTestResources(table, sourceName, syncTime, 2) - incResources := createTestResources(incTable, sourceName, syncTime, 2) - if err := p.writeAll(ctx, sourceSpec, tables, syncTime, append(resources, incResources...)); err != nil { - return fmt.Errorf("failed to write all: %w", err) - } - sortResources(table, resources) - - resourcesRead, err := p.readAll(ctx, table, sourceName) - if err != nil { - return 
fmt.Errorf("failed to read all: %w", err) - } - sortCQTypes(table, resourcesRead) - - if len(resourcesRead) != 2 { - return fmt.Errorf("expected 2 resources, got %d", len(resourcesRead)) - } - - if diff := resources[0].Data.Diff(resourcesRead[0]); diff != "" { - return fmt.Errorf("expected first resource diff: %s", diff) - } - - if diff := resources[1].Data.Diff(resourcesRead[1]); diff != "" { - return fmt.Errorf("expected second resource diff: %s", diff) - } - - // read from incremental table - resourcesRead, err = p.readAll(ctx, incTable, sourceName) - if err != nil { - return fmt.Errorf("failed to read all: %w", err) - } - if len(resourcesRead) != 2 { - return fmt.Errorf("expected 2 resources in incremental table, got %d", len(resourcesRead)) - } - - secondSyncTime := syncTime.Add(time.Second).UTC() - - // copy first resource but update the sync time - updatedResource := schema.DestinationResource{ - TableName: table.Name, - Data: make(schema.CQTypes, len(resources[0].Data)), - } - copy(updatedResource.Data, resources[0].Data) - _ = updatedResource.Data[1].Set(secondSyncTime) - - // write second time - if err := p.writeOne(ctx, sourceSpec, tables, secondSyncTime, updatedResource); err != nil { - return fmt.Errorf("failed to write one second time: %w", err) - } - - resourcesRead, err = p.readAll(ctx, table, sourceName) - if err != nil { - return fmt.Errorf("failed to read all second time: %w", err) - } - sortCQTypes(table, resourcesRead) - if len(resourcesRead) != 1 { - return fmt.Errorf("after overwrite expected 1 resource, got %d", len(resourcesRead)) - } - - if diff := resources[0].Data.Diff(resourcesRead[0]); diff != "" { - return fmt.Errorf("after overwrite expected first resource diff: %s", diff) - } - - resourcesRead, err = p.readAll(ctx, tables[0], sourceName) - if err != nil { - return fmt.Errorf("failed to read all second time: %w", err) - } - if len(resourcesRead) != 1 { - return fmt.Errorf("expected 1 resource after delete stale, got %d", 
len(resourcesRead)) - } - - // we expect the only resource returned to match the updated resource we wrote - if diff := updatedResource.Data.Diff(resourcesRead[0]); diff != "" { - return fmt.Errorf("after delete stale expected resource diff: %s", diff) - } - - // we expect the incremental table to still have 2 resources, because delete-stale should - // not apply there - resourcesRead, err = p.readAll(ctx, tables[1], sourceName) - if err != nil { - return fmt.Errorf("failed to read all from incremental table: %w", err) - } - if len(resourcesRead) != 2 { - return fmt.Errorf("expected 2 resources in incremental table after delete-stale, got %d", len(resourcesRead)) - } - - return nil -} - -func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error { - spec.WriteMode = specs.WriteModeAppend - if err := p.Init(ctx, logger, spec); err != nil { - return fmt.Errorf("failed to init plugin: %w", err) - } - tableName := "cq_test_write_append" - table := testdata.TestTable(tableName) - syncTime := time.Now().UTC().Round(1 * time.Second) - tables := []*schema.Table{ - table, - } - if err := p.Migrate(ctx, tables); err != nil { - return fmt.Errorf("failed to migrate tables: %w", err) - } - - resources := make([]schema.DestinationResource, 2) - sourceName := "testAppendSource" + uuid.NewString() - specSource := specs.Source{ - Name: sourceName, - } - resources[0] = createTestResources(table, sourceName, syncTime, 1)[0] - if err := p.writeOne(ctx, specSource, tables, syncTime, resources[0]); err != nil { - return fmt.Errorf("failed to write one second time: %w", err) - } - - secondSyncTime := syncTime.Add(10 * time.Second).UTC() - resources[1] = createTestResources(table, sourceName, secondSyncTime, 1)[0] - sortResources(table, resources) - - if !s.tests.SkipSecondAppend { - // write second time - if err := p.writeOne(ctx, specSource, tables, secondSyncTime, resources[1]); err != nil { - return 
fmt.Errorf("failed to write one second time: %w", err) - } - } - - resourcesRead, err := p.readAll(ctx, tables[0], sourceName) - if err != nil { - return fmt.Errorf("failed to read all second time: %w", err) - } - sortCQTypes(table, resourcesRead) - - expectedResource := 2 - if s.tests.SkipSecondAppend { - expectedResource = 1 - } - - if len(resourcesRead) != expectedResource { - return fmt.Errorf("expected %d resources, got %d", expectedResource, len(resourcesRead)) - } - - if diff := resources[0].Data.Diff(resourcesRead[0]); diff != "" { - return fmt.Errorf("first expected resource diff: %s", diff) - } - - if !s.tests.SkipSecondAppend { - if diff := resources[1].Data.Diff(resourcesRead[1]); diff != "" { - return fmt.Errorf("second expected resource diff: %s", diff) - } - } - - return nil -} - -func (*PluginTestSuite) destinationPluginTestMigrate( - ctx context.Context, - p *Plugin, - logger zerolog.Logger, - spec specs.Destination, - mode specs.WriteMode, -) error { - spec.WriteMode = mode - spec.BatchSize = 1 - if err := p.Init(ctx, logger, spec); err != nil { - return fmt.Errorf("failed to init plugin: %w", err) - } - suffix := strings.ToLower(strings.ReplaceAll(mode.String(), "-", "_")) - tableName := "cq_test_migrate_" + suffix - table := testdata.TestTable(tableName) - if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { - return fmt.Errorf("failed to migrate tables: %w", err) - } - - sourceName := "testMigrate" + caser.New().ToPascal(suffix) + "Source" + uuid.NewString() - sourceSpec := specs.Source{ - Name: sourceName, - } - syncTime := time.Now().UTC().Round(1 * time.Second) - resource1 := createTestResources(table, sourceName, syncTime, 1)[0] - if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource1); err != nil { - return fmt.Errorf("failed to write one: %w", err) - } - - // check that migrations and writes still succeed when column ordering is changed - a := table.Columns.Index("uuid") - b := 
table.Columns.Index("float") - table.Columns[a], table.Columns[b] = table.Columns[b], table.Columns[a] - if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { - return fmt.Errorf("failed to migrate table with changed column ordering: %w", err) - } - resource2 := createTestResources(table, sourceName, syncTime, 1)[0] - if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource2); err != nil { - return fmt.Errorf("failed to write one after column order change: %w", err) - } - - resourcesRead, err := p.readAll(ctx, table, sourceName) - if err != nil { - return fmt.Errorf("failed to read all: %w", err) - } - if len(resourcesRead) != 2 { - return fmt.Errorf("expected 2 resources after second write, got %d", len(resourcesRead)) - } - - // check that migrations succeed when a new column is added - table.Columns = append(table.Columns, schema.Column{ - Name: "new_column", - Type: schema.TypeInt, - }) - if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { - return fmt.Errorf("failed to migrate table with new column: %w", err) - } - resource3 := createTestResources(table, sourceName, syncTime, 1)[0] - if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource3); err != nil { - return fmt.Errorf("failed to write one after column order change: %w", err) - } - resourcesRead, err = p.readAll(ctx, table, sourceName) - if err != nil { - return fmt.Errorf("failed to read all: %w", err) - } - if len(resourcesRead) != 3 { - return fmt.Errorf("expected 3 resources after third write, got %d", len(resourcesRead)) - } - - // check that migration still succeeds when there is an extra column in the destination table, - // which should be ignored - oldTable := testdata.TestTable(tableName) - if err := p.Migrate(ctx, []*schema.Table{oldTable}); err != nil { - return fmt.Errorf("failed to migrate table with extra column in destination: %w", err) - } - resource4 := createTestResources(oldTable, sourceName, syncTime, 1)[0] - if err 
:= p.writeOne(ctx, sourceSpec, []*schema.Table{oldTable}, syncTime, resource4); err != nil { - return fmt.Errorf("failed to write one after column order change: %w", err) - } - resourcesRead, err = p.readAll(ctx, oldTable, sourceName) - if err != nil { - return fmt.Errorf("failed to read all: %w", err) - } - if len(resourcesRead) != 4 { - return fmt.Errorf("expected 4 resources after fourth write, got %d", len(resourcesRead)) - } - cqIDIndex := table.Columns.Index(schema.CqIDColumn.Name) - found := false - for _, r := range resourcesRead { - if !r[cqIDIndex].Equal(resource4.Data[cqIDIndex]) { - continue - } - found = true - if !r.Equal(resource4.Data) { - return fmt.Errorf("expected resource to be equal to original resource, but got diff: %s", r.Diff(resource4.Data)) - } - } - if !found { - return fmt.Errorf("expected to find resource with cq_id %s, but none matched", resource4.Data[cqIDIndex]) - } - - return nil + // SkipMigrateOverwriteForce skips a test for the migrate function where a column is changed in force mode + SkipMigrateOverwriteForce bool } func getTestLogger(t *testing.T) zerolog.Logger { @@ -455,7 +95,20 @@ func PluginTestSuiteRunner(t *testing.T, p *Plugin, spec any, tests PluginTestSu if suite.tests.SkipMigrateOverwrite { t.Skip("skipping " + t.Name()) } - if err := suite.destinationPluginTestMigrate(ctx, p, logger, destSpec, specs.WriteModeOverwrite); err != nil { + destSpec.WriteMode = specs.WriteModeOverwrite + if err := suite.destinationPluginTestMigrate(ctx, p, logger, destSpec); err != nil { + t.Fatal(err) + } + }) + + t.Run("TestWriteMigrateOverwriteForce", func(t *testing.T) { + t.Helper() + if suite.tests.SkipMigrateOverwriteForce { + t.Skip("skipping " + t.Name()) + } + destSpec.WriteMode = specs.WriteModeOverwrite + destSpec.MigrateMode = specs.MigrateModeForced + if err := suite.destinationPluginTestMigrate(ctx, p, logger, destSpec); err != nil { t.Fatal(err) } }) @@ -475,7 +128,20 @@ func PluginTestSuiteRunner(t *testing.T, p *Plugin, 
spec any, tests PluginTestSu if suite.tests.SkipMigrateAppend { t.Skip("skipping " + t.Name()) } - if err := suite.destinationPluginTestMigrate(ctx, p, logger, destSpec, specs.WriteModeAppend); err != nil { + destSpec.WriteMode = specs.WriteModeAppend + if err := suite.destinationPluginTestMigrate(ctx, p, logger, destSpec); err != nil { + t.Fatal(err) + } + }) + + t.Run("TestMigrateAppendForce", func(t *testing.T) { + t.Helper() + if suite.tests.SkipMigrateAppendForce { + t.Skip("skipping " + t.Name()) + } + destSpec.WriteMode = specs.WriteModeAppend + destSpec.MigrateMode = specs.MigrateModeForced + if err := suite.destinationPluginTestMigrate(ctx, p, logger, destSpec); err != nil { t.Fatal(err) } }) diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go new file mode 100644 index 0000000000..76d75a0c9d --- /dev/null +++ b/plugins/destination/plugin_testing_migrate.go @@ -0,0 +1,130 @@ +package destination + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/cloudquery/plugin-sdk/caser" + "github.com/cloudquery/plugin-sdk/schema" + "github.com/cloudquery/plugin-sdk/specs" + "github.com/cloudquery/plugin-sdk/testdata" + "github.com/google/uuid" + "github.com/rs/zerolog" +) + +func (*PluginTestSuite) destinationPluginTestMigrate( + ctx context.Context, + p *Plugin, + logger zerolog.Logger, + spec specs.Destination, +) error { + spec.BatchSize = 1 + if err := p.Init(ctx, logger, spec); err != nil { + return fmt.Errorf("failed to init plugin: %w", err) + } + suffix := strings.ToLower(strings.ReplaceAll(spec.WriteMode.String(), "-", "_")) + tableName := fmt.Sprintf("cq_test_migrate_%s_%d", suffix, time.Now().Unix()) + table := testdata.TestTable(tableName) + if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { + return fmt.Errorf("failed to migrate tables: %w", err) + } + + sourceName := "testMigrate" + caser.New().ToPascal(suffix) + "Source" + uuid.NewString() + sourceSpec := specs.Source{ + 
Name: sourceName, + } + syncTime := time.Now().UTC().Round(1 * time.Second) + resource1 := createTestResources(table, sourceName, syncTime, 1)[0] + if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource1); err != nil { + return fmt.Errorf("failed to write one: %w", err) + } + + // check that migrations and writes still succeed when column ordering is changed + a := table.Columns.Index("uuid") + b := table.Columns.Index("float") + table.Columns[a], table.Columns[b] = table.Columns[b], table.Columns[a] + if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { + return fmt.Errorf("failed to migrate table with changed column ordering: %w", err) + } + resource2 := createTestResources(table, sourceName, syncTime, 1)[0] + if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource2); err != nil { + return fmt.Errorf("failed to write one after column order change: %w", err) + } + + resourcesRead, err := p.readAll(ctx, table, sourceName) + if err != nil { + return fmt.Errorf("failed to read all: %w", err) + } + if len(resourcesRead) != 2 { + return fmt.Errorf("expected 2 resources after second write, got %d", len(resourcesRead)) + } + + // check that migrations succeed when a new column is added + table.Columns = append(table.Columns, schema.Column{ + Name: "new_column", + Type: schema.TypeInt, + }) + if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { + return fmt.Errorf("failed to migrate table with new column: %w", err) + } + resource3 := createTestResources(table, sourceName, syncTime, 1)[0] + if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource3); err != nil { + return fmt.Errorf("failed to write one after adding new column: %w", err) + } + resourcesRead, err = p.readAll(ctx, table, sourceName) + if err != nil { + return fmt.Errorf("failed to read all: %w", err) + } + if len(resourcesRead) != 3 { + return fmt.Errorf("expected 3 resources after third write, got %d",
len(resourcesRead)) + } + + // check that migration still succeeds when there is an extra column in the destination table, + // which should be ignored + oldTable := testdata.TestTable(tableName) + if err := p.Migrate(ctx, []*schema.Table{oldTable}); err != nil { + return fmt.Errorf("failed to migrate table with extra column in destination: %w", err) + } + resource4 := createTestResources(oldTable, sourceName, syncTime, 1)[0] + if err := p.writeOne(ctx, sourceSpec, []*schema.Table{oldTable}, syncTime, resource4); err != nil { + return fmt.Errorf("failed to write one after column order change: %w", err) + } + totalExpectedResources := 4 + if spec.MigrateMode == specs.MigrateModeForced { + table.Columns[len(table.Columns)-1].Type = schema.TypeString + if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { + return fmt.Errorf("failed to migrate table with changed column type: %w", err) + } + resource5 := createTestResources(table, sourceName, syncTime, 1)[0] + if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource5); err != nil { + return fmt.Errorf("failed to write one after column type change: %w", err) + } + totalExpectedResources++ + } + + resourcesRead, err = p.readAll(ctx, oldTable, sourceName) + if err != nil { + return fmt.Errorf("failed to read all: %w", err) + } + if len(resourcesRead) != totalExpectedResources { + return fmt.Errorf("expected %d resources after fourth write, got %d", totalExpectedResources, len(resourcesRead)) + } + cqIDIndex := table.Columns.Index(schema.CqIDColumn.Name) + found := false + for _, r := range resourcesRead { + if !r[cqIDIndex].Equal(resource4.Data[cqIDIndex]) { + continue + } + found = true + if !r.Equal(resource4.Data) { + return fmt.Errorf("expected resource to be equal to original resource, but got diff: %s", r.Diff(resource4.Data)) + } + } + if !found { + return fmt.Errorf("expected to find resource with cq_id %s, but none matched", resource4.Data[cqIDIndex]) + } + + return nil +} diff 
--git a/plugins/destination/plugin_testing_overwrite.go b/plugins/destination/plugin_testing_overwrite.go new file mode 100644 index 0000000000..d5f6fa35d0 --- /dev/null +++ b/plugins/destination/plugin_testing_overwrite.go @@ -0,0 +1,93 @@ +package destination + +import ( + "context" + "fmt" + "time" + + "github.com/cloudquery/plugin-sdk/schema" + "github.com/cloudquery/plugin-sdk/specs" + "github.com/cloudquery/plugin-sdk/testdata" + "github.com/google/uuid" + "github.com/rs/zerolog" +) + +func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error { + spec.WriteMode = specs.WriteModeOverwrite + if err := p.Init(ctx, logger, spec); err != nil { + return fmt.Errorf("failed to init plugin: %w", err) + } + tableName := fmt.Sprintf("cq_test_write_overwrite_%d", time.Now().Unix()) + table := testdata.TestTable(tableName) + syncTime := time.Now().UTC().Round(1 * time.Second) + tables := []*schema.Table{ + table, + } + if err := p.Migrate(ctx, tables); err != nil { + return fmt.Errorf("failed to migrate tables: %w", err) + } + + sourceName := "testOverwriteSource" + uuid.NewString() + sourceSpec := specs.Source{ + Name: sourceName, + } + + resources := createTestResources(table, sourceName, syncTime, 2) + if err := p.writeAll(ctx, sourceSpec, tables, syncTime, resources); err != nil { + return fmt.Errorf("failed to write all: %w", err) + } + sortResources(table, resources) + + resourcesRead, err := p.readAll(ctx, table, sourceName) + if err != nil { + return fmt.Errorf("failed to read all: %w", err) + } + sortCQTypes(table, resourcesRead) + + if len(resourcesRead) != 2 { + return fmt.Errorf("expected 2 resources, got %d", len(resourcesRead)) + } + + if diff := resources[0].Data.Diff(resourcesRead[0]); diff != "" { + return fmt.Errorf("expected first resource diff: %s", diff) + } + + if diff := resources[1].Data.Diff(resourcesRead[1]); diff != "" { + return fmt.Errorf("expected second 
resource diff: %s", diff) + } + + secondSyncTime := syncTime.Add(time.Second).UTC() + + // copy first resource but update the sync time + updatedResource := schema.DestinationResource{ + TableName: table.Name, + Data: make(schema.CQTypes, len(resources[0].Data)), + } + copy(updatedResource.Data, resources[0].Data) + _ = updatedResource.Data[1].Set(secondSyncTime) + + // write second time + if err := p.writeOne(ctx, sourceSpec, tables, secondSyncTime, updatedResource); err != nil { + return fmt.Errorf("failed to write one second time: %w", err) + } + + resourcesRead, err = p.readAll(ctx, table, sourceName) + if err != nil { + return fmt.Errorf("failed to read all second time: %w", err) + } + sortCQTypes(table, resourcesRead) + + if len(resourcesRead) != 2 { + return fmt.Errorf("after overwrite expected 2 resources, got %d", len(resourcesRead)) + } + + if diff := resources[1].Data.Diff(resourcesRead[0]); diff != "" { + return fmt.Errorf("after overwrite expected first resource diff: %s", diff) + } + + if diff := updatedResource.Data.Diff(resourcesRead[1]); diff != "" { + return fmt.Errorf("after overwrite expected second resource diff: %s", diff) + } + + return nil +} diff --git a/plugins/destination/plugin_testing_overwrite_delete_stale.go b/plugins/destination/plugin_testing_overwrite_delete_stale.go new file mode 100644 index 0000000000..232e2e0383 --- /dev/null +++ b/plugins/destination/plugin_testing_overwrite_delete_stale.go @@ -0,0 +1,125 @@ +package destination + +import ( + "context" + "fmt" + "time" + + "github.com/cloudquery/plugin-sdk/schema" + "github.com/cloudquery/plugin-sdk/specs" + "github.com/cloudquery/plugin-sdk/testdata" + "github.com/google/uuid" + "github.com/rs/zerolog" +) + +func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error { + spec.WriteMode = specs.WriteModeOverwriteDeleteStale + if err := p.Init(ctx, logger, spec); err != nil { + 
return fmt.Errorf("failed to init plugin: %w", err) + } + tableName := fmt.Sprintf("cq_test_write_overwrite_delete_stale_%d", time.Now().Unix()) + table := testdata.TestTable(tableName) + incTable := testdata.TestTable(tableName + "_incremental") + incTable.IsIncremental = true + syncTime := time.Now().UTC().Round(1 * time.Second) + tables := []*schema.Table{ + table, + incTable, + } + if err := p.Migrate(ctx, tables); err != nil { + return fmt.Errorf("failed to migrate tables: %w", err) + } + + sourceName := "testOverwriteSource" + uuid.NewString() + sourceSpec := specs.Source{ + Name: sourceName, + Backend: specs.BackendLocal, + } + + resources := createTestResources(table, sourceName, syncTime, 2) + incResources := createTestResources(incTable, sourceName, syncTime, 2) + if err := p.writeAll(ctx, sourceSpec, tables, syncTime, append(resources, incResources...)); err != nil { + return fmt.Errorf("failed to write all: %w", err) + } + sortResources(table, resources) + + resourcesRead, err := p.readAll(ctx, table, sourceName) + if err != nil { + return fmt.Errorf("failed to read all: %w", err) + } + sortCQTypes(table, resourcesRead) + + if len(resourcesRead) != 2 { + return fmt.Errorf("expected 2 resources, got %d", len(resourcesRead)) + } + + if diff := resources[0].Data.Diff(resourcesRead[0]); diff != "" { + return fmt.Errorf("expected first resource diff: %s", diff) + } + + if diff := resources[1].Data.Diff(resourcesRead[1]); diff != "" { + return fmt.Errorf("expected second resource diff: %s", diff) + } + + // read from incremental table + resourcesRead, err = p.readAll(ctx, incTable, sourceName) + if err != nil { + return fmt.Errorf("failed to read all: %w", err) + } + if len(resourcesRead) != 2 { + return fmt.Errorf("expected 2 resources in incremental table, got %d", len(resourcesRead)) + } + + secondSyncTime := syncTime.Add(time.Second).UTC() + + // copy first resource but update the sync time + updatedResource := schema.DestinationResource{ + TableName: 
table.Name, + Data: make(schema.CQTypes, len(resources[0].Data)), + } + copy(updatedResource.Data, resources[0].Data) + _ = updatedResource.Data[1].Set(secondSyncTime) + + // write second time + if err := p.writeOne(ctx, sourceSpec, tables, secondSyncTime, updatedResource); err != nil { + return fmt.Errorf("failed to write one second time: %w", err) + } + + resourcesRead, err = p.readAll(ctx, table, sourceName) + if err != nil { + return fmt.Errorf("failed to read all second time: %w", err) + } + sortCQTypes(table, resourcesRead) + if len(resourcesRead) != 1 { + return fmt.Errorf("after overwrite expected 1 resource, got %d", len(resourcesRead)) + } + + if diff := resources[0].Data.Diff(resourcesRead[0]); diff != "" { + return fmt.Errorf("after overwrite expected first resource diff: %s", diff) + } + + resourcesRead, err = p.readAll(ctx, tables[0], sourceName) + if err != nil { + return fmt.Errorf("failed to read all second time: %w", err) + } + if len(resourcesRead) != 1 { + return fmt.Errorf("expected 1 resource after delete stale, got %d", len(resourcesRead)) + } + + // we expect the only resource returned to match the updated resource we wrote + if diff := updatedResource.Data.Diff(resourcesRead[0]); diff != "" { + return fmt.Errorf("after delete stale expected resource diff: %s", diff) + } + + // we expect the incremental table to still have 2 resources, because delete-stale should + // not apply there + resourcesRead, err = p.readAll(ctx, tables[1], sourceName) + if err != nil { + return fmt.Errorf("failed to read all from incremental table: %w", err) + } + if len(resourcesRead) != 2 { + return fmt.Errorf("expected 2 resources in incremental table after delete-stale, got %d", len(resourcesRead)) + } + + return nil +} diff --git a/plugins/destination/plugin_testing_write_append.go b/plugins/destination/plugin_testing_write_append.go new file mode 100644 index 0000000000..a798f62448 --- /dev/null +++ b/plugins/destination/plugin_testing_write_append.go @@ -0,0 
+1,77 @@ +package destination + +import ( + "context" + "fmt" + "time" + + "github.com/cloudquery/plugin-sdk/schema" + "github.com/cloudquery/plugin-sdk/specs" + "github.com/cloudquery/plugin-sdk/testdata" + "github.com/google/uuid" + "github.com/rs/zerolog" +) + +func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error { + spec.WriteMode = specs.WriteModeAppend + if err := p.Init(ctx, logger, spec); err != nil { + return fmt.Errorf("failed to init plugin: %w", err) + } + tableName := "cq_test_write_append" + table := testdata.TestTable(tableName) + syncTime := time.Now().UTC().Round(1 * time.Second) + tables := []*schema.Table{ + table, + } + if err := p.Migrate(ctx, tables); err != nil { + return fmt.Errorf("failed to migrate tables: %w", err) + } + + resources := make([]schema.DestinationResource, 2) + sourceName := "testAppendSource" + uuid.NewString() + specSource := specs.Source{ + Name: sourceName, + } + resources[0] = createTestResources(table, sourceName, syncTime, 1)[0] + if err := p.writeOne(ctx, specSource, tables, syncTime, resources[0]); err != nil { + return fmt.Errorf("failed to write one: %w", err) + } + + secondSyncTime := syncTime.Add(10 * time.Second).UTC() + resources[1] = createTestResources(table, sourceName, secondSyncTime, 1)[0] + sortResources(table, resources) + + if !s.tests.SkipSecondAppend { + // write second time + if err := p.writeOne(ctx, specSource, tables, secondSyncTime, resources[1]); err != nil { + return fmt.Errorf("failed to write one second time: %w", err) + } + } + + resourcesRead, err := p.readAll(ctx, tables[0], sourceName) + if err != nil { + return fmt.Errorf("failed to read all second time: %w", err) + } + sortCQTypes(table, resourcesRead) + + expectedResource := 2 + if s.tests.SkipSecondAppend { + expectedResource = 1 + } + + if len(resourcesRead) != expectedResource { + return fmt.Errorf("expected %d resources, got %d",
expectedResource, len(resourcesRead)) + } + + if diff := resources[0].Data.Diff(resourcesRead[0]); diff != "" { + return fmt.Errorf("first expected resource diff: %s", diff) + } + + if !s.tests.SkipSecondAppend { + if diff := resources[1].Data.Diff(resourcesRead[1]); diff != "" { + return fmt.Errorf("second expected resource diff: %s", diff) + } + } + + return nil +} diff --git a/schema/column.go b/schema/column.go index 7a67d9a84b..424111708a 100644 --- a/schema/column.go +++ b/schema/column.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" ) type ColumnList []Column @@ -43,6 +44,20 @@ type Column struct { IgnoreInTests bool `json:"-"` } +func (c *Column) String() string { + var sb strings.Builder + sb.WriteString(c.Name) + sb.WriteString(":") + sb.WriteString(c.Type.String()) + if c.CreationOptions.PrimaryKey { + sb.WriteString(":PK") + } + if c.CreationOptions.NotNull { + sb.WriteString(":NotNull") + } + return sb.String() +} + func (c *ColumnList) UnmarshalJSON(data []byte) (err error) { var tmp []Column if err := json.Unmarshal(data, &tmp); err != nil { @@ -83,3 +98,16 @@ func (c ColumnList) Get(name string) *Column { } return nil } + +func (c ColumnList) String() string { + var sb strings.Builder + sb.WriteString("[") + for i, col := range c { + sb.WriteString(col.String()) + if i != len(c)-1 { + sb.WriteString(", ") + } + } + sb.WriteString("]") + return sb.String() +} diff --git a/schema/table.go b/schema/table.go index 63f08c5ef1..30346d9b53 100644 --- a/schema/table.go +++ b/schema/table.go @@ -260,21 +260,23 @@ func (t *Table) GetAddedColumns(other *Table) []Column { } // GetChangedColumns returns a list of columns that are in this table but have different type in the other table. 
-func (t *Table) GetChangedColumns(other *Table) []Column { - var changed []Column +// returns got, want +func (t *Table) GetChangedColumns(other *Table) (got ColumnList, want ColumnList) { for _, c := range t.Columns { otherCol := other.Columns.Get(c.Name) if otherCol == nil { continue } if c.Type != otherCol.Type { - changed = append(changed, c) + got = append(got, c) + want = append(want, *otherCol) } if c.CreationOptions.NotNull != otherCol.CreationOptions.NotNull { - changed = append(changed, c) + got = append(got, c) + want = append(want, *otherCol) } } - return changed + return got, want } func (t *Table) IsPrimaryKeyEqual(other *Table) bool { diff --git a/schema/table_test.go b/schema/table_test.go index dc15cb95e7..b4e7b059da 100644 --- a/schema/table_test.go +++ b/schema/table_test.go @@ -289,20 +289,26 @@ func TestGetAddedColumns(t *testing.T) { } func TestGetChangedColumns(t *testing.T) { - columns := testTable1.GetChangedColumns(testTable1) + columns, _ := testTable1.GetChangedColumns(testTable1) if columns != nil { t.Fatalf("got %v want nil", columns) } - columns = testTable3.GetChangedColumns(testTable2) + columns, got := testTable3.GetChangedColumns(testTable2) if len(columns) != 1 { t.Fatalf("got %v want 1", columns) } if columns[0].Name != "bool" { t.Fatalf("got %v want bool", columns[0].Name) } + if columns[0].Type != TypeString { + t.Fatalf("got %v want TypeString", columns[0].Type) + } + if got[0].Type != TypeBool { + t.Fatalf("got %v want TypeBool", got[0].Type) + } - columns = testTable4.GetChangedColumns(testTable2) + columns, _ = testTable4.GetChangedColumns(testTable2) if len(columns) != 1 { t.Fatalf("got %v want 1", columns) }