From cffb4ddb568583aedf2307e2d58f4148d8f58ffb Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Wed, 17 May 2023 12:05:22 +0300 Subject: [PATCH 1/4] don't duplicate tables to be removed --- plugins/destination/plugin.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/destination/plugin.go b/plugins/destination/plugin.go index 31ab725094..1d40f6af80 100644 --- a/plugins/destination/plugin.go +++ b/plugins/destination/plugin.go @@ -269,6 +269,7 @@ func (p *Plugin) Write(ctx context.Context, sourceSpec specs.Source, tables sche if p.spec.WriteMode == specs.WriteModeOverwriteDeleteStale { tablesToDelete := tables if sourceSpec.Backend != specs.BackendNone { + tablesToDelete = make(schema.Tables, 0, len(tables)) for _, t := range tables { if !t.IsIncremental { tablesToDelete = append(tablesToDelete, t) From 0be5d622c0d23fab77a98f49d3aae4cae3da2726 Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Wed, 17 May 2023 12:32:41 +0300 Subject: [PATCH 2/4] fix md + inc filtering + test --- internal/memdb/memdb.go | 15 +++++++++++++++ internal/memdb/memdb_test.go | 15 +++++++++++++++ schema/table.go | 28 +++++++++++++++++----------- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index 9c6bbb74d1..0ceacd301c 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -19,6 +19,7 @@ import ( // client is mostly used for testing the destination plugin. type client struct { spec specs.Destination + sourceSpec specs.Source // used to verify that we don't remove incremental tables when src backend != none memoryDB map[string][]arrow.Record tables map[string]*schema.Table memoryDBLock sync.RWMutex @@ -73,6 +74,16 @@ func NewClientErrOnNew(context.Context, zerolog.Logger, specs.Destination) (dest return nil, fmt.Errorf("newTestDestinationMemDBClientErrOnNew") } +func NewClientWithSrcSpec(srcSpec specs.Source) destination.NewClientFunc { + return func(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (destination.Client, error) { + c, err := NewClient(ctx, logger, spec) + if c != nil { + c.(*client).sourceSpec = srcSpec + } + return c, err + } +} + func (c *client) overwrite(table *schema.Table, data arrow.Record) { pksIndex := table.PrimaryKeysIndexes() tableName := table.Name @@ -205,6 +216,10 @@ func (c *client) Close(context.Context) error { func (c *client) DeleteStale(ctx context.Context, tables schema.Tables, source string, syncTime time.Time) error { for _, table := range tables { + if c.sourceSpec.Backend != specs.BackendNone && table.IsIncremental { + return fmt.Errorf("tried to delete stale from incremental table %q when source backend is %q", + table.Name, c.sourceSpec.Backend.String()) + } c.deleteStaleTable(ctx, table, source, syncTime) } return nil diff --git a/internal/memdb/memdb_test.go b/internal/memdb/memdb_test.go index 8db5b7a620..63bd570c3c 100644 --- a/internal/memdb/memdb_test.go +++ b/internal/memdb/memdb_test.go @@ -44,6 +44,21 @@ func TestPluginUnmanagedClient(t *testing.T) { ) } +func TestPluginIncrementalFilteredOut(t *testing.T) { + destination.PluginTestSuiteRunner( + t, + func() *destination.Plugin { + return destination.NewPlugin("test", "development", + NewClientWithSrcSpec(specs.Source{Backend: specs.BackendLocal})) + }, + specs.Destination{}, + destination.PluginTestSuiteTests{ + MigrateStrategyOverwrite: migrateStrategyOverwrite, + MigrateStrategyAppend: migrateStrategyAppend, + }, + ) +} + func TestPluginManagedClient(t *testing.T) { destination.PluginTestSuiteRunner(t, func() *destination.Plugin { diff --git a/schema/table.go b/schema/table.go index a0a0940f23..ed774f3b39 100644 --- a/schema/table.go +++ b/schema/table.go @@ -116,21 +116,20 @@ func NewTableFromArrowSchema(sc *arrow.Schema) (*Table, error) { return nil, fmt.Errorf("missing table name") } description, _ := tableMD.GetValue(MetadataTableDescription) + constraintName, _ := tableMD.GetValue(MetadataConstraintName) fields := sc.Fields() columns := make(ColumnList, len(fields)) for i, field := range fields { columns[i] = NewColumnFromArrowField(field) } table := &Table{ - Name: name, - Description: description, - Columns: columns, + Name: name, + Description: description, + PkConstraintName: constraintName, + Columns: columns, } - if constraintName, found := tableMD.GetValue(MetadataConstraintName); found { - table.PkConstraintName = constraintName - } - if title, found := tableMD.GetValue(MetadataIncremental); found { - table.Title = title + if isIncremental, found := tableMD.GetValue(MetadataIncremental); found { + table.IsIncremental = isIncremental == MetadataTrue } return table, nil } @@ -369,9 +368,16 @@ func (t *Table) PrimaryKeysIndexes() []int { func (t *Table) ToArrowSchema() *arrow.Schema { fields := make([]arrow.Field, len(t.Columns)) - schemaMd := arrow.MetadataFrom(map[string]string{ - MetadataTableName: t.Name, - }) + md := map[string]string{ + MetadataTableName: t.Name, + MetadataTableDescription: t.Description, + MetadataConstraintName: t.PkConstraintName, + MetadataIncremental: MetadataFalse, + } + if t.IsIncremental { + md[MetadataIncremental] = MetadataTrue + } + schemaMd := arrow.MetadataFrom(md) for i, c := range t.Columns { fields[i] = c.ToArrowField() } From 5118fd00417c915e733f1704f4ba212d092c8ea7 Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Wed, 17 May 2023 12:54:39 +0300 Subject: [PATCH 3/4] move assertion to test --- internal/memdb/memdb.go | 15 ------------- internal/memdb/memdb_test.go | 15 ------------- .../plugin_testing_overwrite_delete_stale.go | 21 +++++++++++-------- 3 files changed, 12 insertions(+), 39 deletions(-) diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index 0ceacd301c..9c6bbb74d1 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -19,7 +19,6 @@ import ( // client is mostly used for testing the destination plugin. type client struct { spec specs.Destination - sourceSpec specs.Source // used to verify that we don't remove incremental tables when src backend != none memoryDB map[string][]arrow.Record tables map[string]*schema.Table memoryDBLock sync.RWMutex @@ -74,16 +73,6 @@ func NewClientErrOnNew(context.Context, zerolog.Logger, specs.Destination) (dest return nil, fmt.Errorf("newTestDestinationMemDBClientErrOnNew") } -func NewClientWithSrcSpec(srcSpec specs.Source) destination.NewClientFunc { - return func(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (destination.Client, error) { - c, err := NewClient(ctx, logger, spec) - if c != nil { - c.(*client).sourceSpec = srcSpec - } - return c, err - } -} - func (c *client) overwrite(table *schema.Table, data arrow.Record) { pksIndex := table.PrimaryKeysIndexes() tableName := table.Name @@ -216,10 +205,6 @@ func (c *client) Close(context.Context) error { func (c *client) DeleteStale(ctx context.Context, tables schema.Tables, source string, syncTime time.Time) error { for _, table := range tables { - if c.sourceSpec.Backend != specs.BackendNone && table.IsIncremental { - return fmt.Errorf("tried to delete stale from incremental table %q when source backend is %q", - table.Name, c.sourceSpec.Backend.String()) - } c.deleteStaleTable(ctx, table, source, syncTime) } return nil diff --git a/internal/memdb/memdb_test.go b/internal/memdb/memdb_test.go index 63bd570c3c..8db5b7a620 100644 --- a/internal/memdb/memdb_test.go +++ b/internal/memdb/memdb_test.go @@ -44,21 +44,6 @@ func TestPluginUnmanagedClient(t *testing.T) { ) } -func TestPluginIncrementalFilteredOut(t *testing.T) { - destination.PluginTestSuiteRunner( - t, - func() *destination.Plugin { - return destination.NewPlugin("test", "development", - NewClientWithSrcSpec(specs.Source{Backend: specs.BackendLocal})) - }, - specs.Destination{}, - destination.PluginTestSuiteTests{ - MigrateStrategyOverwrite: migrateStrategyOverwrite, - MigrateStrategyAppend: migrateStrategyAppend, - }, - ) -} - func TestPluginManagedClient(t *testing.T) { destination.PluginTestSuiteRunner(t, func() *destination.Plugin { diff --git a/plugins/destination/plugin_testing_overwrite_delete_stale.go b/plugins/destination/plugin_testing_overwrite_delete_stale.go index 06d92761fa..aa4e700719 100644 --- a/plugins/destination/plugin_testing_overwrite_delete_stale.go +++ b/plugins/destination/plugin_testing_overwrite_delete_stale.go @@ -89,10 +89,13 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte StableUUID: u, MaxRows: 1, } - updatedResources := schema.GenTestData(table, opts)[0] + updatedResources := schema.GenTestData(table, opts) + updatedIncResources := schema.GenTestData(incTable, opts) + allUpdatedResources := updatedResources + allUpdatedResources = append(allUpdatedResources, updatedIncResources...) - if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResources); err != nil { - return fmt.Errorf("failed to write one second time: %w", err) + if err := p.writeAll(ctx, sourceSpec, secondSyncTime, allUpdatedResources); err != nil { + return fmt.Errorf("failed to write all second time: %w", err) } resourcesRead, err = p.readAll(ctx, table, sourceName) @@ -108,7 +111,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte return fmt.Errorf("after overwrite expected first resource to be different. diff: %s", diff) } - resourcesRead, err = p.readAll(ctx, tables[0], sourceName) + resourcesRead, err = p.readAll(ctx, table, sourceName) if err != nil { return fmt.Errorf("failed to read all second time: %w", err) } @@ -117,19 +120,19 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte } // we expect the only resource returned to match the updated resource we wrote - if !array.RecordApproxEqual(updatedResources, resourcesRead[0]) { - diff := RecordDiff(updatedResources, resourcesRead[0]) + if !array.RecordApproxEqual(updatedResources[0], resourcesRead[0]) { + diff := RecordDiff(updatedResources[0], resourcesRead[0]) return fmt.Errorf("after delete stale expected resource to be equal. diff: %s", diff) } // we expect the incremental table to still have 2 resources, because delete-stale should // not apply there - resourcesRead, err = p.readAll(ctx, tables[1], sourceName) + resourcesRead, err = p.readAll(ctx, incTable, sourceName) if err != nil { return fmt.Errorf("failed to read all from incremental table: %w", err) } - if len(resourcesRead) != 2 { - return fmt.Errorf("expected 2 resources in incremental table after delete-stale, got %d", len(resourcesRead)) + if len(resourcesRead) != 3 { + return fmt.Errorf("expected 3 resources in incremental table after delete-stale, got %d", len(resourcesRead)) } return nil From 4c5b53f1eff0c0c3c013707d8cd333b0abbf0d93 Mon Sep 17 00:00:00 2001 From: Alex Shcherbakov Date: Wed, 17 May 2023 15:06:35 +0300 Subject: [PATCH 4/4] Update plugins/destination/plugin_testing_overwrite_delete_stale.go Co-authored-by: Herman Schaaf --- plugins/destination/plugin_testing_overwrite_delete_stale.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/destination/plugin_testing_overwrite_delete_stale.go b/plugins/destination/plugin_testing_overwrite_delete_stale.go index aa4e700719..41b1dad121 100644 --- a/plugins/destination/plugin_testing_overwrite_delete_stale.go +++ b/plugins/destination/plugin_testing_overwrite_delete_stale.go @@ -125,7 +125,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte return fmt.Errorf("after delete stale expected resource to be equal. diff: %s", diff) } - // we expect the incremental table to still have 2 resources, because delete-stale should + // we expect the incremental table to still have 3 resources, because delete-stale should // not apply there resourcesRead, err = p.readAll(ctx, incTable, sourceName) if err != nil {