diff --git a/clients/destination.go b/clients/destination.go index b53d3b90ed..9822971220 100644 --- a/clients/destination.go +++ b/clients/destination.go @@ -276,7 +276,7 @@ func (c *DestinationClient) Write(ctx context.Context, source string, syncTime t return res.FailedWrites, nil } -func (c *DestinationClient) Write2(ctx context.Context, tables schema.Tables, source string, syncTime time.Time, resources <-chan []byte) error { +func (c *DestinationClient) Write2(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, resources <-chan []byte) error { saveClient, err := c.pbClient.Write2(ctx) if err != nil { return fmt.Errorf("failed to call Write2: %w", err) @@ -285,10 +285,15 @@ func (c *DestinationClient) Write2(ctx context.Context, tables schema.Tables, so if err != nil { return fmt.Errorf("failed to marshal tables: %w", err) } + sourceSpecBytes, err := json.Marshal(sourceSpec) + if err != nil { + return fmt.Errorf("failed to marshal source spec: %w", err) + } if err := saveClient.Send(&pb.Write2_Request{ - Tables: b, - Source: source, - Timestamp: timestamppb.New(syncTime), + Tables: b, + Source: sourceSpec.Name, + Timestamp: timestamppb.New(syncTime), + SourceSpec: sourceSpecBytes, }); err != nil { return fmt.Errorf("failed to send tables: %w", err) } diff --git a/clients/destination_test.go b/clients/destination_test.go index f0998a581f..c516bfdaa9 100644 --- a/clients/destination_test.go +++ b/clients/destination_test.go @@ -86,7 +86,7 @@ func TestDestinationClientWriteReturnsCorrectError(t *testing.T) { t.Fatal(err) } - name, err := c.Name(ctx) + _, err = c.Name(ctx) if err != nil { t.Fatal("failed to get name", err) } @@ -108,7 +108,9 @@ func TestDestinationClientWriteReturnsCorrectError(t *testing.T) { resourcesChannel <- destResource2 } }() - - err = c.Write2(ctx, tables, name, time.Now().UTC(), resourcesChannel) + sourceSpec := specs.Source{ + Name: "TestDestinationClientWriteReturnsCorrectError", + } + err = c.Write2(ctx, sourceSpec, tables, time.Now().UTC(), resourcesChannel) require.ErrorContains(t, err, "context canceled") } diff --git a/internal/memdb/memdb_test.go b/internal/memdb/memdb_test.go index c9f0b64b14..ee8642e598 100644 --- a/internal/memdb/memdb_test.go +++ b/internal/memdb/memdb_test.go @@ -62,13 +62,16 @@ func TestOnWriteError(t *testing.T) { } sourceName := "TestDestinationOnWriteError" syncTime := time.Now() + sourceSpec := specs.Source{ + Name: sourceName, + } ch := make(chan schema.DestinationResource, 1) ch <- schema.DestinationResource{ TableName: "test", Data: testdata.GenTestData(table), } close(ch) - err := p.Write(ctx, tables, sourceName, syncTime, ch) + err := p.Write(ctx, sourceSpec, tables, syncTime, ch) if err == nil { t.Fatal("expected error") } @@ -90,6 +93,9 @@ func TestOnWriteCtxCancelled(t *testing.T) { } sourceName := "TestDestinationOnWriteError" syncTime := time.Now() + sourceSpec := specs.Source{ + Name: sourceName, + } ch := make(chan schema.DestinationResource, 1) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) ch <- schema.DestinationResource{ @@ -97,7 +103,7 @@ func TestOnWriteCtxCancelled(t *testing.T) { Data: testdata.GenTestData(table), } defer cancel() - err := p.Write(ctx, tables, sourceName, syncTime, ch) + err := p.Write(ctx, sourceSpec, tables, syncTime, ch) if err != nil { t.Fatal(err) } diff --git a/internal/pb/destination.pb.go b/internal/pb/destination.pb.go index 91011f893a..0384df4769 100644 --- a/internal/pb/destination.pb.go +++ b/internal/pb/destination.pb.go @@ -472,6 +472,8 @@ type Write2_Request struct { Tables []byte `protobuf:"bytes,3,opt,name=tables,proto3" json:"tables,omitempty"` // marshalled *schema.Resources Resource []byte `protobuf:"bytes,4,opt,name=resource,proto3" json:"resource,omitempty"` + // marshalled specs.Source + SourceSpec []byte `protobuf:"bytes,5,opt,name=source_spec,json=sourceSpec,proto3" json:"source_spec,omitempty"` } func (x *Write2_Request) Reset() { @@ -534,6 +536,13 @@ func (x *Write2_Request) GetResource() []byte { return nil } +func (x *Write2_Request) GetSourceSpec() []byte { + if x != nil { + return x.SourceSpec + } + return nil +} + type Write2_Response struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -870,8 +879,8 @@ var file_internal_pb_destination_proto_rawDesc = []byte{ 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x1a, 0x2f, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x77, 0x72, 0x69, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x66, - 0x61, 0x69, 0x6c, 0x65, 0x64, 0x57, 0x72, 0x69, 0x74, 0x65, 0x73, 0x22, 0xa6, 0x01, 0x0a, 0x06, - 0x57, 0x72, 0x69, 0x74, 0x65, 0x32, 0x1a, 0x8f, 0x01, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x61, 0x69, 0x6c, 0x65, 0x64, 0x57, 0x72, 0x69, 0x74, 0x65, 0x73, 0x22, 0xc7, 0x01, 0x0a, 0x06, + 0x57, 0x72, 0x69, 0x74, 0x65, 0x32, 0x1a, 0xb0, 0x01, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, @@ -880,70 +889,72 @@ var file_internal_pb_destination_proto_rawDesc = []byte{ 0x74, 0x61, 0x6d, 0x70, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, - 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x1a, 0x0a, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1e, 0x0a, 0x05, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x1a, 0x09, 0x0a, - 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0a, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xb5, 0x01, 0x0a, 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, - 0x74, 0x61, 0x6c, 0x65, 0x1a, 0x73, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x16, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, - 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x1a, 0x31, 0x0a, 0x08, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, - 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x66, - 0x61, 0x69, 0x6c, 0x65, 0x64, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x73, 0x22, 0x48, 0x0a, 0x15, - 0x47, 0x65, 0x74, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, - 0x74, 0x72, 0x69, 0x63, 0x73, 0x1a, 0x09, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x24, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, - 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, - 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x32, 0xb5, 0x05, 0x0a, 0x0b, 0x44, 0x65, 0x73, 0x74, 0x69, - 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x5b, 0x0a, 0x12, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, - 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, - 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x22, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, - 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x16, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, - 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x43, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, - 0x65, 0x12, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x75, 0x72, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x2e, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, - 0x65, 0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, - 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x2e, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x36, 0x0a, 0x05, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x14, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x12, 0x39, 0x0a, 0x06, 0x57, 0x72, - 0x69, 0x74, 0x65, 0x32, 0x12, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, - 0x74, 0x65, 0x32, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x32, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x28, 0x01, 0x12, 0x34, 0x0a, 0x05, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x12, 0x14, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6c, 0x6f, - 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, 0x0b, 0x44, - 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x12, 0x1a, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x2e, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x44, - 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x59, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, - 0x73, 0x12, 0x24, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x44, 0x65, 0x73, - 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, - 0x47, 0x65, 0x74, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, - 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x05, - 0x5a, 0x03, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x5f, 0x73, 0x70, 0x65, 0x63, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x53, 0x70, 0x65, 0x63, 0x1a, 0x0a, 0x0a, 0x08, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1e, 0x0a, 0x05, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x1a, 0x09, + 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0a, 0x0a, 0x08, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xb5, 0x01, 0x0a, 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, + 0x53, 0x74, 0x61, 0x6c, 0x65, 0x1a, 0x73, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x16, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x1a, 0x31, 0x0a, 0x08, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, + 0x5f, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, + 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x73, 0x22, 0x48, 0x0a, + 0x15, 0x47, 0x65, 0x74, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, + 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x1a, 0x09, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x24, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, + 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, + 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x32, 0xb5, 0x05, 0x0a, 0x0b, 0x44, 0x65, 0x73, 0x74, + 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x5b, 0x0a, 0x12, 0x47, 0x65, 0x74, 0x50, 0x72, + 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, + 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x22, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, + 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x43, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x19, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, + 0x72, 0x65, 0x12, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x75, 0x72, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x2e, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x4d, 0x69, 0x67, 0x72, 0x61, + 0x74, 0x65, 0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4d, 0x69, 0x67, 0x72, 0x61, + 0x74, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x05, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x14, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, + 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x12, 0x39, 0x0a, 0x06, 0x57, + 0x72, 0x69, 0x74, 0x65, 0x32, 0x12, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, + 0x69, 0x74, 0x65, 0x32, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x32, 0x2e, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x12, 0x34, 0x0a, 0x05, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x12, + 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x2e, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6c, + 0x6f, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, 0x0b, + 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x12, 0x1a, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x2e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x59, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x74, 0x72, 0x69, + 0x63, 0x73, 0x12, 0x24, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x44, 0x65, + 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, + 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x47, 0x65, 0x74, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, + 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, + 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/internal/pb/destination.proto b/internal/pb/destination.proto index 32d3d8255f..882c7e35b2 100644 --- a/internal/pb/destination.proto +++ b/internal/pb/destination.proto @@ -61,6 +61,8 @@ message Write2 { bytes tables = 3; // marshalled *schema.Resources bytes resource = 4; + // marshalled specs.Source + bytes source_spec = 5; } message Response { } diff --git a/internal/servers/destinations.go b/internal/servers/destinations.go index 4bafe6a592..c8ade5ae13 100644 --- a/internal/servers/destinations.go +++ b/internal/servers/destinations.go @@ -76,12 +76,22 @@ func (s *DestinationServer) Write2(msg pb.Destination_Write2Server) error { if err := json.Unmarshal(r.Tables, &tables); err != nil { return status.Errorf(codes.InvalidArgument, "failed to unmarshal tables: %v", err) } - sourceName := r.Source + var sourceSpec specs.Source + if r.SourceSpec == nil { + // this is for backward compatibility + sourceSpec = specs.Source{ + Name: r.Source, + } + } else { + if err := json.Unmarshal(r.SourceSpec, &sourceSpec); err != nil { + return status.Errorf(codes.InvalidArgument, "failed to unmarshal source spec: %v", err) + } + } syncTime := r.Timestamp.AsTime() eg, ctx := errgroup.WithContext(msg.Context()) eg.Go(func() error { - return s.Plugin.Write(ctx, tables, sourceName, syncTime, resources) + return s.Plugin.Write(ctx, sourceSpec, tables, syncTime, resources) }) for { diff --git a/plugins/destination/managed_writer.go b/plugins/destination/managed_writer.go index e26ddffeef..3b98ae92d6 100644 --- a/plugins/destination/managed_writer.go +++ b/plugins/destination/managed_writer.go @@ -7,6 +7,7 @@ import ( "time" "github.com/cloudquery/plugin-sdk/schema" + "github.com/cloudquery/plugin-sdk/specs" ) type worker struct { @@ -65,7 +66,7 @@ func (p *Plugin) flush(ctx context.Context, metrics *Metrics, table *schema.Tabl } } -func (p *Plugin) writeManagedTableBatch(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, res <-chan schema.DestinationResource) error { +func (p *Plugin) writeManagedTableBatch(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, res <-chan schema.DestinationResource) error { syncTime = syncTime.UTC() SetDestinationManagedCqColumns(tables) @@ -100,7 +101,7 @@ func (p *Plugin) writeManagedTableBatch(ctx context.Context, tables schema.Table p.workersLock.Unlock() sourceColumn := &schema.Text{} - _ = sourceColumn.Set(sourceName) + _ = sourceColumn.Set(sourceSpec.Name) syncTimeColumn := &schema.Timestamptz{} _ = syncTimeColumn.Set(syncTime) for r := range res { diff --git a/plugins/destination/plugin.go b/plugins/destination/plugin.go index 698bf8d090..160677713d 100644 --- a/plugins/destination/plugin.go +++ b/plugins/destination/plugin.go @@ -223,44 +223,44 @@ func (p *Plugin) Read(ctx context.Context, table *schema.Table, sourceName strin } // this function is currently used mostly for testing so it's not a public api -func (p *Plugin) writeOne(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resource schema.DestinationResource) error { +func (p *Plugin) writeOne(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, resource schema.DestinationResource) error { resources := []schema.DestinationResource{resource} - return p.writeAll(ctx, tables, sourceName, syncTime, resources) + return p.writeAll(ctx, sourceSpec, tables, syncTime, resources) } // this function is currently used mostly for testing so it's not a public api -func (p *Plugin) writeAll(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resources []schema.DestinationResource) error { +func (p *Plugin) writeAll(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, resources []schema.DestinationResource) error { ch := make(chan schema.DestinationResource, len(resources)) for _, resource := range resources { ch <- resource } close(ch) - return p.Write(ctx, tables, sourceName, syncTime, ch) + return p.Write(ctx, sourceSpec, tables, syncTime, ch) } -func (p *Plugin) Write(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, res <-chan schema.DestinationResource) error { +func (p *Plugin) Write(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, res <-chan schema.DestinationResource) error { syncTime = syncTime.UTC() SetDestinationManagedCqColumns(tables) switch p.writerType { case unmanaged: - if err := p.writeUnmanaged(ctx, tables, sourceName, syncTime, res); err != nil { + if err := p.writeUnmanaged(ctx, sourceSpec, tables, syncTime, res); err != nil { return err } case managed: - if err := p.writeManagedTableBatch(ctx, tables, sourceName, syncTime, res); err != nil { + if err := p.writeManagedTableBatch(ctx, sourceSpec, tables, syncTime, res); err != nil { return err } default: panic("unknown client type") } if p.spec.WriteMode == specs.WriteModeOverwriteDeleteStale { - include := func(t *schema.Table) bool { return true } - exclude := func(t *schema.Table) bool { return t.IsIncremental } - nonIncrementalTables, err := tables.FilterDfsFunc(include, exclude) - if err != nil { - return err + tablesToDelete := tables + if sourceSpec.Backend != specs.BackendNone { + include := func(t *schema.Table) bool { return true } + exclude := func(t *schema.Table) bool { return t.IsIncremental } + tablesToDelete = tables.FilterDfsFunc(include, exclude) } - if err := p.DeleteStale(ctx, nonIncrementalTables, sourceName, syncTime); err != nil { + if err := p.DeleteStale(ctx, tablesToDelete, sourceSpec.Name, syncTime); err != nil { return err } } diff --git a/plugins/destination/plugin_testing.go b/plugins/destination/plugin_testing.go index ddfc508c08..42d5184622 100644 --- a/plugins/destination/plugin_testing.go +++ b/plugins/destination/plugin_testing.go @@ -59,9 +59,12 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, } sourceName := "testOverwriteSource" + uuid.NewString() + sourceSpec := specs.Source{ + Name: sourceName, + } resources := createTestResources(table, sourceName, syncTime, 2) - if err := p.writeAll(ctx, tables, sourceName, syncTime, resources); err != nil { + if err := p.writeAll(ctx, sourceSpec, tables, syncTime, resources); err != nil { return fmt.Errorf("failed to write all: %w", err) } sortResources(table, resources) @@ -95,7 +98,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, _ = updatedResource.Data[1].Set(secondSyncTime) // write second time - if err := p.writeOne(ctx, tables, sourceName, secondSyncTime, updatedResource); err != nil { + if err := p.writeOne(ctx, sourceSpec, tables, secondSyncTime, updatedResource); err != nil { return fmt.Errorf("failed to write one second time: %w", err) } @@ -139,10 +142,14 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte } sourceName := "testOverwriteSource" + uuid.NewString() + sourceSpec := specs.Source{ + Name: sourceName, + Backend: specs.BackendLocal, + } resources := createTestResources(table, sourceName, syncTime, 2) incResources := createTestResources(incTable, sourceName, syncTime, 2) - if err := p.writeAll(ctx, tables, sourceName, syncTime, append(resources, incResources...)); err != nil { + if err := p.writeAll(ctx, sourceSpec, tables, syncTime, append(resources, incResources...)); err != nil { return fmt.Errorf("failed to write all: %w", err) } sortResources(table, resources) @@ -185,7 +192,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte _ = updatedResource.Data[1].Set(secondSyncTime) // write second time - if err := p.writeOne(ctx, tables, sourceName, secondSyncTime, updatedResource); err != nil { + if err := p.writeOne(ctx, sourceSpec, tables, secondSyncTime, updatedResource); err != nil { return fmt.Errorf("failed to write one second time: %w", err) } @@ -245,8 +252,11 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, resources := make([]schema.DestinationResource, 2) sourceName := "testAppendSource" + uuid.NewString() + specSource := specs.Source{ + Name: sourceName, + } resources[0] = createTestResources(table, sourceName, syncTime, 1)[0] - if err := p.writeOne(ctx, tables, sourceName, syncTime, resources[0]); err != nil { + if err := p.writeOne(ctx, specSource, tables, syncTime, resources[0]); err != nil { return fmt.Errorf("failed to write one second time: %w", err) } @@ -256,7 +266,7 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, if !s.tests.SkipSecondAppend { // write second time - if err := p.writeOne(ctx, tables, sourceName, secondSyncTime, resources[1]); err != nil { + if err := p.writeOne(ctx, specSource, tables, secondSyncTime, resources[1]); err != nil { return fmt.Errorf("failed to write one second time: %w", err) } } @@ -301,9 +311,12 @@ func (*PluginTestSuite) destinationPluginTestMigrateAppend(ctx context.Context, return fmt.Errorf("failed to migrate tables: %w", err) } sourceName := "testMigrateAppendSource" + uuid.NewString() + sourceSpec := specs.Source{ + Name: sourceName, + } syncTime := time.Now().UTC().Round(1 * time.Second) resource1 := createTestResources(table, sourceName, syncTime, 1)[0] - if err := p.writeOne(ctx, []*schema.Table{table}, "test_source", syncTime, resource1); err != nil { + if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource1); err != nil { return fmt.Errorf("failed to write one: %w", err) } @@ -315,7 +328,7 @@ func (*PluginTestSuite) destinationPluginTestMigrateAppend(ctx context.Context, return fmt.Errorf("failed to migrate table with changed column ordering: %w", err) } resource2 := createTestResources(table, sourceName, syncTime, 1)[0] - if err := p.writeOne(ctx, []*schema.Table{table}, "test_source", syncTime, resource2); err != nil { + if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource2); err != nil { return fmt.Errorf("failed to write one after column order change: %w", err) } @@ -336,7 +349,7 @@ func (*PluginTestSuite) destinationPluginTestMigrateAppend(ctx context.Context, return fmt.Errorf("failed to migrate table with new column: %w", err) } resource3 := createTestResources(table, sourceName, syncTime, 1)[0] - if err := p.writeOne(ctx, []*schema.Table{table}, "test_source", syncTime, resource3); err != nil { + if err := p.writeOne(ctx, sourceSpec, []*schema.Table{table}, syncTime, resource3); err != nil { return fmt.Errorf("failed to write one after column order change: %w", err) } resourcesRead, err = p.readAll(ctx, table, sourceName) @@ -354,7 +367,7 @@ func (*PluginTestSuite) destinationPluginTestMigrateAppend(ctx context.Context, return fmt.Errorf("failed to migrate table with extra column in destination: %w", err) } resource4 := createTestResources(oldTable, sourceName, syncTime, 1)[0] - if err := p.writeOne(ctx, []*schema.Table{oldTable}, "test_source", syncTime, resource4); err != nil { + if err := p.writeOne(ctx, sourceSpec, []*schema.Table{oldTable}, syncTime, resource4); err != nil { return fmt.Errorf("failed to write one after column order change: %w", err) } resourcesRead, err = p.readAll(ctx, oldTable, sourceName) diff --git a/plugins/destination/unmanaged_writer.go b/plugins/destination/unmanaged_writer.go index 82e50c1135..ff99fc8a57 100644 --- a/plugins/destination/unmanaged_writer.go +++ b/plugins/destination/unmanaged_writer.go @@ -6,17 +6,18 @@ import ( "time" "github.com/cloudquery/plugin-sdk/schema" + "github.com/cloudquery/plugin-sdk/specs" "golang.org/x/sync/errgroup" ) -func (p *Plugin) writeUnmanaged(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, res <-chan schema.DestinationResource) error { +func (p *Plugin) writeUnmanaged(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, res <-chan schema.DestinationResource) error { ch := make(chan *ClientResource) eg, gctx := errgroup.WithContext(ctx) eg.Go(func() error { return p.client.Write(gctx, tables, ch) }) sourceColumn := &schema.Text{} - _ = sourceColumn.Set(sourceName) + _ = sourceColumn.Set(sourceSpec.Name) syncTimeColumn := &schema.Timestamptz{} _ = syncTimeColumn.Set(syncTime) for r := range res { diff --git a/plugins/source/plugin.go b/plugins/source/plugin.go index 5ccbdcf957..26646ec8ac 100644 --- a/plugins/source/plugin.go +++ b/plugins/source/plugin.go @@ -18,15 +18,7 @@ type Options struct { Backend backend.Backend } -type Option func(o *Options) - -func WithBackend(b backend.Backend) Option { - return func(o *Options) { - o.Backend = b - } -} - -type NewExecutionClientFunc func(context.Context, zerolog.Logger, specs.Source, ...Option) (schema.ClientMeta, error) +type NewExecutionClientFunc func(context.Context, zerolog.Logger, specs.Source, Options) (schema.ClientMeta, error) // Plugin is the base structure required to pass to sdk.serve // We take a declarative approach to API here similar to Cobra @@ -182,12 +174,15 @@ func (p *Plugin) Sync(ctx context.Context, spec specs.Source, res chan<- *schema if err != nil { return fmt.Errorf("failed to filter tables: %w", err) } + if len(tables) == 0 { return fmt.Errorf("no tables to sync - please check your spec 'tables' and 'skip_tables' settings") } var be backend.Backend switch spec.Backend { + case specs.BackendNone: + // do nothing case specs.BackendLocal: be, err = local.New(spec) if err != nil { @@ -197,15 +192,17 @@ func (p *Plugin) Sync(ctx context.Context, spec specs.Source, res chan<- *schema return fmt.Errorf("unknown backend: %s", spec.Backend) } - defer func() { - p.logger.Info().Msg("closing backend") - err := be.Close(ctx) - if err != nil { - p.logger.Error().Err(err).Msg("failed to close backend") - } - }() + if be != nil { + defer func() { + p.logger.Info().Msg("closing backend") + err := be.Close(ctx) + if err != nil { + p.logger.Error().Err(err).Msg("failed to close backend") + } + }() + } - c, err := p.newExecutionClient(ctx, p.logger, spec, WithBackend(be)) + c, err := p.newExecutionClient(ctx, p.logger, spec, Options{Backend: be}) if err != nil { return fmt.Errorf("failed to create execution client for source plugin %s: %w", p.name, err) } diff --git a/plugins/source/plugin_test.go b/plugins/source/plugin_test.go index a01c91b9ee..e401e04fbe 100644 --- a/plugins/source/plugin_test.go +++ b/plugins/source/plugin_test.go @@ -115,7 +115,7 @@ func (*testExecutionClient) ID() string { return "testExecutionClient" } -func newTestExecutionClient(context.Context, zerolog.Logger, specs.Source, ...Option) (schema.ClientMeta, error) { +func newTestExecutionClient(context.Context, zerolog.Logger, specs.Source, Options) (schema.ClientMeta, error) { return &testExecutionClient{}, nil } diff --git a/schema/table.go b/schema/table.go index d18641e08c..4b0f2ba605 100644 --- a/schema/table.go +++ b/schema/table.go @@ -72,7 +72,7 @@ var ( reValidColumnName = regexp.MustCompile(`^[a-z_][a-z\d_]*$`) ) -func (tt Tables) FilterDfsFunc(include, exclude func(*Table) bool) (Tables, error) { +func (tt Tables) FilterDfsFunc(include, exclude func(*Table) bool) Tables { filteredTables := make(Tables, 0, len(tt)) for _, t := range tt { filteredTable := t.Copy(nil) @@ -81,7 +81,7 @@ func (tt Tables) FilterDfsFunc(include, exclude func(*Table) bool) (Tables, erro filteredTables = append(filteredTables, filteredTable) } } - return filteredTables, nil + return filteredTables } func (tt Tables) FilterDfs(tables, skipTables []string) (Tables, error) { @@ -126,7 +126,7 @@ func (tt Tables) FilterDfs(tables, skipTables []string) (Tables, error) { } return false } - return tt.FilterDfsFunc(include, exclude) + return tt.FilterDfsFunc(include, exclude), nil } func (tt Tables) FlattenTables() Tables { diff --git a/serve/destination_test.go b/serve/destination_test.go index 028a35a2d0..6b27bf88bb 100644 --- a/serve/destination_test.go +++ b/serve/destination_test.go @@ -99,6 +99,9 @@ func TestDestination(t *testing.T) { syncTime := time.Now() table := testdata.TestTable(tableName) tables := schema.Tables{table} + sourceSpec := specs.Source{ + Name: sourceName, + } if err := c.Migrate(ctx, tables); err != nil { t.Fatal(err) } @@ -117,7 +120,7 @@ func TestDestination(t *testing.T) { resources := make(chan []byte, 1) resources <- b close(resources) - if err := c.Write2(ctx, tables, sourceName, syncTime, resources); err != nil { + if err := c.Write2(ctx, sourceSpec, tables, syncTime, resources); err != nil { t.Fatal(err) } diff --git a/serve/source_test.go b/serve/source_test.go index d513ba41c5..767f17e031 100644 --- a/serve/source_test.go +++ b/serve/source_test.go @@ -61,11 +61,11 @@ func (*testExecutionClient) ID() string { return "testExecutionClient" } -func newTestExecutionClient(context.Context, zerolog.Logger, specs.Source, ...source.Option) (schema.ClientMeta, error) { +func newTestExecutionClient(context.Context, zerolog.Logger, specs.Source, source.Options) (schema.ClientMeta, error) { return &testExecutionClient{}, nil } -func newTestExecutionClientErr(context.Context, zerolog.Logger, specs.Source, ...source.Option) (schema.ClientMeta, error) { +func newTestExecutionClientErr(context.Context, zerolog.Logger, specs.Source, source.Options) (schema.ClientMeta, error) { return nil, errTestExecutionClientErr } diff --git a/specs/backend.go b/specs/backend.go index 1f9247b7d4..4a0254f732 100644 --- a/specs/backend.go +++ b/specs/backend.go @@ -9,11 +9,13 @@ import ( type Backend int const ( - BackendLocal Backend = iota + BackendNone Backend = iota + BackendLocal ) var AllBackends = Backends{BackendLocal} var AllBackendNames = [...]string{ + BackendNone: "none", BackendLocal: "local", } @@ -54,5 +56,5 @@ func BackendFromString(s string) (Backend, error) { return Backend(i), nil } } - return BackendLocal, fmt.Errorf("unknown backend %s", s) + return BackendNone, fmt.Errorf("unknown backend %s", s) } diff --git a/specs/source.go b/specs/source.go index 171050f1d0..47fc1abc78 100644 --- a/specs/source.go +++ b/specs/source.go @@ -39,6 +39,8 @@ type Source struct { // Backend is the name of the state backend to use Backend Backend `json:"backend,omitempty"` + // Sync on + OnlyIncrementalTables bool `json:"only_incremental_tables,omitempty"` // BackendSpec contains any backend-specific configuration BackendSpec any `json:"backend_spec,omitempty"` // Scheduler defines the scheduling algorithm that should be used to sync data @@ -53,7 +55,7 @@ func (s *Source) SetDefaults() { s.Registry = RegistryGithub } if s.Backend.String() == "" { - s.Backend = BackendLocal + s.Backend = BackendNone } if s.Scheduler.String() == "" { s.Scheduler = SchedulerDFS