Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ This table depends on [`test_table`](test_table.md).
## Columns
| Name | Type |
| ------------- | ------------- |
|_cq_id (PK)|UUID|
|_cq_parent_id|UUID|
|_cq_source_name|String|
|_cq_sync_time|Timestamp|
|_cq_id (PK)|UUID|
|_cq_parent_id|UUID|
|string_col|String|
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ The following tables depend on `test_table`:
## Columns
| Name | Type |
| ------------- | ------------- |
|_cq_id|UUID|
|_cq_parent_id|UUID|
|_cq_source_name|String|
|_cq_sync_time|Timestamp|
|_cq_id|UUID|
|_cq_parent_id|UUID|
|int_col|Int|
|id_col (PK)|Int|
|id_col2 (PK)|Int|
24 changes: 16 additions & 8 deletions plugins/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,23 @@ func (p *DestinationPlugin) Init(ctx context.Context, logger zerolog.Logger, spe

// we implement all DestinationClient functions so we can hook into pre-post behavior
func (p *DestinationPlugin) Migrate(ctx context.Context, tables schema.Tables) error {
SetDestinationManagedCqColumns(tables)

if p.client == nil {
return fmt.Errorf("destination client not initialized")
}
p.tables = tables
return p.client.Migrate(ctx, tables)
}

func (p *DestinationPlugin) Write(ctx context.Context, source string, syncTime time.Time, res <-chan *schema.Resource) *WriteSummary {
func (p *DestinationPlugin) Write(ctx context.Context, sourceName string, syncTime time.Time, res <-chan *schema.Resource) *WriteSummary {
if p.client == nil {
return nil
}
summary := WriteSummary{}
for r := range res {
if _, ok := r.Data[schema.CqSourceName.Name]; ok {
r.Data[schema.CqSourceName.Name] = source
}
if _, ok := r.Data[schema.CqSyncTime.Name]; ok {
r.Data[schema.CqSyncTime.Name] = syncTime
}
r.Data[schema.CqSourceNameColumn.Name] = sourceName
r.Data[schema.CqSyncTimeColumn.Name] = syncTime
err := p.client.Write(ctx, r.TableName, r.Data)
if err != nil {
summary.FailedWrites++
Expand All @@ -101,7 +99,7 @@ func (p *DestinationPlugin) Write(ctx context.Context, source string, syncTime t
}
}
if p.spec.WriteMode == specs.WriteModeOverwriteDeleteStale {
failedDeletes := p.DeleteStale(ctx, p.tables.TableNames(), source, syncTime)
failedDeletes := p.DeleteStale(ctx, p.tables.TableNames(), sourceName, syncTime)
summary.FailedDeletes = failedDeletes
}
return &summary
Expand All @@ -127,3 +125,13 @@ func (p *DestinationPlugin) Close(ctx context.Context) error {
}
return p.client.Close(ctx)
}

// Overwrites or adds the CQ columns that are managed by the destination plugins (_cq_sync_time, _cq_source_name).
func SetDestinationManagedCqColumns(tables []*schema.Table) {
for _, table := range tables {
table.OverwriteOrAddColumn(&schema.CqSyncTimeColumn)
table.OverwriteOrAddColumn(&schema.CqSourceNameColumn)

SetDestinationManagedCqColumns(table.Relations)
}
}
2 changes: 1 addition & 1 deletion plugins/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func addInternalColumns(tables []*schema.Table) {
if len(table.PrimaryKeys()) == 0 {
cqID.CreationOptions.PrimaryKey = true
}
table.Columns = append([]schema.Column{cqID, schema.CqParentIDColumn, schema.CqSourceName, schema.CqSyncTime}, table.Columns...)
table.Columns = append([]schema.Column{cqID, schema.CqParentIDColumn}, table.Columns...)
addInternalColumns(table.Relations)
}
}
Expand Down
3 changes: 3 additions & 0 deletions plugins/source_docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ func (p *SourcePlugin) GenerateSourcePluginDocs(dir string) error {
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return err
}

SetDestinationManagedCqColumns(p.Tables())

for _, table := range p.Tables() {
if err := renderAllTables(table, dir); err != nil {
return err
Expand Down
30 changes: 25 additions & 5 deletions schema/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,32 @@ type Meta struct {
FetchID string `json:"fetch_id,omitempty"`
}

const FetchIDMetaKey = "cq_fetch_id"
// These columns are managed and populated by the source plugins
var CqIDColumn = Column{
Name: "_cq_id",
Type: TypeUUID,
Description: "Internal CQ ID of the row",
Resolver: cqUUIDResolver(),
}
var CqParentIDColumn = Column{
Name: "_cq_parent_id",
Type: TypeUUID,
Description: "Internal CQ ID of the parent row",
Resolver: parentCqUUIDResolver(),
IgnoreInTests: true,
}

var CqIDColumn = Column{Name: "_cq_id", Type: TypeUUID, Description: "Internal CQ ID of the row", Resolver: cqUUIDResolver()}
var CqParentIDColumn = Column{Name: "_cq_parent_id", Type: TypeUUID, Description: "Internal CQ ID of the parent row", Resolver: parentCqUUIDResolver(), IgnoreInTests: true}
var CqSyncTime = Column{Name: "_cq_sync_time", Type: TypeTimestamp, Description: "Internal CQ row of when sync was started (this will be the same for all rows in a single fetch)", IgnoreInTests: true}
var CqSourceName = Column{Name: "_cq_source_name", Type: TypeString, Description: "Internal CQ row that references the source plugin name data was retrieved", IgnoreInTests: true}
// These columns are managed and populated by the destination plugin.
var CqSyncTimeColumn = Column{
Name: "_cq_sync_time",
Type: TypeTimestamp,
Description: "Internal CQ row of when sync was started (this will be the same for all rows in a single fetch)",
}
var CqSourceNameColumn = Column{
Name: "_cq_source_name",
Type: TypeString,
Description: "Internal CQ row that references the source plugin name data was retrieved",
}

func cqUUIDResolver() ColumnResolver {
return func(_ context.Context, _ ClientMeta, r *Resource, c Column) error {
Expand Down
44 changes: 19 additions & 25 deletions schema/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ type Table struct {

// Parent is the parent table in case this table is called via parent table (i.e. relation)
Parent *Table `json:"-"`

columnsMap map[string]int
}

func (s *SyncSummary) Merge(other SyncSummary) {
Expand Down Expand Up @@ -102,7 +100,7 @@ func (tt Tables) ValidateDuplicateTables() error {
return nil
}

func (t Table) ValidateDuplicateColumns() error {
func (t *Table) ValidateDuplicateColumns() error {
columns := make(map[string]bool, len(t.Columns))
for _, c := range t.Columns {
if _, ok := columns[c.Name]; ok {
Expand All @@ -118,7 +116,7 @@ func (t Table) ValidateDuplicateColumns() error {
return nil
}

func (t Table) Column(name string) *Column {
func (t *Table) Column(name string) *Column {
for _, c := range t.Columns {
if c.Name == name {
return &c
Expand All @@ -127,7 +125,19 @@ func (t Table) Column(name string) *Column {
return nil
}

func (t Table) PrimaryKeys() []string {
// If the column with the same name exists, overwrites it.
// Otherwise, adds the column to the beginning of the table.
func (t *Table) OverwriteOrAddColumn(column *Column) {
for i, c := range t.Columns {
if c.Name == column.Name {
t.Columns[i] = *column
return
}
}
t.Columns = append([]Column{*column}, t.Columns...)
}

func (t *Table) PrimaryKeys() []string {
var primaryKeys []string
for _, c := range t.Columns {
if c.CreationOptions.PrimaryKey {
Expand All @@ -138,23 +148,7 @@ func (t Table) PrimaryKeys() []string {
return primaryKeys
}

func (t Table) ColumnIndex(name string) int {
var once sync.Once
once.Do(func() {
if t.columnsMap == nil {
t.columnsMap = make(map[string]int)
for i, c := range t.Columns {
t.columnsMap[c.Name] = i
}
}
})
if index, ok := t.columnsMap[name]; ok {
return index
}
return -1
}

func (t Table) TableNames() []string {
func (t *Table) TableNames() []string {
ret := []string{t.Name}
for _, rel := range t.Relations {
ret = append(ret, rel.TableNames()...)
Expand All @@ -163,7 +157,7 @@ func (t Table) TableNames() []string {
}

// Call the table resolver with with all of it's relation for every reolved resource
func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, resourcesSem *semaphore.Weighted, resolvedResources chan<- *Resource) (summary SyncSummary) {
func (t *Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, resourcesSem *semaphore.Weighted, resolvedResources chan<- *Resource) (summary SyncSummary) {
tableStartTime := time.Now()
meta.Logger().Info().Str("table", t.Name).Msg("table resolver started")

Expand Down Expand Up @@ -225,8 +219,8 @@ func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, r
return summary
}

func (t Table) resolveObject(ctx context.Context, meta ClientMeta, parent *Resource, item interface{}, resolvedResources chan<- *Resource) (summary SyncSummary) {
resource := NewResourceData(&t, parent, item)
func (t *Table) resolveObject(ctx context.Context, meta ClientMeta, parent *Resource, item interface{}, resolvedResources chan<- *Resource) (summary SyncSummary) {
resource := NewResourceData(t, parent, item)
objectStartTime := time.Now()
csr := caser.New()
meta.Logger().Info().Str("table", t.Name).Msg("object resolver started")
Expand Down