diff --git a/plugins/destination.go b/plugins/destination.go index bab8afabcf..e7eda085d6 100644 --- a/plugins/destination.go +++ b/plugins/destination.go @@ -80,16 +80,16 @@ func (p *DestinationPlugin) Migrate(ctx context.Context, tables schema.Tables) e return p.client.Migrate(ctx, tables) } -func (p *DestinationPlugin) Write(ctx context.Context, source string, syncTime time.Time, res <-chan *schema.Resource) *WriteSummary { +func (p *DestinationPlugin) Write(ctx context.Context, sourceName string, syncTime time.Time, res <-chan *schema.Resource) *WriteSummary { if p.client == nil { return nil } summary := WriteSummary{} for r := range res { - if _, ok := r.Data[schema.CqSourceName.Name]; ok { - r.Data[schema.CqSourceName.Name] = source + if _, ok := r.Data[schema.CqSourceName.Name]; !ok { + r.Data[schema.CqSourceName.Name] = sourceName } - if _, ok := r.Data[schema.CqSyncTime.Name]; ok { + if _, ok := r.Data[schema.CqSyncTime.Name]; !ok { r.Data[schema.CqSyncTime.Name] = syncTime } err := p.client.Write(ctx, r.TableName, r.Data) @@ -101,7 +101,7 @@ func (p *DestinationPlugin) Write(ctx context.Context, source string, syncTime t } } if p.spec.WriteMode == specs.WriteModeOverwriteDeleteStale { - failedDeletes := p.DeleteStale(ctx, p.tables.TableNames(), source, syncTime) + failedDeletes := p.DeleteStale(ctx, p.tables.TableNames(), sourceName, syncTime) summary.FailedDeletes = failedDeletes } return &summary diff --git a/schema/meta.go b/schema/meta.go index 4eff343cbf..a981b16805 100644 --- a/schema/meta.go +++ b/schema/meta.go @@ -19,10 +19,36 @@ type Meta struct { const FetchIDMetaKey = "cq_fetch_id" -var CqIDColumn = Column{Name: "_cq_id", Type: TypeUUID, Description: "Internal CQ ID of the row", Resolver: cqUUIDResolver()} -var CqParentIDColumn = Column{Name: "_cq_parent_id", Type: TypeUUID, Description: "Internal CQ ID of the parent row", Resolver: parentCqUUIDResolver(), IgnoreInTests: true} -var CqSyncTime = Column{Name: "_cq_sync_time", Type: TypeTimestamp, Description: "Internal CQ row of when sync was started (this will be the same for all rows in a single fetch)", IgnoreInTests: true} -var CqSourceName = Column{Name: "_cq_source_name", Type: TypeString, Description: "Internal CQ row that references the source plugin name data was retrieved", IgnoreInTests: true} +var CqIDColumn = Column{ + Name: "_cq_id", + Type: TypeUUID, + Description: "Internal CQ ID of the row", + Resolver: cqUUIDResolver(), +} + +var CqParentIDColumn = Column{ + Name: "_cq_parent_id", + Type: TypeUUID, + Description: "Internal CQ ID of the parent row", + Resolver: parentCqUUIDResolver(), + IgnoreInTests: true, +} + +var CqSyncTime = Column{ + Name: "_cq_sync_time", + Type: TypeTimestamp, + Description: "Internal CQ row of when sync was started (this will be the same for all rows in a single fetch)", + Resolver: noopResolver(), // _cq_sync_time is set later in the SDK, so we use noopResolver. + IgnoreInTests: true, +} + +var CqSourceName = Column{ + Name: "_cq_source_name", + Type: TypeString, + Description: "Internal CQ row that references the source plugin name data was retrieved", + Resolver: noopResolver(), // _cq_source_name is set later in the SDK, so we use noopResolver. + IgnoreInTests: true, +} func cqUUIDResolver() ColumnResolver { return func(_ context.Context, _ ClientMeta, r *Resource, c Column) error { @@ -40,3 +66,12 @@ func parentCqUUIDResolver() ColumnResolver { return r.Set(c.Name, parentCqID) } } + +// A placeholder resolver that doesn't do anything. +// Use this when you need a resolver that doesn't do anything (rather than `nil“), because the default +// resolver actually uses reflection to look for fields in the upstream resource. +func noopResolver() ColumnResolver { + return func(_ context.Context, _ ClientMeta, r *Resource, c Column) error { + return nil + } +}