Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
398 changes: 32 additions & 366 deletions plugins/destination/plugin_testing.go

Large diffs are not rendered by default.

130 changes: 130 additions & 0 deletions plugins/destination/plugin_testing_migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package destination

import (
"context"
"fmt"
"strings"
"time"

"github.com/cloudquery/plugin-sdk/caser"
"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/cloudquery/plugin-sdk/testdata"
"github.com/google/uuid"
"github.com/rs/zerolog"
)

// destinationPluginTestMigrate exercises a destination plugin's Migrate
// implementation against a sequence of schema changes, verifying after each
// change that writes still succeed and previously written rows remain
// readable. The steps are:
//  1. initial migrate + write
//  2. migrate with reordered columns + write
//  3. migrate with an added column + write
//  4. migrate back to the original schema (destination now has an extra
//     column, which should be ignored) + write
//  5. (forced-migrate mode only) migrate with a changed column type + write
//
// It returns a non-nil error describing the first step that failed.
func (*PluginTestSuite) destinationPluginTestMigrate(
	ctx context.Context,
	p *Plugin,
	logger zerolog.Logger,
	spec specs.Destination,
) error {
	// BatchSize of 1 forces every resource to be flushed immediately, so each
	// write is visible to the read that follows it.
	spec.BatchSize = 1
	if err := p.Init(ctx, logger, spec); err != nil {
		return fmt.Errorf("failed to init plugin: %w", err)
	}
	// Derive a table name unique per write mode and per run so concurrent or
	// repeated test executions don't collide in the destination.
	suffix := strings.ToLower(strings.ReplaceAll(spec.WriteMode.String(), "-", "_"))
	tableName := fmt.Sprintf("cq_test_migrate_%s_%d", suffix, time.Now().Unix())
	table := testdata.TestTable(tableName)
	if err := p.Migrate(ctx, []*schema.Table{table}); err != nil {
		return fmt.Errorf("failed to migrate tables: %w", err)
	}

	sourceName := "testMigrate" + caser.New().ToPascal(suffix) + "Source" + uuid.NewString()
	sourceSpec := specs.Source{
		Name: sourceName,
	}
	syncTime := time.Now().UTC().Round(1 * time.Second)
	resource1 := createTestResources(table, sourceName, syncTime, 1)[0]
	if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource1); err != nil {
		return fmt.Errorf("failed to write one: %w", err)
	}

	// check that migrations and writes still succeed when column ordering is changed
	a := table.Columns.Index("uuid")
	b := table.Columns.Index("float")
	table.Columns[a], table.Columns[b] = table.Columns[b], table.Columns[a]
	if err := p.Migrate(ctx, []*schema.Table{table}); err != nil {
		return fmt.Errorf("failed to migrate table with changed column ordering: %w", err)
	}
	resource2 := createTestResources(table, sourceName, syncTime, 1)[0]
	if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource2); err != nil {
		return fmt.Errorf("failed to write one after column order change: %w", err)
	}

	resourcesRead, err := p.readAll(ctx, table, sourceName)
	if err != nil {
		return fmt.Errorf("failed to read all: %w", err)
	}
	if len(resourcesRead) != 2 {
		return fmt.Errorf("expected 2 resources after second write, got %d", len(resourcesRead))
	}

	// check that migrations succeed when a new column is added
	table.Columns = append(table.Columns, schema.Column{
		Name: "new_column",
		Type: schema.TypeInt,
	})
	if err := p.Migrate(ctx, []*schema.Table{table}); err != nil {
		return fmt.Errorf("failed to migrate table with new column: %w", err)
	}
	resource3 := createTestResources(table, sourceName, syncTime, 1)[0]
	if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource3); err != nil {
		return fmt.Errorf("failed to write one after adding new column: %w", err)
	}
	resourcesRead, err = p.readAll(ctx, table, sourceName)
	if err != nil {
		return fmt.Errorf("failed to read all: %w", err)
	}
	if len(resourcesRead) != 3 {
		return fmt.Errorf("expected 3 resources after third write, got %d", len(resourcesRead))
	}

	// check that migration still succeeds when there is an extra column in the destination table,
	// which should be ignored
	oldTable := testdata.TestTable(tableName)
	if err := p.Migrate(ctx, []*schema.Table{oldTable}); err != nil {
		return fmt.Errorf("failed to migrate table with extra column in destination: %w", err)
	}
	resource4 := createTestResources(oldTable, sourceName, syncTime, 1)[0]
	if err := p.writeOne(ctx, sourceSpec, []*schema.Table{oldTable}, syncTime, resource4); err != nil {
		return fmt.Errorf("failed to write one after migrating back to original schema: %w", err)
	}
	totalExpectedResources := 4
	if spec.MigrateMode == specs.MigrateModeForced {
		// In forced mode, destructive changes such as a column type change are
		// allowed; verify the plugin handles them.
		table.Columns[len(table.Columns)-1].Type = schema.TypeString
		if err := p.Migrate(ctx, []*schema.Table{table}); err != nil {
			return fmt.Errorf("failed to migrate table with changed column type: %w", err)
		}
		resource5 := createTestResources(table, sourceName, syncTime, 1)[0]
		if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource5); err != nil {
			return fmt.Errorf("failed to write one after column type change: %w", err)
		}
		totalExpectedResources++
	}

	resourcesRead, err = p.readAll(ctx, oldTable, sourceName)
	if err != nil {
		return fmt.Errorf("failed to read all: %w", err)
	}
	if len(resourcesRead) != totalExpectedResources {
		return fmt.Errorf("expected %d resources after fourth write, got %d", totalExpectedResources, len(resourcesRead))
	}
	// Locate resource4 by its cq_id and verify it round-tripped unchanged
	// through the final migration(s).
	cqIDIndex := table.Columns.Index(schema.CqIDColumn.Name)
	found := false
	for _, r := range resourcesRead {
		if !r[cqIDIndex].Equal(resource4.Data[cqIDIndex]) {
			continue
		}
		found = true
		if !r.Equal(resource4.Data) {
			return fmt.Errorf("expected resource to be equal to original resource, but got diff: %s", r.Diff(resource4.Data))
		}
	}
	if !found {
		return fmt.Errorf("expected to find resource with cq_id %s, but none matched", resource4.Data[cqIDIndex])
	}

	return nil
}
93 changes: 93 additions & 0 deletions plugins/destination/plugin_testing_overwrite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package destination

