Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions plugins/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,16 @@ func (p *DestinationPlugin) Migrate(ctx context.Context, tables schema.Tables) e
return p.client.Migrate(ctx, tables)
}

func (p *DestinationPlugin) Write(ctx context.Context, source string, syncTime time.Time, res <-chan *schema.Resource) *WriteSummary {
func (p *DestinationPlugin) Write(ctx context.Context, sourceName string, syncTime time.Time, res <-chan *schema.Resource) *WriteSummary {
if p.client == nil {
return nil
}
summary := WriteSummary{}
for r := range res {
if _, ok := r.Data[schema.CqSourceName.Name]; ok {
r.Data[schema.CqSourceName.Name] = source
if _, ok := r.Data[schema.CqSourceName.Name]; !ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this is changed from ok to !ok ? if it doesn't exist it all then the write query will fail because this column wasn't migrated. but it should just be there and be nil and we should overwrite it.

r.Data[schema.CqSourceName.Name] = sourceName
}
if _, ok := r.Data[schema.CqSyncTime.Name]; ok {
if _, ok := r.Data[schema.CqSyncTime.Name]; !ok {
r.Data[schema.CqSyncTime.Name] = syncTime
}
err := p.client.Write(ctx, r.TableName, r.Data)
Expand All @@ -101,7 +101,7 @@ func (p *DestinationPlugin) Write(ctx context.Context, source string, syncTime t
}
}
if p.spec.WriteMode == specs.WriteModeOverwriteDeleteStale {
failedDeletes := p.DeleteStale(ctx, p.tables.TableNames(), source, syncTime)
failedDeletes := p.DeleteStale(ctx, p.tables.TableNames(), sourceName, syncTime)
summary.FailedDeletes = failedDeletes
}
return &summary
Expand Down
43 changes: 39 additions & 4 deletions schema/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,36 @@ type Meta struct {

const FetchIDMetaKey = "cq_fetch_id"

var CqIDColumn = Column{Name: "_cq_id", Type: TypeUUID, Description: "Internal CQ ID of the row", Resolver: cqUUIDResolver()}
var CqParentIDColumn = Column{Name: "_cq_parent_id", Type: TypeUUID, Description: "Internal CQ ID of the parent row", Resolver: parentCqUUIDResolver(), IgnoreInTests: true}
var CqSyncTime = Column{Name: "_cq_sync_time", Type: TypeTimestamp, Description: "Internal CQ row of when sync was started (this will be the same for all rows in a single fetch)", IgnoreInTests: true}
var CqSourceName = Column{Name: "_cq_source_name", Type: TypeString, Description: "Internal CQ row that references the source plugin name data was retrieved", IgnoreInTests: true}
var CqIDColumn = Column{
Name: "_cq_id",
Type: TypeUUID,
Description: "Internal CQ ID of the row",
Resolver: cqUUIDResolver(),
}

var CqParentIDColumn = Column{
Name: "_cq_parent_id",
Type: TypeUUID,
Description: "Internal CQ ID of the parent row",
Resolver: parentCqUUIDResolver(),
IgnoreInTests: true,
}

var CqSyncTime = Column{
Name: "_cq_sync_time",
Type: TypeTimestamp,
Description: "Internal CQ row of when sync was started (this will be the same for all rows in a single fetch)",
Resolver: noopResolver(), // _cq_sync_time is set later in the SDK, so we use noopResolver.
IgnoreInTests: true,
}

var CqSourceName = Column{
Name: "_cq_source_name",
Type: TypeString,
Description: "Internal CQ row that references the source plugin name data was retrieved",
Resolver: noopResolver(), // _cq_source_name is set later in the SDK, so we use noopResolver.
IgnoreInTests: true,
}

func cqUUIDResolver() ColumnResolver {
return func(_ context.Context, _ ClientMeta, r *Resource, c Column) error {
Expand All @@ -40,3 +66,12 @@ func parentCqUUIDResolver() ColumnResolver {
return r.Set(c.Name, parentCqID)
}
}

// A placeholder resolver that doesn't do anything.
// Use this when you need a resolver that doesn't do anything (rather than `nilβ€œ), because the default
// resolver actually uses reflection to look for fields in the upstream resource.
func noopResolver() ColumnResolver {
return func(_ context.Context, _ ClientMeta, r *Resource, c Column) error {
return nil
}
}