From 7f52fe6f6d87cd2855a70f3f3a463e04313a2311 Mon Sep 17 00:00:00 2001
From: Herman Schaaf
Date: Tue, 10 Jan 2023 12:51:52 +0000
Subject: [PATCH 1/3] fix: Incremental tables should not delete stale

---
 plugins/destination/managed_writer.go |   7 --
 plugins/destination/plugin.go         |   8 +-
 plugins/destination/plugin_testing.go | 108 ++++++++++++++++++++++++--
 schema/table.go                       |  54 ++++++++-----
 4 files changed, 144 insertions(+), 33 deletions(-)

diff --git a/plugins/destination/managed_writer.go b/plugins/destination/managed_writer.go
index 0ee0d59fe8..e26ddffeef 100644
--- a/plugins/destination/managed_writer.go
+++ b/plugins/destination/managed_writer.go
@@ -7,7 +7,6 @@ import (
 	"time"
 
 	"github.com/cloudquery/plugin-sdk/schema"
-	"github.com/cloudquery/plugin-sdk/specs"
 )
 
 type worker struct {
@@ -135,11 +134,5 @@ func (p *Plugin) writeManagedTableBatch(ctx context.Context, tables schema.Table
 		}
 	}
 	p.workersLock.Unlock()
-
-	if p.spec.WriteMode == specs.WriteModeOverwriteDeleteStale {
-		if err := p.DeleteStale(ctx, tables, sourceName, syncTime); err != nil {
-			return err
-		}
-	}
 	return nil
 }
diff --git a/plugins/destination/plugin.go b/plugins/destination/plugin.go
index 85fb4eb8a7..698bf8d090 100644
--- a/plugins/destination/plugin.go
+++ b/plugins/destination/plugin.go
@@ -254,7 +254,13 @@ func (p *Plugin) Write(ctx context.Context, tables schema.Tables, sourceName str
 		panic("unknown client type")
 	}
 	if p.spec.WriteMode == specs.WriteModeOverwriteDeleteStale {
-		if err := p.DeleteStale(ctx, tables, sourceName, syncTime); err != nil {
+		include := func(t *schema.Table) bool { return true }
+		exclude := func(t *schema.Table) bool { return t.IsIncremental }
+		nonIncrementalTables, err := tables.FilterDfsFunc(include, exclude)
+		if err != nil {
+			return err
+		}
+		if err := p.DeleteStale(ctx, nonIncrementalTables, sourceName, syncTime); err != nil {
 			return err
 		}
 	}
diff --git a/plugins/destination/plugin_testing.go b/plugins/destination/plugin_testing.go
index d4849f0400..34a51a9f86 100644
--- a/plugins/destination/plugin_testing.go
+++ b/plugins/destination/plugin_testing.go
@@ -117,18 +117,95 @@ func (s *PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Contex
 		return fmt.Errorf("after overwrite expected second resource diff: %s", diff)
 	}
 
-	if !s.tests.SkipDeleteStale {
-		if err := p.DeleteStale(ctx, tables, sourceName, secondSyncTime); err != nil {
-			return fmt.Errorf("failed to delete stale data second time: %w", err)
-		}
+	return nil
+}
+
+func (s *PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
+	spec.WriteMode = specs.WriteModeOverwriteDeleteStale
+	if err := p.Init(ctx, logger, spec); err != nil {
+		return fmt.Errorf("failed to init plugin: %w", err)
+	}
+	tableName := "cq_test_write_overwrite_delete_stale"
+	table := testdata.TestTable(tableName)
+	incTable := testdata.TestTable(tableName + "_incremental")
+	incTable.IsIncremental = true
+	syncTime := time.Now().UTC().Round(1 * time.Second)
+	tables := []*schema.Table{
+		table,
+		incTable,
+	}
+	if err := p.Migrate(ctx, tables); err != nil {
+		return fmt.Errorf("failed to migrate tables: %w", err)
 	}
+	sourceName := "testOverwriteSource" + uuid.NewString()
+
+	resources := createTestResources(table, sourceName, syncTime, 2)
+	incResources := createTestResources(incTable, sourceName, syncTime, 2)
+	if err := p.writeAll(ctx, tables, sourceName, syncTime, append(resources, incResources...)); err != nil {
+		return fmt.Errorf("failed to write all: %w", err)
+	}
+	sortResources(table, resources)
+
+	resourcesRead, err := p.readAll(ctx, table, sourceName)
+	if err != nil {
+		return fmt.Errorf("failed to read all: %w", err)
+	}
+	sortCQTypes(table, resourcesRead)
+
+	if len(resourcesRead) != 2 {
+		return fmt.Errorf("expected 2 resources, got %d", len(resourcesRead))
+	}
+
+	if diff := resources[0].Data.Diff(resourcesRead[0]); diff != "" {
+		return fmt.Errorf("expected first resource diff: %s", diff)
+	}
+
+	if diff := resources[1].Data.Diff(resourcesRead[1]); diff != "" {
+		return fmt.Errorf("expected second resource diff: %s", diff)
+	}
+
+	// read from incremental table
+	resourcesRead, err = p.readAll(ctx, incTable, sourceName)
+	if err != nil {
+		return fmt.Errorf("failed to read all: %w", err)
+	}
+	if len(resourcesRead) != 2 {
+		return fmt.Errorf("expected 2 resources in incremental table, got %d", len(resourcesRead))
+	}
+
+	secondSyncTime := syncTime.Add(time.Second).UTC()
+
+	// copy first resource but update the sync time
+	updatedResource := schema.DestinationResource{
+		TableName: table.Name,
+		Data:      make(schema.CQTypes, len(resources[0].Data)),
+	}
+	copy(updatedResource.Data, resources[0].Data)
+	_ = updatedResource.Data[1].Set(secondSyncTime)
+
+	// write second time
+	if err := p.writeOne(ctx, tables, sourceName, secondSyncTime, updatedResource); err != nil {
+		return fmt.Errorf("failed to write one second time: %w", err)
+	}
+
-	resourcesRead, err = p.readAll(ctx, tables[0], sourceName)
+	resourcesRead, err = p.readAll(ctx, table, sourceName)
 	if err != nil {
 		return fmt.Errorf("failed to read all second time: %w", err)
 	}
 	sortCQTypes(table, resourcesRead)
 
+	if len(resourcesRead) != 1 {
+		return fmt.Errorf("after overwrite expected 1 resource, got %d", len(resourcesRead))
+	}
+
+	if diff := resources[0].Data.Diff(resourcesRead[0]); diff != "" {
+		return fmt.Errorf("after overwrite expected first resource diff: %s", diff)
+	}
+	resourcesRead, err = p.readAll(ctx, tables[0], sourceName)
+	if err != nil {
+		return fmt.Errorf("failed to read all second time: %w", err)
+	}
 	if len(resourcesRead) != 1 {
 		return fmt.Errorf("expected 1 resource after delete stale, got %d", len(resourcesRead))
 	}
@@ -138,6 +215,16 @@ func (s *PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Contex
 		return fmt.Errorf("after delete stale expected resource diff: %s", diff)
 	}
 
+	// we expect the incremental table to still have 2 resources, because delete-stale should
+	// not apply there
+	resourcesRead, err = p.readAll(ctx, tables[1], sourceName)
+	if err != nil {
+		return fmt.Errorf("failed to read all from incremental table: %w", err)
+	}
+	if len(resourcesRead) != 2 {
+		return fmt.Errorf("expected 2 resources in incremental table after delete-stale, got %d", len(resourcesRead))
+	}
+
 	return nil
 }
 
@@ -326,6 +413,17 @@ func PluginTestSuiteRunner(t *testing.T, p *Plugin, spec any, tests PluginTestSu
 		}
 	})
 
+	t.Run("TestWriteOverwriteDeleteStale", func(t *testing.T) {
+		t.Helper()
+		if suite.tests.SkipOverwrite || suite.tests.SkipDeleteStale {
+			t.Skip("skipping TestWriteOverwriteDeleteStale")
+			return
+		}
+		if err := suite.destinationPluginTestWriteOverwriteDeleteStale(ctx, p, logger, destSpec); err != nil {
+			t.Fatal(err)
+		}
+	})
+
 	t.Run("TestWriteAppend", func(t *testing.T) {
 		t.Helper()
 		if suite.tests.SkipAppend {
diff --git a/schema/table.go b/schema/table.go
index c0aaab3db2..d18641e08c 100644
--- a/schema/table.go
+++ b/schema/table.go
@@ -72,6 +72,18 @@ var (
 	reValidColumnName = regexp.MustCompile(`^[a-z_][a-z\d_]*$`)
 )
 
+func (tt Tables) FilterDfsFunc(include, exclude func(*Table) bool) (Tables, error) {
+	filteredTables := make(Tables, 0, len(tt))
+	for _, t := range tt {
+		filteredTable := t.Copy(nil)
+		filteredTable = filteredTable.filterDfs(false, include, exclude)
+		if filteredTable != nil {
+			filteredTables = append(filteredTables, filteredTable)
+		}
+	}
+	return filteredTables, nil
+}
+
 func (tt Tables) FilterDfs(tables, skipTables []string) (Tables, error) {
 	flattenedTables := tt.FlattenTables()
 	for _, includePattern := range tables {
@@ -98,16 +110,23 @@ func (tt Tables) FilterDfs(tables, skipTables []string) (Tables, error) {
 			return nil, fmt.Errorf("skip_tables include a pattern %s with no matches", excludePattern)
 		}
 	}
-
-	filteredTables := make(Tables, 0, len(tt))
-	for _, t := range tt {
-		filteredTable := t.Copy(nil)
-		filteredTable = filteredTable.filterDfs(false, tables, skipTables)
-		if filteredTable != nil {
-			filteredTables = append(filteredTables, filteredTable)
+	include := func(t *Table) bool {
+		for _, includePattern := range tables {
+			if glob.Glob(includePattern, t.Name) {
+				return true
+			}
 		}
+		return false
 	}
-	return filteredTables
+	exclude := func(t *Table) bool {
+		for _, skipPattern := range skipTables {
+			if glob.Glob(skipPattern, t.Name) {
+				return true
+			}
+		}
+		return false
+	}
+	return tt.FilterDfsFunc(include, exclude)
 }
 
 func (tt Tables) FlattenTables() Tables {
@@ -196,22 +215,17 @@ func (tt Tables) ValidateColumnNames() error {
 }
 
 // this will filter the tree in-place
-func (t *Table) filterDfs(parentMatched bool, tables []string, skipTables []string) *Table {
-	matched := parentMatched
-	for _, includeTable := range tables {
-		if glob.Glob(includeTable, t.Name) {
-			matched = true
-			break
-		}
+func (t *Table) filterDfs(parentMatched bool, include, exclude func(*Table) bool) *Table {
+	if exclude(t) {
+		return nil
 	}
-	for _, skipTable := range skipTables {
-		if glob.Glob(skipTable, t.Name) {
-			return nil
-		}
+	matched := parentMatched
+	if include(t) {
+		matched = true
 	}
 	filteredRelations := make([]*Table, 0, len(t.Relations))
 	for _, r := range t.Relations {
-		filteredChild := r.filterDfs(matched, tables, skipTables)
+		filteredChild := r.filterDfs(matched, include, exclude)
 		if filteredChild != nil {
 			matched = true
 			filteredRelations = append(filteredRelations, r)

From 960a6d558ab3b9567fab8da9c8046317c395b564 Mon Sep 17 00:00:00 2001
From: Herman Schaaf
Date: Tue, 10 Jan 2023 12:57:24 +0000
Subject: [PATCH 2/3] Linting

---
 plugins/destination/plugin_testing.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/plugins/destination/plugin_testing.go b/plugins/destination/plugin_testing.go
index 34a51a9f86..ddfc508c08 100644
--- a/plugins/destination/plugin_testing.go
+++ b/plugins/destination/plugin_testing.go
@@ -43,7 +43,7 @@ type PluginTestSuiteTests struct {
 	SkipMigrateAppend bool
 }
 
-func (s *PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
+func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
 	spec.WriteMode = specs.WriteModeOverwrite
 	if err := p.Init(ctx, logger, spec); err != nil {
 		return fmt.Errorf("failed to init plugin: %w", err)
@@ -120,7 +120,7 @@ func (s *PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Contex
 	return nil
 }
 
-func (s *PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
+func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
 	spec.WriteMode = specs.WriteModeOverwriteDeleteStale
 	if err := p.Init(ctx, logger, spec); err != nil {
 		return fmt.Errorf("failed to init plugin: %w", err)

From 8522add51ef434f17586f6ff7118cebb99e70111 Mon Sep 17 00:00:00 2001
From: Herman Schaaf
Date: Tue, 10 Jan 2023 15:49:58 +0000
Subject: [PATCH 3/3] Add lock around memdb tables map

---
 internal/memdb/memdb.go | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go
index 9d650f3b75..f6db98134f 100644
--- a/internal/memdb/memdb.go
+++ b/internal/memdb/memdb.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"sync"
 	"testing"
 	"time"
 
@@ -18,6 +19,7 @@ type client struct {
 	schema.DefaultTransformer
 	spec          specs.Destination
 	memoryDB      map[string][][]any
+	memoryDBLock  sync.RWMutex
 	errOnWrite    bool
 	blockingWrite bool
 }
@@ -38,7 +40,8 @@ func WithBlockingWrite() Option {
 
 func GetNewClient(options ...Option) destination.NewClientFunc {
 	c := &client{
-		memoryDB: make(map[string][][]any),
+		memoryDB:     make(map[string][][]any),
+		memoryDBLock: sync.RWMutex{},
 	}
 	for _, opt := range options {
 		opt(c)
@@ -111,11 +114,13 @@ func (c *client) Read(_ context.Context, table *schema.Table, source string, res
 	}
 	sourceColIndex := table.Columns.Index(schema.CqSourceNameColumn.Name)
 	var sortedRes [][]any
+	c.memoryDBLock.RLock()
 	for _, row := range c.memoryDB[table.Name] {
 		if row[sourceColIndex].(*schema.Text).Str == source {
 			sortedRes = append(sortedRes, row)
 		}
 	}
+	c.memoryDBLock.RUnlock()
 
 	for _, row := range sortedRes {
 		res <- row
@@ -134,12 +139,15 @@ func (c *client) Write(ctx context.Context, tables schema.Tables, resources <-ch
 		}
 		return nil
 	}
+
 	for resource := range resources {
+		c.memoryDBLock.Lock()
 		if c.spec.WriteMode == specs.WriteModeAppend {
 			c.memoryDB[resource.TableName] = append(c.memoryDB[resource.TableName], resource.Data)
 		} else {
 			c.overwrite(tables.Get(resource.TableName), resource.Data)
 		}
+		c.memoryDBLock.Unlock()
 	}
 	return nil
 }
@@ -156,11 +164,13 @@ func (c *client) WriteTableBatch(ctx context.Context, table *schema.Table, resou
 		return nil
 	}
 	for _, resource := range resources {
+		c.memoryDBLock.Lock()
 		if c.spec.WriteMode == specs.WriteModeAppend {
 			c.memoryDB[table.Name] = append(c.memoryDB[table.Name], resource)
 		} else {
 			c.overwrite(table, resource)
 		}
+		c.memoryDBLock.Unlock()
 	}
 	return nil
 }
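
Note: a minimal sketch of how the Tables.FilterDfsFunc helper introduced in
PATCH 1/3 can be called, mirroring the delete-stale change in
plugins/destination/plugin.go. The table definitions below are illustrative
only and are not taken from the patches:

	package main

	import (
		"fmt"

		"github.com/cloudquery/plugin-sdk/schema"
	)

	func main() {
		// Two hypothetical top-level tables; only one is incremental.
		tables := schema.Tables{
			{Name: "regular_table"},
			{Name: "incremental_table", IsIncremental: true},
		}
		// Keep everything, but prune incremental tables (and their subtrees),
		// since delete-stale must not touch incrementally synced data.
		include := func(t *schema.Table) bool { return true }
		exclude := func(t *schema.Table) bool { return t.IsIncremental }
		nonIncremental, err := tables.FilterDfsFunc(include, exclude)
		if err != nil {
			fmt.Println("filter failed:", err)
			return
		}
		for _, t := range nonIncremental {
			fmt.Println(t.Name) // prints only "regular_table"
		}
	}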
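Note: the data race that PATCH 3/3 guards against (writer goroutines mutating
the memoryDB map while Read iterates it) would typically be surfaced by Go's
race detector when running the package tests, for example:

	go test -race ./internal/memdb/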