import (
"context"
"fmt"
"time"

"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/cloudquery/plugin-sdk/testdata"
"github.com/google/uuid"
"github.com/rs/zerolog"
)

// destinationPluginTestWriteOverwrite verifies overwrite write-mode semantics:
// writing two resources, reading them back, then re-writing one of them with a
// newer sync time must replace the existing row (still 2 rows total) rather
// than append a duplicate.
//
// It returns a non-nil error describing the first step that failed.
func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
	spec.WriteMode = specs.WriteModeOverwrite
	if err := p.Init(ctx, logger, spec); err != nil {
		return fmt.Errorf("failed to init plugin: %w", err)
	}
	// Unique table name per run to avoid collisions across test executions.
	tableName := fmt.Sprintf("cq_test_write_overwrite_%d", time.Now().Unix())
	table := testdata.TestTable(tableName)
	syncTime := time.Now().UTC().Round(1 * time.Second)
	tables := []*schema.Table{
		table,
	}
	if err := p.Migrate(ctx, tables); err != nil {
		return fmt.Errorf("failed to migrate tables: %w", err)
	}

	sourceName := "testOverwriteSource" + uuid.NewString()
	sourceSpec := specs.Source{
		Name: sourceName,
	}

	resources := createTestResources(table, sourceName, syncTime, 2)
	if err := p.writeAll(ctx, sourceSpec, tables, syncTime, resources); err != nil {
		return fmt.Errorf("failed to write all: %w", err)
	}
	// Sort both written and read resources the same way so index-wise diffs
	// below compare matching rows.
	sortResources(table, resources)

	resourcesRead, err := p.readAll(ctx, table, sourceName)
	if err != nil {
		return fmt.Errorf("failed to read all: %w", err)
	}
	sortCQTypes(table, resourcesRead)

	if len(resourcesRead) != 2 {
		return fmt.Errorf("expected 2 resources, got %d", len(resourcesRead))
	}

	if diff := resources[0].Data.Diff(resourcesRead[0]); diff != "" {
		return fmt.Errorf("expected first resource diff: %s", diff)
	}

	if diff := resources[1].Data.Diff(resourcesRead[1]); diff != "" {
		return fmt.Errorf("expected second resource diff: %s", diff)
	}

	secondSyncTime := syncTime.Add(time.Second).UTC()

	// copy first resource but update the sync time
	updatedResource := schema.DestinationResource{
		TableName: table.Name,
		Data:      make(schema.CQTypes, len(resources[0].Data)),
	}
	copy(updatedResource.Data, resources[0].Data)
	// Data[1] is the cq_sync_time column in the testdata table layout.
	if err := updatedResource.Data[1].Set(secondSyncTime); err != nil {
		return fmt.Errorf("failed to set updated sync time: %w", err)
	}

	// write second time
	if err := p.writeOne(ctx, sourceSpec, tables, secondSyncTime, updatedResource); err != nil {
		return fmt.Errorf("failed to write one second time: %w", err)
	}

	resourcesRead, err = p.readAll(ctx, table, sourceName)
	if err != nil {
		return fmt.Errorf("failed to read all second time: %w", err)
	}
	sortCQTypes(table, resourcesRead)

	if len(resourcesRead) != 2 {
		return fmt.Errorf("after overwrite expected 2 resources, got %d", len(resourcesRead))
	}

	if diff := resources[1].Data.Diff(resourcesRead[0]); diff != "" {
		return fmt.Errorf("after overwrite expected first resource diff: %s", diff)
	}

	if diff := updatedResource.Data.Diff(resourcesRead[1]); diff != "" {
		return fmt.Errorf("after overwrite expected second resource diff: %s", diff)
	}

	return nil
}
125 changes: 125 additions & 0 deletions plugins/destination/plugin_testing_overwrite_delete_stale.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package destination

