From 3f5357677a40a710da0a06652f1afdddc5dd3445 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 16 May 2023 16:56:54 +0300 Subject: [PATCH 1/2] fix: Flatten V2 tables --- internal/servers/destination/v0/destinations.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/internal/servers/destination/v0/destinations.go b/internal/servers/destination/v0/destinations.go index 7e47557ed6..c09b242e4c 100644 --- a/internal/servers/destination/v0/destinations.go +++ b/internal/servers/destination/v0/destinations.go @@ -59,7 +59,7 @@ func (s *Server) Migrate(ctx context.Context, req *pb.Migrate_Request) (*pb.Migr if err := json.Unmarshal(req.Tables, &tablesV2); err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to unmarshal tables: %v", err) } - tables := TablesV2ToV3(tablesV2) + tables := TablesV2ToV3(tablesV2).FlattenTables() SetDestinationManagedCqColumns(tables) s.setPKsForTables(tables) @@ -97,7 +97,7 @@ func (s *Server) Write2(msg pb.Destination_Write2Server) error { return status.Errorf(codes.InvalidArgument, "failed to unmarshal source spec: %v", err) } } - tables := TablesV2ToV3(tablesV2) + tables := TablesV2ToV3(tablesV2).FlattenTables() syncTime := r.Timestamp.AsTime() SetDestinationManagedCqColumns(tables) s.setPKsForTables(tables) @@ -201,12 +201,8 @@ func (s *Server) DeleteStale(ctx context.Context, req *pb.DeleteStale_Request) ( if err := json.Unmarshal(req.Tables, &tablesV2); err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to unmarshal tables: %v", err) } - tables := TablesV2ToV3(tablesV2) + tables := TablesV2ToV3(tablesV2).FlattenTables() SetDestinationManagedCqColumns(tables) - schemas := make(schemav2.Schemas, len(tables.FlattenTables())) - for i, table := range tables.FlattenTables() { - schemas[i] = table.ToArrowSchema() - } if err := s.Plugin.DeleteStale(ctx, tables, req.Source, req.Timestamp.AsTime()); err != nil { return nil, err } From ba49bc4efbfdc68e3944fa7a5a2e7ea8cdb575b7 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 16 May 2023 17:31:53 +0300 Subject: [PATCH 2/2] idempotent --- schema/table.go | 11 ++++++++++- schema/table_test.go | 4 ++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/schema/table.go b/schema/table.go index 9e4f572833..58b4bf3493 100644 --- a/schema/table.go +++ b/schema/table.go @@ -231,7 +231,16 @@ func (tt Tables) FlattenTables() Tables { tables = append(tables, t) tables = append(tables, t.Relations.FlattenTables()...) } - return tables + tableNames := make(map[string]bool) + dedupedTables := make(Tables, 0, len(tables)) + for _, t := range tables { + if _, found := tableNames[t.Name]; !found { + dedupedTables = append(dedupedTables, t) + tableNames[t.Name] = true + } + } + + return dedupedTables } func (tt Tables) TableNames() []string { diff --git a/schema/table_test.go b/schema/table_test.go index 1472fc82fa..cc3aadb1d0 100644 --- a/schema/table_test.go +++ b/schema/table_test.go @@ -23,6 +23,10 @@ func TestTablesFlatten(t *testing.T) { if len(tables) != 2 { t.Fatal("expected 2 tables") } + tables = Tables{testTable}.FlattenTables() + if len(tables) != 2 { + t.Fatal("expected 2 tables") + } } func TestTablesFilterDFS(t *testing.T) {