Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion internal/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type client struct {
schema.DefaultTransformer
spec specs.Destination
memoryDB map[string][][]any
tables map[string]*schema.Table
memoryDBLock sync.RWMutex
errOnWrite bool
blockingWrite bool
Expand Down Expand Up @@ -62,6 +63,7 @@ func getTestLogger(t *testing.T) zerolog.Logger {
func NewClient(_ context.Context, _ zerolog.Logger, spec specs.Destination) (destination.Client, error) {
return &client{
memoryDB: make(map[string][][]any),
tables: make(map[string]*schema.Table),
spec: spec,
}, nil
}
Expand Down Expand Up @@ -101,9 +103,19 @@ func (c *client) overwrite(table *schema.Table, data []any) {

func (c *client) Migrate(_ context.Context, tables schema.Tables) error {
for _, table := range tables {
if c.memoryDB[table.Name] == nil {
memTable := c.memoryDB[table.Name]
if memTable == nil {
c.memoryDB[table.Name] = make([][]any, 0)
c.tables[table.Name] = table
continue
}
changes := table.GetChanges(c.tables[table.Name])
// memdb doesn't support any auto-migrate
if changes == nil {
continue
}
c.memoryDB[table.Name] = make([][]any, 0)
c.tables[table.Name] = table
}
return nil
}
Expand Down
37 changes: 33 additions & 4 deletions internal/memdb/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,34 @@ import (
"github.com/stretchr/testify/require"
)

var migrateStrategyOverwrite = destination.MigrateStrategy{
AddColumn: specs.MigrateModeForced,
AddColumnNotNull: specs.MigrateModeForced,
RemoveColumn: specs.MigrateModeForced,
RemoveColumnNotNull: specs.MigrateModeForced,
ChangeColumn: specs.MigrateModeForced,
}

var migrateStrategyAppend = destination.MigrateStrategy{
AddColumn: specs.MigrateModeForced,
AddColumnNotNull: specs.MigrateModeForced,
RemoveColumn: specs.MigrateModeForced,
RemoveColumnNotNull: specs.MigrateModeForced,
ChangeColumn: specs.MigrateModeForced,
}

func TestPluginUnmanagedClient(t *testing.T) {
destination.PluginTestSuiteRunner(
t,
func() *destination.Plugin {
return destination.NewPlugin("test", "development", NewClient)
},
nil,
destination.PluginTestSuiteTests{})
destination.PluginTestSuiteTests{
MigrateStrategyOverwrite: migrateStrategyOverwrite,
MigrateStrategyAppend: migrateStrategyAppend,
},
)
}

func TestPluginManagedClient(t *testing.T) {
Expand All @@ -29,7 +49,10 @@ func TestPluginManagedClient(t *testing.T) {
return destination.NewPlugin("test", "development", NewClient, destination.WithManagedWriter())
},
nil,
destination.PluginTestSuiteTests{})
destination.PluginTestSuiteTests{
MigrateStrategyOverwrite: migrateStrategyOverwrite,
MigrateStrategyAppend: migrateStrategyAppend,
})
}

func TestPluginManagedClientWithSmallBatchSize(t *testing.T) {
Expand All @@ -39,7 +62,10 @@ func TestPluginManagedClientWithSmallBatchSize(t *testing.T) {
destination.WithDefaultBatchSize(1),
destination.WithDefaultBatchSizeBytes(1))
}, nil,
destination.PluginTestSuiteTests{})
destination.PluginTestSuiteTests{
MigrateStrategyOverwrite: migrateStrategyOverwrite,
MigrateStrategyAppend: migrateStrategyAppend,
})
}

func TestPluginManagedClientWithLargeBatchSize(t *testing.T) {
Expand All @@ -50,7 +76,10 @@ func TestPluginManagedClientWithLargeBatchSize(t *testing.T) {
destination.WithDefaultBatchSizeBytes(100000000))
},
nil,
destination.PluginTestSuiteTests{})
destination.PluginTestSuiteTests{
MigrateStrategyOverwrite: migrateStrategyOverwrite,
MigrateStrategyAppend: migrateStrategyAppend,
})
}