import (
"context"
"fmt"
"time"

"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/cloudquery/plugin-sdk/testdata"
"github.com/google/uuid"
"github.com/rs/zerolog"
)

// destinationPluginTestWriteOverwriteDeleteStale verifies overwrite-delete-stale
// write-mode semantics: after a second sync that re-writes only one of two
// previously written resources, the stale resource must be deleted from the
// regular table, while rows in an incremental table must be left untouched.
//
// It returns a non-nil error describing the first step that failed.
func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
	spec.WriteMode = specs.WriteModeOverwriteDeleteStale
	if err := p.Init(ctx, logger, spec); err != nil {
		return fmt.Errorf("failed to init plugin: %w", err)
	}
	// Unique table names per run to avoid collisions across test executions.
	tableName := fmt.Sprintf("cq_test_write_overwrite_delete_stale_%d", time.Now().Unix())
	table := testdata.TestTable(tableName)
	incTable := testdata.TestTable(tableName + "_incremental")
	incTable.IsIncremental = true
	syncTime := time.Now().UTC().Round(1 * time.Second)
	tables := []*schema.Table{
		table,
		incTable,
	}
	if err := p.Migrate(ctx, tables); err != nil {
		return fmt.Errorf("failed to migrate tables: %w", err)
	}

	sourceName := "testOverwriteDeleteStaleSource" + uuid.NewString()
	sourceSpec := specs.Source{
		Name:    sourceName,
		Backend: specs.BackendLocal,
	}

	resources := createTestResources(table, sourceName, syncTime, 2)
	incResources := createTestResources(incTable, sourceName, syncTime, 2)
	if err := p.writeAll(ctx, sourceSpec, tables, syncTime, append(resources, incResources...)); err != nil {
		return fmt.Errorf("failed to write all: %w", err)
	}
	// Sort both written and read resources the same way so index-wise diffs
	// below compare matching rows.
	sortResources(table, resources)

	resourcesRead, err := p.readAll(ctx, table, sourceName)
	if err != nil {
		return fmt.Errorf("failed to read all: %w", err)
	}
	sortCQTypes(table, resourcesRead)

	if len(resourcesRead) != 2 {
		return fmt.Errorf("expected 2 resources, got %d", len(resourcesRead))
	}

	if diff := resources[0].Data.Diff(resourcesRead[0]); diff != "" {
		return fmt.Errorf("expected first resource diff: %s", diff)
	}

	if diff := resources[1].Data.Diff(resourcesRead[1]); diff != "" {
		return fmt.Errorf("expected second resource diff: %s", diff)
	}

	// read from incremental table
	resourcesRead, err = p.readAll(ctx, incTable, sourceName)
	if err != nil {
		return fmt.Errorf("failed to read all: %w", err)
	}
	if len(resourcesRead) != 2 {
		return fmt.Errorf("expected 2 resources in incremental table, got %d", len(resourcesRead))
	}

	secondSyncTime := syncTime.Add(time.Second).UTC()

	// copy first resource but update the sync time
	updatedResource := schema.DestinationResource{
		TableName: table.Name,
		Data:      make(schema.CQTypes, len(resources[0].Data)),
	}
	copy(updatedResource.Data, resources[0].Data)
	// Data[1] is the cq_sync_time column in the testdata table layout.
	if err := updatedResource.Data[1].Set(secondSyncTime); err != nil {
		return fmt.Errorf("failed to set updated sync time: %w", err)
	}

	// write second time; only one resource is re-synced, so the other should
	// be treated as stale and deleted from the regular table
	if err := p.writeOne(ctx, sourceSpec, tables, secondSyncTime, updatedResource); err != nil {
		return fmt.Errorf("failed to write one second time: %w", err)
	}

	resourcesRead, err = p.readAll(ctx, table, sourceName)
	if err != nil {
		return fmt.Errorf("failed to read all second time: %w", err)
	}
	sortCQTypes(table, resourcesRead)
	if len(resourcesRead) != 1 {
		return fmt.Errorf("after overwrite expected 1 resource, got %d", len(resourcesRead))
	}

	// NOTE(review): this diff uses resources[0].Data (original sync time) while
	// the diff after the re-read below uses updatedResource.Data (new sync
	// time); both are expected to pass, which presumably means Diff/the
	// destination normalizes the sync-time column — confirm.
	if diff := resources[0].Data.Diff(resourcesRead[0]); diff != "" {
		return fmt.Errorf("after overwrite expected first resource diff: %s", diff)
	}

	resourcesRead, err = p.readAll(ctx, table, sourceName)
	if err != nil {
		return fmt.Errorf("failed to read all third time: %w", err)
	}
	if len(resourcesRead) != 1 {
		return fmt.Errorf("expected 1 resource after delete stale, got %d", len(resourcesRead))
	}

	// we expect the only resource returned to match the updated resource we wrote
	if diff := updatedResource.Data.Diff(resourcesRead[0]); diff != "" {
		return fmt.Errorf("after delete stale expected resource diff: %s", diff)
	}

	// we expect the incremental table to still have 2 resources, because delete-stale should
	// not apply there
	resourcesRead, err = p.readAll(ctx, incTable, sourceName)
	if err != nil {
		return fmt.Errorf("failed to read all from incremental table: %w", err)
	}
	if len(resourcesRead) != 2 {
		return fmt.Errorf("expected 2 resources in incremental table after delete-stale, got %d", len(resourcesRead))
	}

	return nil
}
Loading