From b9d2d7e9e000e5fe8c90f9578e7d6996167d20b4 Mon Sep 17 00:00:00 2001 From: shimonp21 Date: Sat, 8 Oct 2022 12:58:21 +0300 Subject: [PATCH 1/3] fix: Use pointer receiver for Table methods --- schema/table.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/schema/table.go b/schema/table.go index fa45a7301d..a5d303eb9f 100644 --- a/schema/table.go +++ b/schema/table.go @@ -102,7 +102,7 @@ func (tt Tables) ValidateDuplicateTables() error { return nil } -func (t Table) ValidateDuplicateColumns() error { +func (t *Table) ValidateDuplicateColumns() error { columns := make(map[string]bool, len(t.Columns)) for _, c := range t.Columns { if _, ok := columns[c.Name]; ok { @@ -118,7 +118,7 @@ func (t Table) ValidateDuplicateColumns() error { return nil } -func (t Table) Column(name string) *Column { +func (t *Table) Column(name string) *Column { for _, c := range t.Columns { if c.Name == name { return &c @@ -126,8 +126,7 @@ func (t Table) Column(name string) *Column { } return nil } - -func (t Table) PrimaryKeys() []string { +func (t *Table) PrimaryKeys() []string { var primaryKeys []string for _, c := range t.Columns { if c.CreationOptions.PrimaryKey { @@ -138,7 +137,7 @@ func (t Table) PrimaryKeys() []string { return primaryKeys } -func (t Table) ColumnIndex(name string) int { +func (t *Table) ColumnIndex(name string) int { var once sync.Once once.Do(func() { if t.columnsMap == nil { @@ -154,7 +153,7 @@ func (t Table) ColumnIndex(name string) int { return -1 } -func (t Table) TableNames() []string { +func (t *Table) TableNames() []string { ret := []string{t.Name} for _, rel := range t.Relations { ret = append(ret, rel.TableNames()...) @@ -163,7 +162,7 @@ func (t Table) TableNames() []string { } // Call the table resolver with with all of it's relation for every reolved resource -func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, resourcesSem *semaphore.Weighted, resolvedResources chan<- *Resource) (summary SyncSummary) { +func (t *Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, resourcesSem *semaphore.Weighted, resolvedResources chan<- *Resource) (summary SyncSummary) { tableStartTime := time.Now() meta.Logger().Info().Str("table", t.Name).Msg("table resolver started") @@ -225,8 +224,8 @@ func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, r return summary } -func (t Table) resolveObject(ctx context.Context, meta ClientMeta, parent *Resource, item interface{}, resolvedResources chan<- *Resource) (summary SyncSummary) { - resource := NewResourceData(&t, parent, item) +func (t *Table) resolveObject(ctx context.Context, meta ClientMeta, parent *Resource, item interface{}, resolvedResources chan<- *Resource) (summary SyncSummary) { + resource := NewResourceData(t, parent, item) objectStartTime := time.Now() csr := caser.New() meta.Logger().Info().Str("table", t.Name).Msg("object resolver started") From 04548b4fa856c89527efb8b8f99c6673ecac2065 Mon Sep 17 00:00:00 2001 From: shimonp21 Date: Sat, 8 Oct 2022 13:54:30 +0300 Subject: [PATCH 2/3] fix: remove buggy 'columnsMap' index 1) Before this PR the "indexing" was done on a copy of the table rather than the actual table.. 2) Because a new 'once' object is created at every invocation of 'ColumnIndex', it doesn't actually happen "once". Nor does it provide protection from race-conditions in case of concurrent calls. 3) The method is unused.. --- schema/table.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/schema/table.go b/schema/table.go index a5d303eb9f..0461c235d0 100644 --- a/schema/table.go +++ b/schema/table.go @@ -64,8 +64,6 @@ type Table struct { // Parent is the parent table in case this table is called via parent table (i.e. relation) Parent *Table `json:"-"` - - columnsMap map[string]int } func (s *SyncSummary) Merge(other SyncSummary) { @@ -137,22 +135,6 @@ func (t *Table) PrimaryKeys() []string { return primaryKeys } -func (t *Table) ColumnIndex(name string) int { - var once sync.Once - once.Do(func() { - if t.columnsMap == nil { - t.columnsMap = make(map[string]int) - for i, c := range t.Columns { - t.columnsMap[c.Name] = i - } - } - }) - if index, ok := t.columnsMap[name]; ok { - return index - } - return -1 -} - func (t *Table) TableNames() []string { ret := []string{t.Name} for _, rel := range t.Relations { From 59cae6bf9e0074a6b88fd7787b4bd42d297f0aa9 Mon Sep 17 00:00:00 2001 From: shimonp21 Date: Sat, 8 Oct 2022 13:55:39 +0300 Subject: [PATCH 3/3] fix: deleteStale Move responsibility for migrating the cq_sync_time and cq_source_name columns to the destination plugin. We do this because the source plugins don't really care about these columns (it's an implementation detail of 'overwriteDeleteStale'. --- ...GenerateSourcePluginDocs-relation_table.md | 4 +-- ...TestGenerateSourcePluginDocs-test_table.md | 4 +-- plugins/destination.go | 24 ++++++++++----- plugins/source.go | 2 +- plugins/source_docs.go | 3 ++ schema/meta.go | 30 +++++++++++++++---- schema/table.go | 13 ++++++++ 7 files changed, 62 insertions(+), 18 deletions(-) diff --git a/plugins/.snapshots/TestGenerateSourcePluginDocs-relation_table.md b/plugins/.snapshots/TestGenerateSourcePluginDocs-relation_table.md index bc0b3842e8..beb661b929 100644 --- a/plugins/.snapshots/TestGenerateSourcePluginDocs-relation_table.md +++ b/plugins/.snapshots/TestGenerateSourcePluginDocs-relation_table.md @@ -10,8 +10,8 @@ This table depends on [`test_table`](test_table.md). ## Columns | Name | Type | | ------------- | ------------- | -|_cq_id (PK)|UUID| -|_cq_parent_id|UUID| |_cq_source_name|String| |_cq_sync_time|Timestamp| +|_cq_id (PK)|UUID| +|_cq_parent_id|UUID| |string_col|String| diff --git a/plugins/.snapshots/TestGenerateSourcePluginDocs-test_table.md b/plugins/.snapshots/TestGenerateSourcePluginDocs-test_table.md index 3bb8001093..7bfd607057 100644 --- a/plugins/.snapshots/TestGenerateSourcePluginDocs-test_table.md +++ b/plugins/.snapshots/TestGenerateSourcePluginDocs-test_table.md @@ -11,10 +11,10 @@ The following tables depend on `test_table`: ## Columns | Name | Type | | ------------- | ------------- | -|_cq_id|UUID| -|_cq_parent_id|UUID| |_cq_source_name|String| |_cq_sync_time|Timestamp| +|_cq_id|UUID| +|_cq_parent_id|UUID| |int_col|Int| |id_col (PK)|Int| |id_col2 (PK)|Int| diff --git a/plugins/destination.go b/plugins/destination.go index bab8afabcf..c076ffefab 100644 --- a/plugins/destination.go +++ b/plugins/destination.go @@ -73,6 +73,8 @@ func (p *DestinationPlugin) Init(ctx context.Context, logger zerolog.Logger, spe // we implement all DestinationClient functions so we can hook into pre-post behavior func (p *DestinationPlugin) Migrate(ctx context.Context, tables schema.Tables) error { + SetDestinationManagedCqColumns(tables) + if p.client == nil { return fmt.Errorf("destination client not initialized") } @@ -80,18 +82,14 @@ func (p *DestinationPlugin) Migrate(ctx context.Context, tables schema.Tables) e return p.client.Migrate(ctx, tables) } -func (p *DestinationPlugin) Write(ctx context.Context, source string, syncTime time.Time, res <-chan *schema.Resource) *WriteSummary { +func (p *DestinationPlugin) Write(ctx context.Context, sourceName string, syncTime time.Time, res <-chan *schema.Resource) *WriteSummary { if p.client == nil { return nil } summary := WriteSummary{} for r := range res { - if _, ok := r.Data[schema.CqSourceName.Name]; ok { - r.Data[schema.CqSourceName.Name] = source - } - if _, ok := r.Data[schema.CqSyncTime.Name]; ok { - r.Data[schema.CqSyncTime.Name] = syncTime - } + r.Data[schema.CqSourceNameColumn.Name] = sourceName + r.Data[schema.CqSyncTimeColumn.Name] = syncTime err := p.client.Write(ctx, r.TableName, r.Data) if err != nil { summary.FailedWrites++ @@ -101,7 +99,7 @@ func (p *DestinationPlugin) Write(ctx context.Context, source string, syncTime t } } if p.spec.WriteMode == specs.WriteModeOverwriteDeleteStale { - failedDeletes := p.DeleteStale(ctx, p.tables.TableNames(), source, syncTime) + failedDeletes := p.DeleteStale(ctx, p.tables.TableNames(), sourceName, syncTime) summary.FailedDeletes = failedDeletes } return &summary @@ -127,3 +125,13 @@ func (p *DestinationPlugin) Close(ctx context.Context) error { } return p.client.Close(ctx) } + +// Overwrites or adds the CQ columns that are managed by the destination plugins (_cq_sync_time, _cq_source_name). +func SetDestinationManagedCqColumns(tables []*schema.Table) { + for _, table := range tables { + table.OverwriteOrAddColumn(&schema.CqSyncTimeColumn) + table.OverwriteOrAddColumn(&schema.CqSourceNameColumn) + + SetDestinationManagedCqColumns(table.Relations) + } +} diff --git a/plugins/source.go b/plugins/source.go index 3069b8a73b..5d50e9029f 100644 --- a/plugins/source.go +++ b/plugins/source.go @@ -38,7 +38,7 @@ func addInternalColumns(tables []*schema.Table) { if len(table.PrimaryKeys()) == 0 { cqID.CreationOptions.PrimaryKey = true } - table.Columns = append([]schema.Column{cqID, schema.CqParentIDColumn, schema.CqSourceName, schema.CqSyncTime}, table.Columns...) + table.Columns = append([]schema.Column{cqID, schema.CqParentIDColumn}, table.Columns...) addInternalColumns(table.Relations) } } diff --git a/plugins/source_docs.go b/plugins/source_docs.go index 7cc74e84ea..de000565e7 100644 --- a/plugins/source_docs.go +++ b/plugins/source_docs.go @@ -19,6 +19,9 @@ func (p *SourcePlugin) GenerateSourcePluginDocs(dir string) error { if err := os.MkdirAll(dir, os.ModePerm); err != nil { return err } + + SetDestinationManagedCqColumns(p.Tables()) + for _, table := range p.Tables() { if err := renderAllTables(table, dir); err != nil { return err diff --git a/schema/meta.go b/schema/meta.go index 4eff343cbf..f52769ce6b 100644 --- a/schema/meta.go +++ b/schema/meta.go @@ -17,12 +17,32 @@ type Meta struct { FetchID string `json:"fetch_id,omitempty"` } -const FetchIDMetaKey = "cq_fetch_id" +// These columns are managed and populated by the source plugins +var CqIDColumn = Column{ + Name: "_cq_id", + Type: TypeUUID, + Description: "Internal CQ ID of the row", + Resolver: cqUUIDResolver(), +} +var CqParentIDColumn = Column{ + Name: "_cq_parent_id", + Type: TypeUUID, + Description: "Internal CQ ID of the parent row", + Resolver: parentCqUUIDResolver(), + IgnoreInTests: true, +} -var CqIDColumn = Column{Name: "_cq_id", Type: TypeUUID, Description: "Internal CQ ID of the row", Resolver: cqUUIDResolver()} -var CqParentIDColumn = Column{Name: "_cq_parent_id", Type: TypeUUID, Description: "Internal CQ ID of the parent row", Resolver: parentCqUUIDResolver(), IgnoreInTests: true} -var CqSyncTime = Column{Name: "_cq_sync_time", Type: TypeTimestamp, Description: "Internal CQ row of when sync was started (this will be the same for all rows in a single fetch)", IgnoreInTests: true} -var CqSourceName = Column{Name: "_cq_source_name", Type: TypeString, Description: "Internal CQ row that references the source plugin name data was retrieved", IgnoreInTests: true} +// These columns are managed and populated by the destination plugin. +var CqSyncTimeColumn = Column{ + Name: "_cq_sync_time", + Type: TypeTimestamp, + Description: "Internal CQ row of when sync was started (this will be the same for all rows in a single fetch)", +} +var CqSourceNameColumn = Column{ + Name: "_cq_source_name", + Type: TypeString, + Description: "Internal CQ row that references the source plugin name data was retrieved", +} func cqUUIDResolver() ColumnResolver { return func(_ context.Context, _ ClientMeta, r *Resource, c Column) error { diff --git a/schema/table.go b/schema/table.go index 0461c235d0..853c5404b8 100644 --- a/schema/table.go +++ b/schema/table.go @@ -124,6 +124,19 @@ func (t *Table) Column(name string) *Column { } return nil } + +// If the column with the same name exists, overwrites it. +// Otherwise, adds the column to the beginning of the table. +func (t *Table) OverwriteOrAddColumn(column *Column) { + for i, c := range t.Columns { + if c.Name == column.Name { + t.Columns[i] = *column + return + } + } + t.Columns = append([]Column{*column}, t.Columns...) +} + func (t *Table) PrimaryKeys() []string { var primaryKeys []string for _, c := range t.Columns {