Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions clients/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (c *DestinationClient) Write(ctx context.Context, source string, syncTime t
return res.FailedWrites, nil
}

func (c *DestinationClient) Write2(ctx context.Context, tables schema.Tables, source string, syncTime time.Time, resources <-chan []byte) error {
func (c *DestinationClient) Write2(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, resources <-chan []byte) error {
saveClient, err := c.pbClient.Write2(ctx)
if err != nil {
return fmt.Errorf("failed to call Write2: %w", err)
Expand All @@ -285,10 +285,15 @@ func (c *DestinationClient) Write2(ctx context.Context, tables schema.Tables, so
if err != nil {
return fmt.Errorf("failed to marshal tables: %w", err)
}
sourceSpecBytes, err := json.Marshal(sourceSpec)
if err != nil {
return fmt.Errorf("failed to marshal source spec: %w", err)
}
if err := saveClient.Send(&pb.Write2_Request{
Tables: b,
Source: source,
Timestamp: timestamppb.New(syncTime),
Tables: b,
Source: sourceSpec.Name,
Timestamp: timestamppb.New(syncTime),
SourceSpec: sourceSpecBytes,
}); err != nil {
return fmt.Errorf("failed to send tables: %w", err)
}
Expand Down
8 changes: 5 additions & 3 deletions clients/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestDestinationClientWriteReturnsCorrectError(t *testing.T) {
t.Fatal(err)
}

name, err := c.Name(ctx)
_, err = c.Name(ctx)
if err != nil {
t.Fatal("failed to get name", err)
}
Expand All @@ -108,7 +108,9 @@ func TestDestinationClientWriteReturnsCorrectError(t *testing.T) {
resourcesChannel <- destResource2
}
}()

err = c.Write2(ctx, tables, name, time.Now().UTC(), resourcesChannel)
sourceSpec := specs.Source{
Name: "TestDestinationClientWriteReturnsCorrectError",
}
err = c.Write2(ctx, sourceSpec, tables, time.Now().UTC(), resourcesChannel)
require.ErrorContains(t, err, "context canceled")
}
10 changes: 8 additions & 2 deletions internal/memdb/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,16 @@ func TestOnWriteError(t *testing.T) {
}
sourceName := "TestDestinationOnWriteError"
syncTime := time.Now()
sourceSpec := specs.Source{
Name: sourceName,
}
ch := make(chan schema.DestinationResource, 1)
ch <- schema.DestinationResource{
TableName: "test",
Data: testdata.GenTestData(table),
}
close(ch)
err := p.Write(ctx, tables, sourceName, syncTime, ch)
err := p.Write(ctx, sourceSpec, tables, syncTime, ch)
if err == nil {
t.Fatal("expected error")
}
Expand All @@ -90,14 +93,17 @@ func TestOnWriteCtxCancelled(t *testing.T) {
}
sourceName := "TestDestinationOnWriteError"
syncTime := time.Now()
sourceSpec := specs.Source{
Name: sourceName,
}
ch := make(chan schema.DestinationResource, 1)
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
ch <- schema.DestinationResource{
TableName: "test",
Data: testdata.GenTestData(table),
}
defer cancel()
err := p.Write(ctx, tables, sourceName, syncTime, ch)
err := p.Write(ctx, sourceSpec, tables, syncTime, ch)
if err != nil {
t.Fatal(err)
}
Expand Down
143 changes: 77 additions & 66 deletions internal/pb/destination.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions internal/pb/destination.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ message Write2 {
bytes tables = 3;
// marshalled *schema.Resources
bytes resource = 4;
// marshalled specs.Source
bytes source_spec = 5;
}
message Response {
}
Expand Down
14 changes: 12 additions & 2 deletions internal/servers/destinations.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,22 @@ func (s *DestinationServer) Write2(msg pb.Destination_Write2Server) error {
if err := json.Unmarshal(r.Tables, &tables); err != nil {
return status.Errorf(codes.InvalidArgument, "failed to unmarshal tables: %v", err)
}
sourceName := r.Source
var sourceSpec specs.Source
if r.SourceSpec == nil {
// this is for backward compatibility
sourceSpec = specs.Source{
Name: r.Source,
}
} else {
if err := json.Unmarshal(r.SourceSpec, &sourceSpec); err != nil {
return status.Errorf(codes.InvalidArgument, "failed to unmarshal source spec: %v", err)
}
}
syncTime := r.Timestamp.AsTime()

eg, ctx := errgroup.WithContext(msg.Context())
eg.Go(func() error {
return s.Plugin.Write(ctx, tables, sourceName, syncTime, resources)
return s.Plugin.Write(ctx, sourceSpec, tables, syncTime, resources)
})

for {
Expand Down
5 changes: 3 additions & 2 deletions plugins/destination/managed_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/specs"
)

type worker struct {
Expand Down Expand Up @@ -65,7 +66,7 @@ func (p *Plugin) flush(ctx context.Context, metrics *Metrics, table *schema.Tabl
}
}

func (p *Plugin) writeManagedTableBatch(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, res <-chan schema.DestinationResource) error {
func (p *Plugin) writeManagedTableBatch(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, res <-chan schema.DestinationResource) error {
syncTime = syncTime.UTC()
SetDestinationManagedCqColumns(tables)

Expand Down Expand Up @@ -100,7 +101,7 @@ func (p *Plugin) writeManagedTableBatch(ctx context.Context, tables schema.Table
p.workersLock.Unlock()

sourceColumn := &schema.Text{}
_ = sourceColumn.Set(sourceName)
_ = sourceColumn.Set(sourceSpec.Name)
syncTimeColumn := &schema.Timestamptz{}
_ = syncTimeColumn.Set(syncTime)
for r := range res {
Expand Down
Loading