func TestPluginOnNewError(t *testing.T) {
Expand Down
46 changes: 17 additions & 29 deletions plugins/destination/plugin_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,18 @@ type PluginTestSuite struct {
tests PluginTestSuiteTests
}

// MigrateStrategy defines which tests we should include
type MigrateStrategy struct {
AddColumn specs.MigrateMode
AddColumnNotNull specs.MigrateMode
RemoveColumn specs.MigrateMode
RemoveColumnNotNull specs.MigrateMode
ChangeColumn specs.MigrateMode
}
Comment on lines +21 to +27
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be just semantics, but I'm a bit confused by the options here 🤔

The table in the PR description has two options for each entry: Auto Migrate / Drop Table. I think it would be clearer if we used that terminology here, rather than MigrateMode = forced or MigrateMode = safe. forced/safe doesn't convey the same meaning to me drop/auto, but I think that is the intention of this configuration, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I had the same dilemma if to introduce another type, but then it would be quite annoying as we will have OverwriteForceStrategy OverwriteAutoMigrateStrategy and the same for append so I preferred for now just to do one for each mode and then we can understand if it is being dropped or not.

But I think that yes, we potentially will need to change it, I just didn't want to make it even more complex right now as I think we will need to learn a bit more about this API and usage and then do another iteration on the testing API at least.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a comment would help for now. Something like forced == drop table, safe == do not drop table


type PluginTestSuiteTests struct {
// SkipOverwrite skips testing for "overwrite" mode. Use if the destination
// // plugin doesn't support this feature.
// plugin doesn't support this feature.
SkipOverwrite bool

// SkipDeleteStale skips testing "delete-stale" mode. Use if the destination
Expand Down Expand Up @@ -48,6 +57,9 @@ type PluginTestSuiteTests struct {
SkipMigrateOverwrite bool
// SkipMigrateOverwriteForce skips a test for the migrate function where a column is changed in force mode
SkipMigrateOverwriteForce bool

MigrateStrategyOverwrite MigrateStrategy
MigrateStrategyAppend MigrateStrategy
}

func getTestLogger(t *testing.T) zerolog.Logger {
Expand Down Expand Up @@ -109,13 +121,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, spec any, test
}
destSpec.WriteMode = specs.WriteModeOverwrite
destSpec.Name = "test_migrate_overwrite"
p := newPlugin()
if err := suite.destinationPluginTestMigrate(ctx, p, logger, destSpec); err != nil {
t.Fatal(err)
}
if err := p.Close(ctx); err != nil {
t.Fatal(err)
}
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite)
})

t.Run("TestMigrateOverwriteForce", func(t *testing.T) {
Expand All @@ -126,13 +132,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, spec any, test
destSpec.WriteMode = specs.WriteModeOverwrite
destSpec.MigrateMode = specs.MigrateModeForced
destSpec.Name = "test_migrate_overwrite_force"
p := newPlugin()
if err := suite.destinationPluginTestMigrate(ctx, p, logger, destSpec); err != nil {
t.Fatal(err)
}
if err := p.Close(ctx); err != nil {
t.Fatal(err)
}
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite)
})

t.Run("TestWriteAppend", func(t *testing.T) {
Expand All @@ -157,13 +157,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, spec any, test
}
destSpec.WriteMode = specs.WriteModeAppend
destSpec.Name = "test_migrate_append"
p := newPlugin()
if err := suite.destinationPluginTestMigrate(ctx, p, logger, destSpec); err != nil {
t.Fatal(err)
}
if err := p.Close(ctx); err != nil {
t.Fatal(err)
}
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend)
})

t.Run("TestMigrateAppendForce", func(t *testing.T) {
Expand All @@ -174,13 +168,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, spec any, test
destSpec.WriteMode = specs.WriteModeAppend
destSpec.MigrateMode = specs.MigrateModeForced
destSpec.Name = "test_migrate_append_force"
p := newPlugin()
if err := suite.destinationPluginTestMigrate(ctx, p, logger, destSpec); err != nil {
t.Fatal(err)
}
if err := p.Close(ctx); err != nil {
t.Fatal(err)
}
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend)
})
}

Expand Down
Loading