From 7aa0c9b4bcf57cd05c45dc65e3150083aece6f05 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 8 Feb 2023 15:49:59 +0200 Subject: [PATCH 1/7] feat: Add table diff methods --- cloudquery/destination/v0/destination.proto | 1 + internal/memdb/memdb.go | 2 +- internal/pb/destination/v0/destination.pb.go | 202 +++++++++--------- .../servers/destination/v0/destinations.go | 9 +- plugins/destination/plugin.go | 10 +- plugins/destination/plugin_testing.go | 18 +- plugins/source/options.go | 13 ++ plugins/source/plugin.go | 40 +++- schema/column.go | 1 + schema/meta.go | 5 + schema/table.go | 51 +++++ schema/table_test.go | 76 +++++++ 12 files changed, 309 insertions(+), 119 deletions(-) diff --git a/cloudquery/destination/v0/destination.proto b/cloudquery/destination/v0/destination.proto index cd803537cf..dc3ae6b132 100644 --- a/cloudquery/destination/v0/destination.proto +++ b/cloudquery/destination/v0/destination.proto @@ -37,6 +37,7 @@ message Migrate { string name = 1; string version = 2; bytes tables = 3; + bytes options = 4; } message Response {} } diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index f6db98134f..b199b8539b 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -99,7 +99,7 @@ func (c *client) overwrite(table *schema.Table, data []any) { c.memoryDB[table.Name] = append(c.memoryDB[table.Name], data) } -func (c *client) Migrate(_ context.Context, tables schema.Tables) error { +func (c *client) Migrate(_ context.Context, tables schema.Tables, options destination.MigrateOptions) error { for _, table := range tables { if c.memoryDB[table.Name] == nil { c.memoryDB[table.Name] = make([][]any, 0) diff --git a/internal/pb/destination/v0/destination.pb.go b/internal/pb/destination/v0/destination.pb.go index 5002ca7286..4d24b1ebbb 100644 --- a/internal/pb/destination/v0/destination.pb.go +++ b/internal/pb/destination/v0/destination.pb.go @@ -258,6 +258,7 @@ type Migrate_Request struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Version string `protobuf:"bytes,2,opt,name=version,proto3" json:"version,omitempty"` Tables []byte `protobuf:"bytes,3,opt,name=tables,proto3" json:"tables,omitempty"` + Options []byte `protobuf:"bytes,4,opt,name=options,proto3" json:"options,omitempty"` } func (x *Migrate_Request) Reset() { @@ -313,6 +314,13 @@ func (x *Migrate_Request) GetTables() []byte { return nil } +func (x *Migrate_Request) GetOptions() []byte { + if x != nil { + return x.Options + } + return nil +} + type Migrate_Response struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -864,104 +872,106 @@ var file_cloudquery_destination_v0_destination_proto_rawDesc = []byte{ 0x2f, 0x62, 0x61, 0x73, 0x65, 0x2f, 0x76, 0x30, 0x2f, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x66, 0x0a, 0x07, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x1a, - 0x4f, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, - 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x18, - 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, - 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, - 0x1a, 0x0a, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xb1, 0x01, 0x0a, - 0x05, 0x57, 0x72, 0x69, 0x74, 0x65, 0x1a, 0x77, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x16, 0x0a, - 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, - 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, - 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x1a, - 0x2f, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x66, - 0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x77, 0x72, 0x69, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x04, 0x52, 0x0c, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x57, 0x72, 0x69, 0x74, 0x65, 0x73, - 0x22, 0xc7, 0x01, 0x0a, 0x06, 0x57, 0x72, 0x69, 0x74, 0x65, 0x32, 0x1a, 0xb0, 0x01, 0x0a, 0x07, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, - 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, - 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x62, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x80, 0x01, 0x0a, 0x07, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, + 0x1a, 0x69, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, + 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, + 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, - 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x1f, 0x0a, - 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x73, 0x70, 0x65, 0x63, 0x18, 0x05, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x53, 0x70, 0x65, 0x63, 0x1a, 0x0a, - 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1e, 0x0a, 0x05, 0x43, 0x6c, - 0x6f, 0x73, 0x65, 0x1a, 0x09, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0a, - 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xb5, 0x01, 0x0a, 0x0b, 0x44, - 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x1a, 0x73, 0x0a, 0x07, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x38, 0x0a, - 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, - 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, - 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x1a, - 0x31, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x66, - 0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x0d, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x44, 0x65, 0x6c, 0x65, 0x74, - 0x65, 0x73, 0x22, 0x48, 0x0a, 0x15, 0x47, 0x65, 0x74, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x1a, 0x09, 0x0a, 0x07, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x32, 0xb5, 0x05, 0x0a, - 0x0b, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x5b, 0x0a, 0x12, - 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, - 0x6f, 0x6e, 0x12, 0x21, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, - 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, - 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x47, 0x65, 0x74, - 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, - 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x43, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, - 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, + 0x73, 0x12, 0x18, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x0a, 0x0a, 0x08, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xb1, 0x01, 0x0a, 0x05, 0x57, 0x72, 0x69, 0x74, + 0x65, 0x1a, 0x77, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, + 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x1a, 0x2f, 0x0a, 0x08, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, + 0x5f, 0x77, 0x72, 0x69, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x66, + 0x61, 0x69, 0x6c, 0x65, 0x64, 0x57, 0x72, 0x69, 0x74, 0x65, 0x73, 0x22, 0xc7, 0x01, 0x0a, 0x06, + 0x57, 0x72, 0x69, 0x74, 0x65, 0x32, 0x1a, 0xb0, 0x01, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x1a, 0x0a, 0x08, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x5f, 0x73, 0x70, 0x65, 0x63, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x53, 0x70, 0x65, 0x63, 0x1a, 0x0a, 0x0a, 0x08, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1e, 0x0a, 0x05, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x1a, 0x09, + 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0a, 0x0a, 0x08, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xb5, 0x01, 0x0a, 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, + 0x53, 0x74, 0x61, 0x6c, 0x65, 0x1a, 0x73, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x16, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x1a, 0x31, 0x0a, 0x08, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, + 0x5f, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, + 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x73, 0x22, 0x48, 0x0a, + 0x15, 0x47, 0x65, 0x74, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, + 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x1a, 0x09, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x24, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, + 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, + 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x32, 0xb5, 0x05, 0x0a, 0x0b, 0x44, 0x65, 0x73, 0x74, + 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x5b, 0x0a, 0x12, 0x47, 0x65, 0x74, 0x50, 0x72, + 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, + 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x22, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, + 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x43, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, - 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, 0x09, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x12, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, - 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x75, 0x72, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, - 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, - 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x2e, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x05, 0x57, 0x72, 0x69, 0x74, - 0x65, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, - 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, - 0x12, 0x39, 0x0a, 0x06, 0x57, 0x72, 0x69, 0x74, 0x65, 0x32, 0x12, 0x15, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x32, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x32, - 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x12, 0x34, 0x0a, 0x05, 0x43, - 0x6c, 0x6f, 0x73, 0x65, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6c, 0x6f, - 0x73, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x46, 0x0a, 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, - 0x12, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, - 0x74, 0x61, 0x6c, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, - 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x59, 0x0a, 0x0a, 0x47, 0x65, 0x74, - 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x24, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, - 0x47, 0x65, 0x74, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, - 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x49, 0x5a, 0x47, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x70, 0x6c, - 0x75, 0x67, 0x69, 0x6e, 0x2d, 0x73, 0x64, 0x6b, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, - 0x6c, 0x2f, 0x70, 0x62, 0x2f, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x2f, 0x76, 0x30, 0x3b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6e, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, + 0x72, 0x65, 0x12, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x75, 0x72, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x2e, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x4d, 0x69, 0x67, 0x72, 0x61, + 0x74, 0x65, 0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4d, 0x69, 0x67, 0x72, 0x61, + 0x74, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x05, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x14, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, + 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x12, 0x39, 0x0a, 0x06, 0x57, + 0x72, 0x69, 0x74, 0x65, 0x32, 0x12, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, + 0x69, 0x74, 0x65, 0x32, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x32, 0x2e, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x12, 0x34, 0x0a, 0x05, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x12, + 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x2e, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6c, + 0x6f, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, 0x0b, + 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x12, 0x1a, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x2e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x59, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x74, 0x72, 0x69, + 0x63, 0x73, 0x12, 0x24, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x44, 0x65, + 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, + 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x47, 0x65, 0x74, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, + 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, + 0x49, 0x5a, 0x47, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6c, + 0x6f, 0x75, 0x64, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2d, + 0x73, 0x64, 0x6b, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x62, 0x2f, + 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x76, 0x30, 0x3b, 0x64, + 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/internal/servers/destination/v0/destinations.go b/internal/servers/destination/v0/destinations.go index c6a2c0ff70..e7ba3098b7 100644 --- a/internal/servers/destination/v0/destinations.go +++ b/internal/servers/destination/v0/destinations.go @@ -54,7 +54,14 @@ func (s *Server) Migrate(ctx context.Context, req *pb.Migrate_Request) (*pb.Migr if err := json.Unmarshal(req.Tables, &tables); err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to unmarshal tables: %v", err) } - return &pb.Migrate_Response{}, s.Plugin.Migrate(ctx, tables) + var options destination.MigrateOptions + if len(req.Options) > 0 { + if err := json.Unmarshal(req.Options, &options); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "failed to unmarshal options: %v", err) + } + } + + return &pb.Migrate_Response{}, s.Plugin.Migrate(ctx, tables, options) } func (*Server) Write(pb.Destination_WriteServer) error { diff --git a/plugins/destination/plugin.go b/plugins/destination/plugin.go index a2ace6c403..090c10b0ed 100644 --- a/plugins/destination/plugin.go +++ b/plugins/destination/plugin.go @@ -50,10 +50,14 @@ func (*UnimplementedUnmanagedWriter) Metrics() Metrics { panic("Metrics not implemented") } +type MigrateOptions struct { + Force bool +} + type Client interface { schema.CQTypeTransformer ReverseTransformValues(table *schema.Table, values []any) (schema.CQTypes, error) - Migrate(ctx context.Context, tables schema.Tables) error + Migrate(ctx context.Context, tables schema.Tables, options MigrateOptions) error Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []any) error ManagedWriter UnmanagedWriter @@ -184,9 +188,9 @@ func (p *Plugin) Init(ctx context.Context, logger zerolog.Logger, spec specs.Des } // we implement all DestinationClient functions so we can hook into pre-post behavior -func (p *Plugin) Migrate(ctx context.Context, tables schema.Tables) error { +func (p *Plugin) Migrate(ctx context.Context, tables schema.Tables, options MigrateOptions) error { SetDestinationManagedCqColumns(tables) - return p.client.Migrate(ctx, tables) + return p.client.Migrate(ctx, tables, options) } func (p *Plugin) readAll(ctx context.Context, table *schema.Table, sourceName string) ([]schema.CQTypes, error) { diff --git a/plugins/destination/plugin_testing.go b/plugins/destination/plugin_testing.go index 8d49de2e19..20e7985d06 100644 --- a/plugins/destination/plugin_testing.go +++ b/plugins/destination/plugin_testing.go @@ -51,6 +51,7 @@ type PluginTestSuiteTests struct { } func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error { + var options MigrateOptions spec.WriteMode = specs.WriteModeOverwrite if err := p.Init(ctx, logger, spec); err != nil { return fmt.Errorf("failed to init plugin: %w", err) @@ -61,7 +62,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, tables := []*schema.Table{ table, } - if err := p.Migrate(ctx, tables); err != nil { + if err := p.Migrate(ctx, tables, options); err != nil { return fmt.Errorf("failed to migrate tables: %w", err) } @@ -131,6 +132,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, } func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error { + var options MigrateOptions spec.WriteMode = specs.WriteModeOverwriteDeleteStale if err := p.Init(ctx, logger, spec); err != nil { return fmt.Errorf("failed to init plugin: %w", err) @@ -144,7 +146,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte table, incTable, } - if err := p.Migrate(ctx, tables); err != nil { + if err := p.Migrate(ctx, tables, options); err != nil { return fmt.Errorf("failed to migrate tables: %w", err) } @@ -243,6 +245,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte } func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error { + var options MigrateOptions spec.WriteMode = specs.WriteModeAppend if err := p.Init(ctx, logger, spec); err != nil { return fmt.Errorf("failed to init plugin: %w", err) @@ -253,7 +256,7 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, tables := []*schema.Table{ table, } - if err := p.Migrate(ctx, tables); err != nil { + if err := p.Migrate(ctx, tables, options); err != nil { return fmt.Errorf("failed to migrate tables: %w", err) } @@ -315,13 +318,14 @@ func (*PluginTestSuite) destinationPluginTestMigrate( ) error { spec.WriteMode = mode spec.BatchSize = 1 + var options MigrateOptions if err := p.Init(ctx, logger, spec); err != nil { return fmt.Errorf("failed to init plugin: %w", err) } suffix := strings.ToLower(strings.ReplaceAll(mode.String(), "-", "_")) tableName := "cq_test_migrate_" + suffix table := testdata.TestTable(tableName) - if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { + if err := p.Migrate(ctx, []*schema.Table{table}, options); err != nil { return fmt.Errorf("failed to migrate tables: %w", err) } @@ -339,7 +343,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( a := table.Columns.Index("uuid") b := table.Columns.Index("float") table.Columns[a], table.Columns[b] = table.Columns[b], table.Columns[a] - if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { + if err := p.Migrate(ctx, []*schema.Table{table}, options); err != nil { return fmt.Errorf("failed to migrate table with changed column ordering: %w", err) } resource2 := createTestResources(table, sourceName, syncTime, 1)[0] @@ -360,7 +364,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( Name: "new_column", Type: schema.TypeInt, }) - if err := p.Migrate(ctx, []*schema.Table{table}); err != nil { + if err := p.Migrate(ctx, []*schema.Table{table}, options); err != nil { return fmt.Errorf("failed to migrate table with new column: %w", err) } resource3 := createTestResources(table, sourceName, syncTime, 1)[0] @@ -378,7 +382,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate( // check that migration still succeeds when there is an extra column in the destination table, // which should be ignored oldTable := testdata.TestTable(tableName) - if err := p.Migrate(ctx, []*schema.Table{oldTable}); err != nil { + if err := p.Migrate(ctx, []*schema.Table{oldTable}, options); err != nil { return fmt.Errorf("failed to migrate table with extra column in destination: %w", err) } resource4 := createTestResources(oldTable, sourceName, syncTime, 1)[0] diff --git a/plugins/source/options.go b/plugins/source/options.go index f701ce9225..395c2ac5de 100644 --- a/plugins/source/options.go +++ b/plugins/source/options.go @@ -16,3 +16,16 @@ func WithDynamicTableOption(getDynamicTables GetTables) Option { p.getDynamicTables = getDynamicTables } } + +// WithNoInternalColumns won't add internal columns (_cq_id, _cq_parent_cq_id) to the plugin tables +func WithNoInternalColumns() Option { + return func(p *Plugin) { + p.internalColumns = false + } +} + +func WithUnmanaged() Option { + return func(p *Plugin) { + p.unmanaged = true + } +} diff --git a/plugins/source/plugin.go b/plugins/source/plugin.go index 65db79d49b..e55d0af5cb 100644 --- a/plugins/source/plugin.go +++ b/plugins/source/plugin.go @@ -33,6 +33,11 @@ type Plugin struct { newExecutionClient NewExecutionClientFunc // dynamic table function if specified getDynamicTables GetTables + // NoInternalColumns if set to true will not add internal columns to tables such as _cq_id and _cq_parent_id + // useful for sources such as PostgreSQL and other databases + internalColumns bool + // unmanaged if set to true then the plugin will call Sync directly and not use the scheduler + unmanaged bool // Tables is all tables supported by this source plugin tables schema.Tables // status sync metrics @@ -129,6 +134,7 @@ func NewPlugin(name string, version string, tables []*schema.Table, newExecution newExecutionClient: newExecutionClient, metrics: &Metrics{TableClient: make(map[string]map[string]*TableClientMetrics)}, caser: caser.New(), + internalColumns: true, } for _, opt := range options { opt(&p) @@ -137,8 +143,10 @@ func NewPlugin(name string, version string, tables []*schema.Table, newExecution if err := transformTables(p.tables); err != nil { panic(err) } - if err := addInternalColumns(p.tables); err != nil { - panic(err) + if p.internalColumns { + if err := addInternalColumns(p.tables); err != nil { + panic(err) + } } if err := p.validate(); err != nil { panic(err) @@ -244,8 +252,10 @@ func (p *Plugin) Init(ctx context.Context, spec specs.Source) error { if err := transformTables(tables); err != nil { return err } - if err := addInternalColumns(tables); err != nil { - return err + if p.internalColumns { + if err := addInternalColumns(tables); err != nil { + return err + } } if err := p.validate(); err != nil { return err @@ -281,13 +291,21 @@ func (p *Plugin) Sync(ctx context.Context, res chan<- *schema.Resource) error { } startTime := time.Now() - switch p.spec.Scheduler { - case specs.SchedulerDFS: - p.syncDfs(ctx, p.spec, p.client, p.sessionTables, res) - case specs.SchedulerRoundRobin: - p.syncRoundRobin(ctx, p.spec, p.client, p.sessionTables, res) - default: - return fmt.Errorf("unknown scheduler %s. Options are: %v", p.spec.Scheduler, specs.AllSchedulers.String()) + + if p.unmanaged { + unmanagedClient := p.client.(schema.SourceUnmanagedClient) + if err := unmanagedClient.Sync(ctx, res); err != nil { + return fmt.Errorf("failed to sync unmanaged client: %w", err) + } + } else { + switch p.spec.Scheduler { + case specs.SchedulerDFS: + p.syncDfs(ctx, p.spec, p.client, p.sessionTables, res) + case specs.SchedulerRoundRobin: + p.syncRoundRobin(ctx, p.spec, p.client, p.sessionTables, res) + default: + return fmt.Errorf("unknown scheduler %s. Options are: %v", p.spec.Scheduler, specs.AllSchedulers.String()) + } } p.logger.Info().Uint64("resources", p.metrics.TotalResources()).Uint64("errors", p.metrics.TotalErrors()).Uint64("panics", p.metrics.TotalPanics()).TimeDiff("duration", time.Now(), startTime).Msg("sync finished") diff --git a/schema/column.go b/schema/column.go index 607188dab3..b3f6f33334 100644 --- a/schema/column.go +++ b/schema/column.go @@ -16,6 +16,7 @@ type ColumnResolver func(ctx context.Context, meta ClientMeta, resource *Resourc // ColumnCreationOptions allow modification of how column is defined when table is created type ColumnCreationOptions struct { PrimaryKey bool `json:"primary_key,omitempty"` + NotNull bool `json:"not_null,omitempty"` // IncrementalKey is a flag that indicates if the column is used as part of an incremental key. // It is mainly used for documentation purposes, but may also be used as part of ensuring that // migrations are done correctly. diff --git a/schema/meta.go b/schema/meta.go index 50a61697c6..05a37e8a98 100644 --- a/schema/meta.go +++ b/schema/meta.go @@ -10,6 +10,11 @@ type ClientMeta interface { ID() string } +type SourceUnmanagedClient interface { + ClientMeta + Sync(ctx context.Context, res chan<- *Resource) error +} + // These columns are managed and populated by the source plugins var CqIDColumn = Column{ Name: "_cq_id", diff --git a/schema/table.go b/schema/table.go index 4b0f2ba605..add1c73ce7 100644 --- a/schema/table.go +++ b/schema/table.go @@ -65,6 +65,8 @@ type Table struct { // Parent is the parent table in case this table is called via parent table (i.e. relation) Parent *Table `json:"-"` + + PkConstraintName string `json:"pk_constraint_name"` } var ( @@ -246,6 +248,55 @@ func (t *Table) ValidateName() error { return nil } +// GetAddedColumns returns a list of columns that are in this table but not in the other table. +func (t *Table) GetAddedColumns(other *Table) []Column { + var added []Column + for _, c := range t.Columns { + if other.Columns.Get(c.Name) == nil { + added = append(added, c) + } + } + return added +} + +// GetMissingColumns returns a list of columns that are in this table but have different type in the other table. +func (t *Table) GetChangedColumns(other *Table) []Column { + var changed []Column + for _, c := range t.Columns { + otherCol := other.Columns.Get(c.Name) + if otherCol == nil { + continue + } + if c.Type != otherCol.Type { + changed = append(changed, c) + } + if c.CreationOptions.NotNull != otherCol.CreationOptions.NotNull { + changed = append(changed, c) + } + } + return changed +} + +func (t *Table) IsPkEqual(other *Table) bool { + for _, c := range t.Columns { + if c.CreationOptions.PrimaryKey { + otherCol := other.Columns.Get(c.Name) + if otherCol == nil || !otherCol.CreationOptions.PrimaryKey { + return false + } + } + } + for _, c := range other.Columns { + if c.CreationOptions.PrimaryKey { + otherCol := t.Columns.Get(c.Name) + if otherCol == nil || !otherCol.CreationOptions.PrimaryKey { + return false + } + } + } + return true +} + func (t *Table) ValidateDuplicateColumns() error { columns := make(map[string]bool, len(t.Columns)) for _, c := range t.Columns { diff --git a/schema/table_test.go b/schema/table_test.go index 47f8a622bd..efd36a27f8 100644 --- a/schema/table_test.go +++ b/schema/table_test.go @@ -217,3 +217,79 @@ func TestTablesFilterDFS(t *testing.T) { }) } } + +var testTable1 = &Table{ + Name: "test", + Columns: []Column{ + {Name: "bool", Type: TypeBool}, + }, +} + +var testTable2 = &Table{ + Name: "test", + Columns: []Column{ + {Name: "bool", Type: TypeBool}, + {Name: "bool1", Type: TypeBool}, + }, +} + +var testTable3 = &Table{ + Name: "test", + Columns: []Column{ + {Name: "bool", Type: TypeString}, + }, +} + +var testTable4 = &Table{ + Name: "test", + Columns: []Column{ + {Name: "bool", Type: TypeBool, CreationOptions: ColumnCreationOptions{PrimaryKey: true, NotNull: true}}, + }, +} + +func TestGetAddedColumns(t *testing.T) { + columns := testTable1.GetAddedColumns(testTable1) + if columns != nil { + t.Fatalf("got %v want nil", columns) + } + + columns = testTable2.GetAddedColumns(testTable1) + if len(columns) != 1 { + t.Fatalf("got %v want 1", columns) + } + if columns[0].Name != "bool1" { + t.Fatalf("got %v want bool1", columns[0].Name) + } +} + +func TestGetChangedColumns(t *testing.T) { + columns := testTable1.GetChangedColumns(testTable1) + if columns != nil { + t.Fatalf("got %v want nil", columns) + } + + columns = testTable3.GetChangedColumns(testTable2) + if len(columns) != 1 { + t.Fatalf("got %v want 1", columns) + } + if columns[0].Name != "bool" { + t.Fatalf("got %v want bool", columns[0].Name) + } + + columns = testTable4.GetChangedColumns(testTable2) + if len(columns) != 1 { + t.Fatalf("got %v want 1", columns) + } + if columns[0].Name != "bool" { + t.Fatalf("got %v want bool", columns[0].Name) + } +} + +func TestIsPkEqual(t *testing.T) { + if !testTable1.IsPkEqual(testTable1) { + t.Fatalf("got false want true") + } + if testTable4.IsPkEqual(testTable2) { + t.Fatalf("got true want false") + } +} \ No newline at end of file From ee722af73ee7a05c4eafaee625a107120f2d4e47 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 8 Feb 2023 15:55:56 +0200 Subject: [PATCH 2/7] gofmt --- internal/memdb/memdb.go | 2 +- plugins/source/plugin.go | 2 +- schema/column.go | 2 +- schema/table_test.go | 10 +++++----- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index b199b8539b..26ee2deca8 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -99,7 +99,7 @@ func (c *client) overwrite(table *schema.Table, data []any) { c.memoryDB[table.Name] = append(c.memoryDB[table.Name], data) } -func (c *client) Migrate(_ context.Context, tables schema.Tables, options destination.MigrateOptions) error { +func (c *client) Migrate(_ context.Context, tables schema.Tables, _ destination.MigrateOptions) error { for _, table := range tables { if c.memoryDB[table.Name] == nil { c.memoryDB[table.Name] = make([][]any, 0) diff --git a/plugins/source/plugin.go b/plugins/source/plugin.go index e55d0af5cb..ba374b10d6 100644 --- a/plugins/source/plugin.go +++ b/plugins/source/plugin.go @@ -134,7 +134,7 @@ func NewPlugin(name string, version string, tables []*schema.Table, newExecution newExecutionClient: newExecutionClient, metrics: &Metrics{TableClient: make(map[string]map[string]*TableClientMetrics)}, caser: caser.New(), - internalColumns: true, + internalColumns: true, } for _, opt := range options { opt(&p) diff --git a/schema/column.go b/schema/column.go index b3f6f33334..7a67d9a84b 100644 --- a/schema/column.go +++ b/schema/column.go @@ -16,7 +16,7 @@ type ColumnResolver func(ctx context.Context, meta ClientMeta, resource *Resourc // ColumnCreationOptions allow modification of how column is defined when table is created type ColumnCreationOptions struct { PrimaryKey bool `json:"primary_key,omitempty"` - NotNull bool `json:"not_null,omitempty"` + NotNull bool `json:"not_null,omitempty"` // IncrementalKey is a flag that indicates if the column is used as part of an incremental key. // It is mainly used for documentation purposes, but may also be used as part of ensuring that // migrations are done correctly. diff --git a/schema/table_test.go b/schema/table_test.go index efd36a27f8..01f174a2bd 100644 --- a/schema/table_test.go +++ b/schema/table_test.go @@ -219,14 +219,14 @@ func TestTablesFilterDFS(t *testing.T) { } var testTable1 = &Table{ - Name: "test", + Name: "test", Columns: []Column{ {Name: "bool", Type: TypeBool}, }, } var testTable2 = &Table{ - Name: "test", + Name: "test", Columns: []Column{ {Name: "bool", Type: TypeBool}, {Name: "bool1", Type: TypeBool}, @@ -234,14 +234,14 @@ var testTable2 = &Table{ } var testTable3 = &Table{ - Name: "test", + Name: "test", Columns: []Column{ {Name: "bool", Type: TypeString}, }, } var testTable4 = &Table{ - Name: "test", + Name: "test", Columns: []Column{ {Name: "bool", Type: TypeBool, CreationOptions: ColumnCreationOptions{PrimaryKey: true, NotNull: true}}, }, @@ -292,4 +292,4 @@ func TestIsPkEqual(t *testing.T) { if testTable4.IsPkEqual(testTable2) { t.Fatalf("got true want false") } -} \ No newline at end of file +} From 7f56598a55e38fc08a41cf0fb2558da7ec5df9b7 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 8 Feb 2023 16:20:30 +0200 Subject: [PATCH 3/7] remove unrelated code --- plugins/source/options.go | 13 ------------- plugins/source/plugin.go | 39 +++++++++++---------------------------- 2 files changed, 11 insertions(+), 41 deletions(-) diff --git a/plugins/source/options.go b/plugins/source/options.go index 395c2ac5de..f701ce9225 100644 --- a/plugins/source/options.go +++ b/plugins/source/options.go @@ -16,16 +16,3 @@ func WithDynamicTableOption(getDynamicTables GetTables) Option { p.getDynamicTables = getDynamicTables } } - -// WithNoInternalColumns won't add internal columns (_cq_id, _cq_parent_cq_id) to the plugin tables -func WithNoInternalColumns() Option { - return func(p *Plugin) { - p.internalColumns = false - } -} - -func WithUnmanaged() Option { - return func(p *Plugin) { - p.unmanaged = true - } -} diff --git a/plugins/source/plugin.go b/plugins/source/plugin.go index ba374b10d6..0a29d215c7 100644 --- a/plugins/source/plugin.go +++ b/plugins/source/plugin.go @@ -33,11 +33,6 @@ type Plugin struct { newExecutionClient NewExecutionClientFunc // dynamic table function if specified getDynamicTables GetTables - // NoInternalColumns if set to true will not add internal columns to tables such as _cq_id and _cq_parent_id - // useful for sources such as PostgreSQL and other databases - internalColumns bool - // unmanaged if set to true then the plugin will call Sync directly and not use the scheduler - unmanaged bool // Tables is all tables supported by this source plugin tables schema.Tables // status sync metrics @@ -134,7 +129,6 @@ func NewPlugin(name string, version string, tables []*schema.Table, newExecution newExecutionClient: newExecutionClient, metrics: &Metrics{TableClient: make(map[string]map[string]*TableClientMetrics)}, caser: caser.New(), - internalColumns: true, } for _, opt := range options { opt(&p) @@ -143,10 +137,8 @@ func NewPlugin(name string, version string, tables []*schema.Table, newExecution if err := transformTables(p.tables); err != nil { panic(err) } - if p.internalColumns { - if err := addInternalColumns(p.tables); err != nil { - panic(err) - } + if err := addInternalColumns(p.tables); err != nil { + panic(err) } if err := p.validate(); err != nil { panic(err) @@ -252,10 +244,8 @@ func (p *Plugin) Init(ctx context.Context, spec specs.Source) error { if err := transformTables(tables); err != nil { return err } - if p.internalColumns { - if err := addInternalColumns(tables); err != nil { - return err - } + if err := addInternalColumns(tables); err != nil { + return err } if err := p.validate(); err != nil { return err @@ -292,20 +282,13 @@ func (p *Plugin) Sync(ctx context.Context, res chan<- *schema.Resource) error { startTime := time.Now() - if p.unmanaged { - unmanagedClient := p.client.(schema.SourceUnmanagedClient) - if err := unmanagedClient.Sync(ctx, res); err != nil { - return fmt.Errorf("failed to sync unmanaged client: %w", err) - } - } else { - switch p.spec.Scheduler { - case specs.SchedulerDFS: - p.syncDfs(ctx, p.spec, p.client, p.sessionTables, res) - case specs.SchedulerRoundRobin: - p.syncRoundRobin(ctx, p.spec, p.client, p.sessionTables, res) - default: - return fmt.Errorf("unknown scheduler %s. Options are: %v", p.spec.Scheduler, specs.AllSchedulers.String()) - } + switch p.spec.Scheduler { + case specs.SchedulerDFS: + p.syncDfs(ctx, p.spec, p.client, p.sessionTables, res) + case specs.SchedulerRoundRobin: + p.syncRoundRobin(ctx, p.spec, p.client, p.sessionTables, res) + default: + return fmt.Errorf("unknown scheduler %s. Options are: %v", p.spec.Scheduler, specs.AllSchedulers.String()) } p.logger.Info().Uint64("resources", p.metrics.TotalResources()).Uint64("errors", p.metrics.TotalErrors()).Uint64("panics", p.metrics.TotalPanics()).TimeDiff("duration", time.Now(), startTime).Msg("sync finished") From 11d47ec6407f1db37b2bf3dad842d14007350a00 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 8 Feb 2023 16:21:52 +0200 Subject: [PATCH 4/7] remove more nurelated code --- schema/meta.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/schema/meta.go b/schema/meta.go index 05a37e8a98..50a61697c6 100644 --- a/schema/meta.go +++ b/schema/meta.go @@ -10,11 +10,6 @@ type ClientMeta interface { ID() string } -type SourceUnmanagedClient interface { - ClientMeta - Sync(ctx context.Context, res chan<- *Resource) error -} - // These columns are managed and populated by the source plugins var CqIDColumn = Column{ Name: "_cq_id", From 60b1f41eb4a16c9aed9f67cbe4c0e575595a6bc2 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Wed, 8 Feb 2023 17:12:48 +0200 Subject: [PATCH 5/7] Update schema/table.go Co-authored-by: Herman Schaaf --- schema/table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/schema/table.go b/schema/table.go index add1c73ce7..b554db6814 100644 --- a/schema/table.go +++ b/schema/table.go @@ -259,7 +259,7 @@ func (t *Table) GetAddedColumns(other *Table) []Column { return added } -// GetMissingColumns returns a list of columns that are in this table but have different type in the other table. +// GetChangedColumns returns a list of columns that are in this table but have different type in the other table. func (t *Table) GetChangedColumns(other *Table) []Column { var changed []Column for _, c := range t.Columns { From a5dd080321156f2a54cfdc68e9eb39c6cf3ecf32 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Wed, 8 Feb 2023 17:12:57 +0200 Subject: [PATCH 6/7] Update schema/table.go Co-authored-by: Herman Schaaf --- schema/table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/schema/table.go b/schema/table.go index b554db6814..7d608b1841 100644 --- a/schema/table.go +++ b/schema/table.go @@ -277,7 +277,7 @@ func (t *Table) GetChangedColumns(other *Table) []Column { return changed } -func (t *Table) IsPkEqual(other *Table) bool { +func (t *Table) IsPrimaryKeyEqual(other *Table) bool { for _, c := range t.Columns { if c.CreationOptions.PrimaryKey { otherCol := other.Columns.Get(c.Name) From 54af4f458ab927a18a359a619f8db0daaa72d63c Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 8 Feb 2023 17:35:40 +0200 Subject: [PATCH 7/7] fmt --- schema/table_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/schema/table_test.go b/schema/table_test.go index 01f174a2bd..eb876d2b38 100644 --- a/schema/table_test.go +++ b/schema/table_test.go @@ -286,10 +286,10 @@ func TestGetChangedColumns(t *testing.T) { } func TestIsPkEqual(t *testing.T) { - if !testTable1.IsPkEqual(testTable1) { + if !testTable1.IsPrimaryKeyEqual(testTable1) { t.Fatalf("got false want true") } - if testTable4.IsPkEqual(testTable2) { + if testTable4.IsPrimaryKeyEqual(testTable2) { t.Fatalf("got true want false") } }