From 5733977456f1ad497ad7ec452da054c12f86977f Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Fri, 14 Oct 2022 15:00:52 +0300 Subject: [PATCH 1/4] feat: Support application level protocol message. This will let us make breaking changes to the protocol while ensuring users get the right message on what todo. e.g update plugin or cli (client). --- clients/destination.go | 39 ++++- clients/source.go | 39 ++++- internal/pb/base.pb.go | 248 ++++++++++++++++++++++++----- internal/pb/base.proto | 7 + internal/pb/destination.pb.go | 140 ++++++++-------- internal/pb/destination.proto | 4 + internal/pb/destination_grpc.pb.go | 42 +++++ internal/pb/source.pb.go | 108 +++++++------ internal/pb/source.proto | 4 + internal/pb/source_grpc.pb.go | 42 +++++ internal/servers/destinations.go | 8 + internal/servers/source.go | 8 + internal/versions/versions.go | 6 + serve/source_test.go | 9 ++ 14 files changed, 535 insertions(+), 169 deletions(-) create mode 100644 internal/versions/versions.go diff --git a/clients/destination.go b/clients/destination.go index 80d94d4905..1101ad0de9 100644 --- a/clients/destination.go +++ b/clients/destination.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cloudquery/plugin-sdk/internal/pb" + "github.com/cloudquery/plugin-sdk/internal/versions" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" "github.com/rs/zerolog" @@ -78,7 +79,9 @@ func NewDestinationClient(ctx context.Context, registry specs.Registry, path str } return c, nil case specs.RegistryLocal: - return c.newManagedClient(ctx, path) + if err := c.newManagedClient(ctx, path); err != nil { + return nil, err + } case specs.RegistryGithub: pathSplit := strings.Split(path, "/") if len(pathSplit) != 2 { @@ -90,26 +93,40 @@ func NewDestinationClient(ctx context.Context, registry specs.Registry, path str if err := DownloadPluginFromGithub(ctx, localPath, org, name, version, PluginTypeDestination); err != nil { return nil, err } - return c.newManagedClient(ctx, localPath) + if err := c.newManagedClient(ctx, localPath); err != nil { + return nil, err + } default: return nil, fmt.Errorf("unsupported registry %s", registry) } + protocolVersion, err := c.GetProtocolVersion(ctx) + if err != nil { + return nil, err + } + + if protocolVersion < versions.DestinationProtocolVersion { + return nil, fmt.Errorf("destination plugin protocol version %d is lower than client version %d. Try updating client", protocolVersion, versions.DestinationProtocolVersion) + } else if protocolVersion > versions.DestinationProtocolVersion { + return nil, fmt.Errorf("destination plugin protocol version %d is higher than client version %d. Try updating destination plugin", protocolVersion, versions.DestinationProtocolVersion) + } + + return c, nil } // newManagedClient starts a new destination plugin process from local file, connects to it via gRPC server // and returns a new DestinationClient -func (c *DestinationClient) newManagedClient(ctx context.Context, path string) (*DestinationClient, error) { +func (c *DestinationClient) newManagedClient(ctx context.Context, path string) error { c.grpcSocketName = generateRandomUnixSocketName() // spawn the plugin first and then connect cmd := exec.CommandContext(ctx, path, "serve", "--network", "unix", "--address", c.grpcSocketName, "--log-level", c.logger.GetLevel().String(), "--log-format", "json") reader, err := cmd.StdoutPipe() if err != nil { - return nil, fmt.Errorf("failed to get stdout pipe: %w", err) + return fmt.Errorf("failed to get stdout pipe: %w", err) } cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("failed to start plugin %s: %w", path, err) + return fmt.Errorf("failed to start plugin %s: %w", path, err) } c.wg.Add(1) @@ -157,10 +174,18 @@ func (c *DestinationClient) newManagedClient(ctx context.Context, path string) ( if err := cmd.Process.Kill(); err != nil { c.logger.Error().Err(err).Msg("failed to kill plugin process") } - return c, err + return err } c.pbClient = pb.NewDestinationClient(c.conn) - return c, nil + return nil +} + +func (c *DestinationClient) GetProtocolVersion(ctx context.Context) (uint64, error) { + res, err := c.pbClient.GetProtocolVersion(ctx, &pb.GetProtocolVersion_Request{}) + if err != nil { + return 0, fmt.Errorf("failed to call GetProtocolVersion: %w", err) + } + return res.Version, nil } func (c *DestinationClient) Name(ctx context.Context) (string, error) { diff --git a/clients/source.go b/clients/source.go index a135684d36..e966e01683 100644 --- a/clients/source.go +++ b/clients/source.go @@ -14,6 +14,7 @@ import ( "sync" "github.com/cloudquery/plugin-sdk/internal/pb" + "github.com/cloudquery/plugin-sdk/internal/versions" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" "github.com/rs/zerolog" @@ -82,7 +83,9 @@ func NewSourceClient(ctx context.Context, registry specs.Registry, path string, } return c, nil case specs.RegistryLocal: - return c.newManagedClient(ctx, path) + if err := c.newManagedClient(ctx, path); err != nil { + return nil, err + } case specs.RegistryGithub: pathSplit := strings.Split(path, "/") if len(pathSplit) != 2 { @@ -94,26 +97,40 @@ func NewSourceClient(ctx context.Context, registry specs.Registry, path string, if err := DownloadPluginFromGithub(ctx, localPath, org, name, version, PluginTypeSource); err != nil { return nil, err } - return c.newManagedClient(ctx, localPath) + if err := c.newManagedClient(ctx, localPath); err != nil { + return nil, err + } default: return nil, fmt.Errorf("unsupported registry %s", registry) } + + protocolVersion, err := c.GetProtocolVersion(ctx) + if err != nil { + return nil, err + } + if protocolVersion < versions.SourceProtocolVersion { + return nil, fmt.Errorf("destination plugin protocol version %d is lower than client version %d. Try updating client", protocolVersion, versions.SourceProtocolVersion) + } else if protocolVersion > versions.SourceProtocolVersion { + return nil, fmt.Errorf("destination plugin protocol version %d is higher than client version %d. Try updating destination plugin", protocolVersion, versions.SourceProtocolVersion) + } + + return c, nil } // newManagedClient starts a new source plugin process from local path, connects to it via gRPC server // and returns a new SourceClient -func (c *SourceClient) newManagedClient(ctx context.Context, path string) (*SourceClient, error) { +func (c *SourceClient) newManagedClient(ctx context.Context, path string) error { c.grpcSocketName = generateRandomUnixSocketName() // spawn the plugin first and then connect cmd := exec.CommandContext(ctx, path, "serve", "--network", "unix", "--address", c.grpcSocketName, "--log-level", c.logger.GetLevel().String(), "--log-format", "json") reader, err := cmd.StdoutPipe() if err != nil { - return nil, fmt.Errorf("failed to get stdout pipe: %w", err) + return fmt.Errorf("failed to get stdout pipe: %w", err) } cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("failed to start plugin %s: %w", path, err) + return fmt.Errorf("failed to start plugin %s: %w", path, err) } c.wg.Add(1) @@ -161,10 +178,18 @@ func (c *SourceClient) newManagedClient(ctx context.Context, path string) (*Sour if err := cmd.Process.Kill(); err != nil { c.logger.Error().Err(err).Msg("failed to kill plugin process") } - return c, err + return err } c.pbClient = pb.NewSourceClient(c.conn) - return c, nil + return nil +} + +func (c *SourceClient) GetProtocolVersion(ctx context.Context) (uint64, error) { + res, err := c.pbClient.GetProtocolVersion(ctx, &pb.GetProtocolVersion_Request{}) + if err != nil { + return 0, fmt.Errorf("failed to call GetProtocolVersion: %w", err) + } + return res.Version, nil } func (c *SourceClient) Name(ctx context.Context) (string, error) { diff --git a/internal/pb/base.pb.go b/internal/pb/base.pb.go index b3a77b7ac1..2856bb43e9 100644 --- a/internal/pb/base.pb.go +++ b/internal/pb/base.pb.go @@ -96,6 +96,44 @@ func (*GetVersion) Descriptor() ([]byte, []int) { return file_internal_pb_base_proto_rawDescGZIP(), []int{1} } +type GetProtocolVersion struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetProtocolVersion) Reset() { + *x = GetProtocolVersion{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pb_base_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetProtocolVersion) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetProtocolVersion) ProtoMessage() {} + +func (x *GetProtocolVersion) ProtoReflect() protoreflect.Message { + mi := &file_internal_pb_base_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetProtocolVersion.ProtoReflect.Descriptor instead. +func (*GetProtocolVersion) Descriptor() ([]byte, []int) { + return file_internal_pb_base_proto_rawDescGZIP(), []int{2} +} + type Configure struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -105,7 +143,7 @@ type Configure struct { func (x *Configure) Reset() { *x = Configure{} if protoimpl.UnsafeEnabled { - mi := &file_internal_pb_base_proto_msgTypes[2] + mi := &file_internal_pb_base_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -118,7 +156,7 @@ func (x *Configure) String() string { func (*Configure) ProtoMessage() {} func (x *Configure) ProtoReflect() protoreflect.Message { - mi := &file_internal_pb_base_proto_msgTypes[2] + mi := &file_internal_pb_base_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -131,7 +169,7 @@ func (x *Configure) ProtoReflect() protoreflect.Message { // Deprecated: Use Configure.ProtoReflect.Descriptor instead. func (*Configure) Descriptor() ([]byte, []int) { - return file_internal_pb_base_proto_rawDescGZIP(), []int{2} + return file_internal_pb_base_proto_rawDescGZIP(), []int{3} } type GetName_Request struct { @@ -143,7 +181,7 @@ type GetName_Request struct { func (x *GetName_Request) Reset() { *x = GetName_Request{} if protoimpl.UnsafeEnabled { - mi := &file_internal_pb_base_proto_msgTypes[3] + mi := &file_internal_pb_base_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -156,7 +194,7 @@ func (x *GetName_Request) String() string { func (*GetName_Request) ProtoMessage() {} func (x *GetName_Request) ProtoReflect() protoreflect.Message { - mi := &file_internal_pb_base_proto_msgTypes[3] + mi := &file_internal_pb_base_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -183,7 +221,7 @@ type GetName_Response struct { func (x *GetName_Response) Reset() { *x = GetName_Response{} if protoimpl.UnsafeEnabled { - mi := &file_internal_pb_base_proto_msgTypes[4] + mi := &file_internal_pb_base_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -196,7 +234,7 @@ func (x *GetName_Response) String() string { func (*GetName_Response) ProtoMessage() {} func (x *GetName_Response) ProtoReflect() protoreflect.Message { - mi := &file_internal_pb_base_proto_msgTypes[4] + mi := &file_internal_pb_base_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -228,7 +266,7 @@ type GetVersion_Request struct { func (x *GetVersion_Request) Reset() { *x = GetVersion_Request{} if protoimpl.UnsafeEnabled { - mi := &file_internal_pb_base_proto_msgTypes[5] + mi := &file_internal_pb_base_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -241,7 +279,7 @@ func (x *GetVersion_Request) String() string { func (*GetVersion_Request) ProtoMessage() {} func (x *GetVersion_Request) ProtoReflect() protoreflect.Message { - mi := &file_internal_pb_base_proto_msgTypes[5] + mi := &file_internal_pb_base_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -268,7 +306,7 @@ type GetVersion_Response struct { func (x *GetVersion_Response) Reset() { *x = GetVersion_Response{} if protoimpl.UnsafeEnabled { - mi := &file_internal_pb_base_proto_msgTypes[6] + mi := &file_internal_pb_base_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -281,7 +319,7 @@ func (x *GetVersion_Response) String() string { func (*GetVersion_Response) ProtoMessage() {} func (x *GetVersion_Response) ProtoReflect() protoreflect.Message { - mi := &file_internal_pb_base_proto_msgTypes[6] + mi := &file_internal_pb_base_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -304,6 +342,91 @@ func (x *GetVersion_Response) GetVersion() string { return "" } +type GetProtocolVersion_Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetProtocolVersion_Request) Reset() { + *x = GetProtocolVersion_Request{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pb_base_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetProtocolVersion_Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetProtocolVersion_Request) ProtoMessage() {} + +func (x *GetProtocolVersion_Request) ProtoReflect() protoreflect.Message { + mi := &file_internal_pb_base_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetProtocolVersion_Request.ProtoReflect.Descriptor instead. +func (*GetProtocolVersion_Request) Descriptor() ([]byte, []int) { + return file_internal_pb_base_proto_rawDescGZIP(), []int{2, 0} +} + +type GetProtocolVersion_Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Version uint64 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` +} + +func (x *GetProtocolVersion_Response) Reset() { + *x = GetProtocolVersion_Response{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pb_base_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetProtocolVersion_Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetProtocolVersion_Response) ProtoMessage() {} + +func (x *GetProtocolVersion_Response) ProtoReflect() protoreflect.Message { + mi := &file_internal_pb_base_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetProtocolVersion_Response.ProtoReflect.Descriptor instead. +func (*GetProtocolVersion_Response) Descriptor() ([]byte, []int) { + return file_internal_pb_base_proto_rawDescGZIP(), []int{2, 1} +} + +func (x *GetProtocolVersion_Response) GetVersion() uint64 { + if x != nil { + return x.Version + } + return 0 +} + type Configure_Request struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -317,7 +440,7 @@ type Configure_Request struct { func (x *Configure_Request) Reset() { *x = Configure_Request{} if protoimpl.UnsafeEnabled { - mi := &file_internal_pb_base_proto_msgTypes[7] + mi := &file_internal_pb_base_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -330,7 +453,7 @@ func (x *Configure_Request) String() string { func (*Configure_Request) ProtoMessage() {} func (x *Configure_Request) ProtoReflect() protoreflect.Message { - mi := &file_internal_pb_base_proto_msgTypes[7] + mi := &file_internal_pb_base_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -343,7 +466,7 @@ func (x *Configure_Request) ProtoReflect() protoreflect.Message { // Deprecated: Use Configure_Request.ProtoReflect.Descriptor instead. func (*Configure_Request) Descriptor() ([]byte, []int) { - return file_internal_pb_base_proto_rawDescGZIP(), []int{2, 0} + return file_internal_pb_base_proto_rawDescGZIP(), []int{3, 0} } func (x *Configure_Request) GetConfig() []byte { @@ -364,7 +487,7 @@ type Configure_Response struct { func (x *Configure_Response) Reset() { *x = Configure_Response{} if protoimpl.UnsafeEnabled { - mi := &file_internal_pb_base_proto_msgTypes[8] + mi := &file_internal_pb_base_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -377,7 +500,7 @@ func (x *Configure_Response) String() string { func (*Configure_Response) ProtoMessage() {} func (x *Configure_Response) ProtoReflect() protoreflect.Message { - mi := &file_internal_pb_base_proto_msgTypes[8] + mi := &file_internal_pb_base_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -390,7 +513,7 @@ func (x *Configure_Response) ProtoReflect() protoreflect.Message { // Deprecated: Use Configure_Response.ProtoReflect.Descriptor instead. func (*Configure_Response) Descriptor() ([]byte, []int) { - return file_internal_pb_base_proto_rawDescGZIP(), []int{2, 1} + return file_internal_pb_base_proto_rawDescGZIP(), []int{3, 1} } func (x *Configure_Response) GetError() string { @@ -412,13 +535,17 @@ var file_internal_pb_base_proto_rawDesc = []byte{ 0x69, 0x6f, 0x6e, 0x1a, 0x09, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x50, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, - 0x65, 0x1a, 0x21, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, - 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x63, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x1a, 0x20, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x45, 0x0a, 0x12, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x1a, 0x09, 0x0a, 0x07, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x50, 0x0a, 0x09, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x1a, 0x21, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x1a, 0x20, 0x0a, 0x08, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x05, 0x5a, + 0x03, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -433,17 +560,20 @@ func file_internal_pb_base_proto_rawDescGZIP() []byte { return file_internal_pb_base_proto_rawDescData } -var file_internal_pb_base_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_internal_pb_base_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_internal_pb_base_proto_goTypes = []interface{}{ - (*GetName)(nil), // 0: proto.GetName - (*GetVersion)(nil), // 1: proto.GetVersion - (*Configure)(nil), // 2: proto.Configure - (*GetName_Request)(nil), // 3: proto.GetName.Request - (*GetName_Response)(nil), // 4: proto.GetName.Response - (*GetVersion_Request)(nil), // 5: proto.GetVersion.Request - (*GetVersion_Response)(nil), // 6: proto.GetVersion.Response - (*Configure_Request)(nil), // 7: proto.Configure.Request - (*Configure_Response)(nil), // 8: proto.Configure.Response + (*GetName)(nil), // 0: proto.GetName + (*GetVersion)(nil), // 1: proto.GetVersion + (*GetProtocolVersion)(nil), // 2: proto.GetProtocolVersion + (*Configure)(nil), // 3: proto.Configure + (*GetName_Request)(nil), // 4: proto.GetName.Request + (*GetName_Response)(nil), // 5: proto.GetName.Response + (*GetVersion_Request)(nil), // 6: proto.GetVersion.Request + (*GetVersion_Response)(nil), // 7: proto.GetVersion.Response + (*GetProtocolVersion_Request)(nil), // 8: proto.GetProtocolVersion.Request + (*GetProtocolVersion_Response)(nil), // 9: proto.GetProtocolVersion.Response + (*Configure_Request)(nil), // 10: proto.Configure.Request + (*Configure_Response)(nil), // 11: proto.Configure.Response } var file_internal_pb_base_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type @@ -484,7 +614,7 @@ func file_internal_pb_base_proto_init() { } } file_internal_pb_base_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Configure); i { + switch v := v.(*GetProtocolVersion); i { case 0: return &v.state case 1: @@ -496,7 +626,7 @@ func file_internal_pb_base_proto_init() { } } file_internal_pb_base_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GetName_Request); i { + switch v := v.(*Configure); i { case 0: return &v.state case 1: @@ -508,7 +638,7 @@ func file_internal_pb_base_proto_init() { } } file_internal_pb_base_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GetName_Response); i { + switch v := v.(*GetName_Request); i { case 0: return &v.state case 1: @@ -520,7 +650,7 @@ func file_internal_pb_base_proto_init() { } } file_internal_pb_base_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GetVersion_Request); i { + switch v := v.(*GetName_Response); i { case 0: return &v.state case 1: @@ -532,7 +662,7 @@ func file_internal_pb_base_proto_init() { } } file_internal_pb_base_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GetVersion_Response); i { + switch v := v.(*GetVersion_Request); i { case 0: return &v.state case 1: @@ -544,7 +674,7 @@ func file_internal_pb_base_proto_init() { } } file_internal_pb_base_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Configure_Request); i { + switch v := v.(*GetVersion_Response); i { case 0: return &v.state case 1: @@ -556,6 +686,42 @@ func file_internal_pb_base_proto_init() { } } file_internal_pb_base_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetProtocolVersion_Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_pb_base_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetProtocolVersion_Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_pb_base_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Configure_Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_pb_base_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Configure_Response); i { case 0: return &v.state @@ -574,7 +740,7 @@ func file_internal_pb_base_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_internal_pb_base_proto_rawDesc, NumEnums: 0, - NumMessages: 9, + NumMessages: 12, NumExtensions: 0, NumServices: 0, }, diff --git a/internal/pb/base.proto b/internal/pb/base.proto index b6e0994002..8c4690e8e5 100644 --- a/internal/pb/base.proto +++ b/internal/pb/base.proto @@ -16,6 +16,13 @@ message GetVersion { } } +message GetProtocolVersion { + message Request {} + message Response { + uint64 version = 1; + } +} + message Configure { message Request { // Holds information such as credentials, regions, accounts, etc' diff --git a/internal/pb/destination.pb.go b/internal/pb/destination.pb.go index d1f5e97270..2177899f2b 100644 --- a/internal/pb/destination.pb.go +++ b/internal/pb/destination.pb.go @@ -611,36 +611,42 @@ var file_internal_pb_destination_proto_rawDesc = []byte{ 0x1a, 0x31, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x44, 0x65, 0x6c, 0x65, - 0x74, 0x65, 0x73, 0x32, 0xc2, 0x03, 0x0a, 0x0b, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x12, 0x3a, 0x0a, 0x07, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x16, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, - 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x43, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, - 0x65, 0x12, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x75, 0x72, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x2e, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, - 0x65, 0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, - 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x2e, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x36, 0x0a, 0x05, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x14, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x12, 0x34, 0x0a, 0x05, 0x43, 0x6c, - 0x6f, 0x73, 0x65, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6c, 0x6f, 0x73, - 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x46, 0x0a, 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x12, - 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, - 0x61, 0x6c, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x2e, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x65, 0x73, 0x32, 0x9f, 0x04, 0x0a, 0x0b, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x12, 0x5b, 0x0a, 0x12, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, + 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, + 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x3a, 0x0a, 0x07, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, + 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x43, 0x0a, 0x0a, + 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, + 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x40, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x12, 0x18, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, + 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x12, 0x16, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x2e, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4d, + 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x36, 0x0a, 0x05, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x12, 0x34, 0x0a, 0x05, 0x43, 0x6c, 0x6f, 0x73, 0x65, + 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x2e, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, + 0x6c, 0x6f, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, + 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x12, 0x1a, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, + 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x2e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -657,45 +663,49 @@ func file_internal_pb_destination_proto_rawDescGZIP() []byte { var file_internal_pb_destination_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_internal_pb_destination_proto_goTypes = []interface{}{ - (*Migrate)(nil), // 0: proto.Migrate - (*Write)(nil), // 1: proto.Write - (*Close)(nil), // 2: proto.Close - (*DeleteStale)(nil), // 3: proto.DeleteStale - (*Migrate_Request)(nil), // 4: proto.Migrate.Request - (*Migrate_Response)(nil), // 5: proto.Migrate.Response - (*Write_Request)(nil), // 6: proto.Write.Request - (*Write_Response)(nil), // 7: proto.Write.Response - (*Close_Request)(nil), // 8: proto.Close.Request - (*Close_Response)(nil), // 9: proto.Close.Response - (*DeleteStale_Request)(nil), // 10: proto.DeleteStale.Request - (*DeleteStale_Response)(nil), // 11: proto.DeleteStale.Response - (*timestamppb.Timestamp)(nil), // 12: google.protobuf.Timestamp - (*GetName_Request)(nil), // 13: proto.GetName.Request - (*GetVersion_Request)(nil), // 14: proto.GetVersion.Request - (*Configure_Request)(nil), // 15: proto.Configure.Request - (*GetName_Response)(nil), // 16: proto.GetName.Response - (*GetVersion_Response)(nil), // 17: proto.GetVersion.Response - (*Configure_Response)(nil), // 18: proto.Configure.Response + (*Migrate)(nil), // 0: proto.Migrate + (*Write)(nil), // 1: proto.Write + (*Close)(nil), // 2: proto.Close + (*DeleteStale)(nil), // 3: proto.DeleteStale + (*Migrate_Request)(nil), // 4: proto.Migrate.Request + (*Migrate_Response)(nil), // 5: proto.Migrate.Response + (*Write_Request)(nil), // 6: proto.Write.Request + (*Write_Response)(nil), // 7: proto.Write.Response + (*Close_Request)(nil), // 8: proto.Close.Request + (*Close_Response)(nil), // 9: proto.Close.Response + (*DeleteStale_Request)(nil), // 10: proto.DeleteStale.Request + (*DeleteStale_Response)(nil), // 11: proto.DeleteStale.Response + (*timestamppb.Timestamp)(nil), // 12: google.protobuf.Timestamp + (*GetProtocolVersion_Request)(nil), // 13: proto.GetProtocolVersion.Request + (*GetName_Request)(nil), // 14: proto.GetName.Request + (*GetVersion_Request)(nil), // 15: proto.GetVersion.Request + (*Configure_Request)(nil), // 16: proto.Configure.Request + (*GetProtocolVersion_Response)(nil), // 17: proto.GetProtocolVersion.Response + (*GetName_Response)(nil), // 18: proto.GetName.Response + (*GetVersion_Response)(nil), // 19: proto.GetVersion.Response + (*Configure_Response)(nil), // 20: proto.Configure.Response } var file_internal_pb_destination_proto_depIdxs = []int32{ 12, // 0: proto.Write.Request.timestamp:type_name -> google.protobuf.Timestamp 12, // 1: proto.DeleteStale.Request.timestamp:type_name -> google.protobuf.Timestamp - 13, // 2: proto.Destination.GetName:input_type -> proto.GetName.Request - 14, // 3: proto.Destination.GetVersion:input_type -> proto.GetVersion.Request - 15, // 4: proto.Destination.Configure:input_type -> proto.Configure.Request - 4, // 5: proto.Destination.Migrate:input_type -> proto.Migrate.Request - 6, // 6: proto.Destination.Write:input_type -> proto.Write.Request - 8, // 7: proto.Destination.Close:input_type -> proto.Close.Request - 10, // 8: proto.Destination.DeleteStale:input_type -> proto.DeleteStale.Request - 16, // 9: proto.Destination.GetName:output_type -> proto.GetName.Response - 17, // 10: proto.Destination.GetVersion:output_type -> proto.GetVersion.Response - 18, // 11: proto.Destination.Configure:output_type -> proto.Configure.Response - 5, // 12: proto.Destination.Migrate:output_type -> proto.Migrate.Response - 7, // 13: proto.Destination.Write:output_type -> proto.Write.Response - 9, // 14: proto.Destination.Close:output_type -> proto.Close.Response - 11, // 15: proto.Destination.DeleteStale:output_type -> proto.DeleteStale.Response - 9, // [9:16] is the sub-list for method output_type - 2, // [2:9] is the sub-list for method input_type + 13, // 2: proto.Destination.GetProtocolVersion:input_type -> proto.GetProtocolVersion.Request + 14, // 3: proto.Destination.GetName:input_type -> proto.GetName.Request + 15, // 4: proto.Destination.GetVersion:input_type -> proto.GetVersion.Request + 16, // 5: proto.Destination.Configure:input_type -> proto.Configure.Request + 4, // 6: proto.Destination.Migrate:input_type -> proto.Migrate.Request + 6, // 7: proto.Destination.Write:input_type -> proto.Write.Request + 8, // 8: proto.Destination.Close:input_type -> proto.Close.Request + 10, // 9: proto.Destination.DeleteStale:input_type -> proto.DeleteStale.Request + 17, // 10: proto.Destination.GetProtocolVersion:output_type -> proto.GetProtocolVersion.Response + 18, // 11: proto.Destination.GetName:output_type -> proto.GetName.Response + 19, // 12: proto.Destination.GetVersion:output_type -> proto.GetVersion.Response + 20, // 13: proto.Destination.Configure:output_type -> proto.Configure.Response + 5, // 14: proto.Destination.Migrate:output_type -> proto.Migrate.Response + 7, // 15: proto.Destination.Write:output_type -> proto.Write.Response + 9, // 16: proto.Destination.Close:output_type -> proto.Close.Response + 11, // 17: proto.Destination.DeleteStale:output_type -> proto.DeleteStale.Response + 10, // [10:18] is the sub-list for method output_type + 2, // [2:10] is the sub-list for method input_type 2, // [2:2] is the sub-list for extension type_name 2, // [2:2] is the sub-list for extension extendee 0, // [0:2] is the sub-list for field type_name diff --git a/internal/pb/destination.proto b/internal/pb/destination.proto index f03b8f7913..da5a9ec9a1 100644 --- a/internal/pb/destination.proto +++ b/internal/pb/destination.proto @@ -5,6 +5,10 @@ import "internal/pb/base.proto"; import "google/protobuf/timestamp.proto"; service Destination { + // Get the current protocol version of the plugin. This helps + // get the right message about upgrade/downgrade of cli and/or plugin. + // Also, on the cli side it can try to upgrade/downgrade the protocol if cli supports it. + rpc GetProtocolVersion(GetProtocolVersion.Request) returns (GetProtocolVersion.Response); // Get the name of the plugin rpc GetName(GetName.Request) returns (GetName.Response); // Get the current version of the plugin diff --git a/internal/pb/destination_grpc.pb.go b/internal/pb/destination_grpc.pb.go index b2e68dd670..d427a036a4 100644 --- a/internal/pb/destination_grpc.pb.go +++ b/internal/pb/destination_grpc.pb.go @@ -22,6 +22,10 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type DestinationClient interface { + // Get the current protocol version of the plugin. This helps + // get the right message about upgrade/downgrade of cli and/or plugin. + // Also, on the cli side it can try to upgrade/downgrade the protocol if cli supports it. + GetProtocolVersion(ctx context.Context, in *GetProtocolVersion_Request, opts ...grpc.CallOption) (*GetProtocolVersion_Response, error) // Get the name of the plugin GetName(ctx context.Context, in *GetName_Request, opts ...grpc.CallOption) (*GetName_Response, error) // Get the current version of the plugin @@ -47,6 +51,15 @@ func NewDestinationClient(cc grpc.ClientConnInterface) DestinationClient { return &destinationClient{cc} } +func (c *destinationClient) GetProtocolVersion(ctx context.Context, in *GetProtocolVersion_Request, opts ...grpc.CallOption) (*GetProtocolVersion_Response, error) { + out := new(GetProtocolVersion_Response) + err := c.cc.Invoke(ctx, "/proto.Destination/GetProtocolVersion", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *destinationClient) GetName(ctx context.Context, in *GetName_Request, opts ...grpc.CallOption) (*GetName_Response, error) { out := new(GetName_Response) err := c.cc.Invoke(ctx, "/proto.Destination/GetName", in, out, opts...) @@ -139,6 +152,10 @@ func (c *destinationClient) DeleteStale(ctx context.Context, in *DeleteStale_Req // All implementations must embed UnimplementedDestinationServer // for forward compatibility type DestinationServer interface { + // Get the current protocol version of the plugin. This helps + // get the right message about upgrade/downgrade of cli and/or plugin. + // Also, on the cli side it can try to upgrade/downgrade the protocol if cli supports it. + GetProtocolVersion(context.Context, *GetProtocolVersion_Request) (*GetProtocolVersion_Response, error) // Get the name of the plugin GetName(context.Context, *GetName_Request) (*GetName_Response, error) // Get the current version of the plugin @@ -161,6 +178,9 @@ type DestinationServer interface { type UnimplementedDestinationServer struct { } +func (UnimplementedDestinationServer) GetProtocolVersion(context.Context, *GetProtocolVersion_Request) (*GetProtocolVersion_Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetProtocolVersion not implemented") +} func (UnimplementedDestinationServer) GetName(context.Context, *GetName_Request) (*GetName_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetName not implemented") } @@ -195,6 +215,24 @@ func RegisterDestinationServer(s grpc.ServiceRegistrar, srv DestinationServer) { s.RegisterService(&Destination_ServiceDesc, srv) } +func _Destination_GetProtocolVersion_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetProtocolVersion_Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DestinationServer).GetProtocolVersion(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/proto.Destination/GetProtocolVersion", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DestinationServer).GetProtocolVersion(ctx, req.(*GetProtocolVersion_Request)) + } + return interceptor(ctx, in, info, handler) +} + func _Destination_GetName_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(GetName_Request) if err := dec(in); err != nil { @@ -336,6 +374,10 @@ var Destination_ServiceDesc = grpc.ServiceDesc{ ServiceName: "proto.Destination", HandlerType: (*DestinationServer)(nil), Methods: []grpc.MethodDesc{ + { + MethodName: "GetProtocolVersion", + Handler: _Destination_GetProtocolVersion_Handler, + }, { MethodName: "GetName", Handler: _Destination_GetName_Handler, diff --git a/internal/pb/source.pb.go b/internal/pb/source.pb.go index 7673fab13c..36caaf169a 100644 --- a/internal/pb/source.pb.go +++ b/internal/pb/source.pb.go @@ -455,29 +455,35 @@ var file_internal_pb_source_proto_rawDesc = []byte{ 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, - 0x73, 0x32, 0xd1, 0x02, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x3a, 0x0a, 0x07, - 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, - 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x43, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x56, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, - 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, - 0x09, 0x47, 0x65, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x18, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x2e, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, - 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x4f, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, - 0x79, 0x12, 0x1d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x79, 0x6e, - 0x63, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63, - 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x33, 0x0a, 0x04, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x32, 0xae, 0x03, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x5b, 0x0a, 0x12, + 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, + 0x6f, 0x6e, 0x12, 0x21, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, + 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, + 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, + 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x47, 0x65, 0x74, + 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, + 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x43, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, + 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, + 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, 0x09, 0x47, 0x65, + 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x47, 0x65, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x61, 0x62, + 0x6c, 0x65, 0x73, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4f, 0x0a, 0x0e, + 0x47, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x12, 0x1d, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x75, + 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x75, 0x6d, + 0x6d, 0x61, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x33, 0x0a, + 0x04, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x79, + 0x6e, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x30, 0x01, 0x42, 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -494,35 +500,39 @@ func file_internal_pb_source_proto_rawDescGZIP() []byte { var file_internal_pb_source_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_internal_pb_source_proto_goTypes = []interface{}{ - (*Sync)(nil), // 0: proto.Sync - (*GetSyncSummary)(nil), // 1: proto.GetSyncSummary - (*GetTables)(nil), // 2: proto.GetTables - (*Sync_Request)(nil), // 3: proto.Sync.Request - (*Sync_Response)(nil), // 4: proto.Sync.Response - (*GetSyncSummary_Request)(nil), // 5: proto.GetSyncSummary.Request - (*GetSyncSummary_Response)(nil), // 6: proto.GetSyncSummary.Response - (*GetTables_Request)(nil), // 7: proto.GetTables.Request - (*GetTables_Response)(nil), // 8: proto.GetTables.Response - (*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp - (*GetName_Request)(nil), // 10: proto.GetName.Request - (*GetVersion_Request)(nil), // 11: proto.GetVersion.Request - (*GetName_Response)(nil), // 12: proto.GetName.Response - (*GetVersion_Response)(nil), // 13: proto.GetVersion.Response + (*Sync)(nil), // 0: proto.Sync + (*GetSyncSummary)(nil), // 1: proto.GetSyncSummary + (*GetTables)(nil), // 2: proto.GetTables + (*Sync_Request)(nil), // 3: proto.Sync.Request + (*Sync_Response)(nil), // 4: proto.Sync.Response + (*GetSyncSummary_Request)(nil), // 5: proto.GetSyncSummary.Request + (*GetSyncSummary_Response)(nil), // 6: proto.GetSyncSummary.Response + (*GetTables_Request)(nil), // 7: proto.GetTables.Request + (*GetTables_Response)(nil), // 8: proto.GetTables.Response + (*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp + (*GetProtocolVersion_Request)(nil), // 10: proto.GetProtocolVersion.Request + (*GetName_Request)(nil), // 11: proto.GetName.Request + (*GetVersion_Request)(nil), // 12: proto.GetVersion.Request + (*GetProtocolVersion_Response)(nil), // 13: proto.GetProtocolVersion.Response + (*GetName_Response)(nil), // 14: proto.GetName.Response + (*GetVersion_Response)(nil), // 15: proto.GetVersion.Response } var file_internal_pb_source_proto_depIdxs = []int32{ 9, // 0: proto.Sync.Request.timestamp:type_name -> google.protobuf.Timestamp - 10, // 1: proto.Source.GetName:input_type -> proto.GetName.Request - 11, // 2: proto.Source.GetVersion:input_type -> proto.GetVersion.Request - 7, // 3: proto.Source.GetTables:input_type -> proto.GetTables.Request - 5, // 4: proto.Source.GetSyncSummary:input_type -> proto.GetSyncSummary.Request - 3, // 5: proto.Source.Sync:input_type -> proto.Sync.Request - 12, // 6: proto.Source.GetName:output_type -> proto.GetName.Response - 13, // 7: proto.Source.GetVersion:output_type -> proto.GetVersion.Response - 8, // 8: proto.Source.GetTables:output_type -> proto.GetTables.Response - 6, // 9: proto.Source.GetSyncSummary:output_type -> proto.GetSyncSummary.Response - 4, // 10: proto.Source.Sync:output_type -> proto.Sync.Response - 6, // [6:11] is the sub-list for method output_type - 1, // [1:6] is the sub-list for method input_type + 10, // 1: proto.Source.GetProtocolVersion:input_type -> proto.GetProtocolVersion.Request + 11, // 2: proto.Source.GetName:input_type -> proto.GetName.Request + 12, // 3: proto.Source.GetVersion:input_type -> proto.GetVersion.Request + 7, // 4: proto.Source.GetTables:input_type -> proto.GetTables.Request + 5, // 5: proto.Source.GetSyncSummary:input_type -> proto.GetSyncSummary.Request + 3, // 6: proto.Source.Sync:input_type -> proto.Sync.Request + 13, // 7: proto.Source.GetProtocolVersion:output_type -> proto.GetProtocolVersion.Response + 14, // 8: proto.Source.GetName:output_type -> proto.GetName.Response + 15, // 9: proto.Source.GetVersion:output_type -> proto.GetVersion.Response + 8, // 10: proto.Source.GetTables:output_type -> proto.GetTables.Response + 6, // 11: proto.Source.GetSyncSummary:output_type -> proto.GetSyncSummary.Response + 4, // 12: proto.Source.Sync:output_type -> proto.Sync.Response + 7, // [7:13] is the sub-list for method output_type + 1, // [1:7] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name 1, // [1:1] is the sub-list for extension extendee 0, // [0:1] is the sub-list for field type_name diff --git a/internal/pb/source.proto b/internal/pb/source.proto index 658231ec97..33ed8a6375 100644 --- a/internal/pb/source.proto +++ b/internal/pb/source.proto @@ -5,6 +5,10 @@ import "internal/pb/base.proto"; import "google/protobuf/timestamp.proto"; service Source { + // Get the current protocol version of the plugin. This helps + // get the right message about upgrade/downgrade of cli and/or plugin. + // Also, on the cli side it can try to upgrade/downgrade the protocol if cli supports it. + rpc GetProtocolVersion(GetProtocolVersion.Request) returns (GetProtocolVersion.Response); // Get the name of the plugin rpc GetName(GetName.Request) returns (GetName.Response); // Get the current version of the plugin diff --git a/internal/pb/source_grpc.pb.go b/internal/pb/source_grpc.pb.go index 72bddd378b..8b8b5f7cc4 100644 --- a/internal/pb/source_grpc.pb.go +++ b/internal/pb/source_grpc.pb.go @@ -22,6 +22,10 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type SourceClient interface { + // Get the current protocol version of the plugin. This helps + // get the right message about upgrade/downgrade of cli and/or plugin. + // Also, on the cli side it can try to upgrade/downgrade the protocol if cli supports it. + GetProtocolVersion(ctx context.Context, in *GetProtocolVersion_Request, opts ...grpc.CallOption) (*GetProtocolVersion_Response, error) // Get the name of the plugin GetName(ctx context.Context, in *GetName_Request, opts ...grpc.CallOption) (*GetName_Response, error) // Get the current version of the plugin @@ -43,6 +47,15 @@ func NewSourceClient(cc grpc.ClientConnInterface) SourceClient { return &sourceClient{cc} } +func (c *sourceClient) GetProtocolVersion(ctx context.Context, in *GetProtocolVersion_Request, opts ...grpc.CallOption) (*GetProtocolVersion_Response, error) { + out := new(GetProtocolVersion_Response) + err := c.cc.Invoke(ctx, "/proto.Source/GetProtocolVersion", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *sourceClient) GetName(ctx context.Context, in *GetName_Request, opts ...grpc.CallOption) (*GetName_Response, error) { out := new(GetName_Response) err := c.cc.Invoke(ctx, "/proto.Source/GetName", in, out, opts...) @@ -115,6 +128,10 @@ func (x *sourceSyncClient) Recv() (*Sync_Response, error) { // All implementations must embed UnimplementedSourceServer // for forward compatibility type SourceServer interface { + // Get the current protocol version of the plugin. This helps + // get the right message about upgrade/downgrade of cli and/or plugin. + // Also, on the cli side it can try to upgrade/downgrade the protocol if cli supports it. + GetProtocolVersion(context.Context, *GetProtocolVersion_Request) (*GetProtocolVersion_Response, error) // Get the name of the plugin GetName(context.Context, *GetName_Request) (*GetName_Response, error) // Get the current version of the plugin @@ -133,6 +150,9 @@ type SourceServer interface { type UnimplementedSourceServer struct { } +func (UnimplementedSourceServer) GetProtocolVersion(context.Context, *GetProtocolVersion_Request) (*GetProtocolVersion_Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetProtocolVersion not implemented") +} func (UnimplementedSourceServer) GetName(context.Context, *GetName_Request) (*GetName_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetName not implemented") } @@ -161,6 +181,24 @@ func RegisterSourceServer(s grpc.ServiceRegistrar, srv SourceServer) { s.RegisterService(&Source_ServiceDesc, srv) } +func _Source_GetProtocolVersion_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetProtocolVersion_Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SourceServer).GetProtocolVersion(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/proto.Source/GetProtocolVersion", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SourceServer).GetProtocolVersion(ctx, req.(*GetProtocolVersion_Request)) + } + return interceptor(ctx, in, info, handler) +} + func _Source_GetName_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(GetName_Request) if err := dec(in); err != nil { @@ -261,6 +299,10 @@ var Source_ServiceDesc = grpc.ServiceDesc{ ServiceName: "proto.Source", HandlerType: (*SourceServer)(nil), Methods: []grpc.MethodDesc{ + { + MethodName: "GetProtocolVersion", + Handler: _Source_GetProtocolVersion_Handler, + }, { MethodName: "GetName", Handler: _Source_GetName_Handler, diff --git a/internal/servers/destinations.go b/internal/servers/destinations.go index ebccdfdd90..9b49daa320 100644 --- a/internal/servers/destinations.go +++ b/internal/servers/destinations.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/cloudquery/plugin-sdk/internal/pb" + "github.com/cloudquery/plugin-sdk/internal/versions" "github.com/cloudquery/plugin-sdk/plugins" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" @@ -22,6 +23,13 @@ type DestinationServer struct { Logger zerolog.Logger } + +func (s *DestinationServer) GetProtocolVersion(context.Context, *pb.GetProtocolVersion_Request) (*pb.GetProtocolVersion_Response, error) { + return &pb.GetProtocolVersion_Response{ + Version: versions.DestinationProtocolVersion, + }, nil +} + func (s *DestinationServer) Configure(ctx context.Context, req *pb.Configure_Request) (*pb.Configure_Response, error) { var spec specs.Destination if err := json.Unmarshal(req.Config, &spec); err != nil { diff --git a/internal/servers/source.go b/internal/servers/source.go index eb4309fa80..73c1a2e5ed 100644 --- a/internal/servers/source.go +++ b/internal/servers/source.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/cloudquery/plugin-sdk/internal/pb" + "github.com/cloudquery/plugin-sdk/internal/versions" "github.com/cloudquery/plugin-sdk/plugins" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" @@ -22,6 +23,13 @@ type SourceServer struct { summary *schema.SyncSummary } + +func (s *SourceServer) GetProtocolVersion(context.Context, *pb.GetProtocolVersion_Request) (*pb.GetProtocolVersion_Response, error) { + return &pb.GetProtocolVersion_Response{ + Version: versions.SourceProtocolVersion, + }, nil +} + func (s *SourceServer) GetTables(context.Context, *pb.GetTables_Request) (*pb.GetTables_Response, error) { b, err := json.Marshal(s.Plugin.Tables()) if err != nil { diff --git a/internal/versions/versions.go b/internal/versions/versions.go new file mode 100644 index 0000000000..78269191b7 --- /dev/null +++ b/internal/versions/versions.go @@ -0,0 +1,6 @@ +package versions + +const ( + SourceProtocolVersion = uint64(1) + DestinationProtocolVersion = uint64(1) +) \ No newline at end of file diff --git a/serve/source_test.go b/serve/source_test.go index 45b6b2515c..acab7ac3b0 100644 --- a/serve/source_test.go +++ b/serve/source_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/cloudquery/plugin-sdk/clients" + "github.com/cloudquery/plugin-sdk/internal/versions" "github.com/cloudquery/plugin-sdk/plugins" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" @@ -115,6 +116,14 @@ func TestServeSource(t *testing.T) { } }() + protocolVersion, err := c.GetProtocolVersion(ctx) + if err != nil { + t.Fatal(err) + } + if versions.SourceProtocolVersion != protocolVersion { + t.Fatalf("expected protocol version %d, got %d", versions.SourceProtocolVersion, protocolVersion) + } + name, err := c.Name(ctx) if err != nil { t.Fatal(err) From 105a4378efcd868a4fa9d16c31d6c0c91804dbd7 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Fri, 14 Oct 2022 15:04:46 +0300 Subject: [PATCH 2/4] fix lint --- clients/source.go | 2 +- internal/servers/destinations.go | 3 +-- internal/servers/source.go | 3 +-- internal/versions/versions.go | 4 ++-- 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/clients/source.go b/clients/source.go index e966e01683..56193c9c18 100644 --- a/clients/source.go +++ b/clients/source.go @@ -119,7 +119,7 @@ func NewSourceClient(ctx context.Context, registry specs.Registry, path string, // newManagedClient starts a new source plugin process from local path, connects to it via gRPC server // and returns a new SourceClient -func (c *SourceClient) newManagedClient(ctx context.Context, path string) error { +func (c *SourceClient) newManagedClient(ctx context.Context, path string) error { c.grpcSocketName = generateRandomUnixSocketName() // spawn the plugin first and then connect cmd := exec.CommandContext(ctx, path, "serve", "--network", "unix", "--address", c.grpcSocketName, diff --git a/internal/servers/destinations.go b/internal/servers/destinations.go index 9b49daa320..bfcfecd385 100644 --- a/internal/servers/destinations.go +++ b/internal/servers/destinations.go @@ -23,8 +23,7 @@ type DestinationServer struct { Logger zerolog.Logger } - -func (s *DestinationServer) GetProtocolVersion(context.Context, *pb.GetProtocolVersion_Request) (*pb.GetProtocolVersion_Response, error) { +func (*DestinationServer) GetProtocolVersion(context.Context, *pb.GetProtocolVersion_Request) (*pb.GetProtocolVersion_Response, error) { return &pb.GetProtocolVersion_Response{ Version: versions.DestinationProtocolVersion, }, nil diff --git a/internal/servers/source.go b/internal/servers/source.go index 73c1a2e5ed..1256988302 100644 --- a/internal/servers/source.go +++ b/internal/servers/source.go @@ -23,8 +23,7 @@ type SourceServer struct { summary *schema.SyncSummary } - -func (s *SourceServer) GetProtocolVersion(context.Context, *pb.GetProtocolVersion_Request) (*pb.GetProtocolVersion_Response, error) { +func (*SourceServer) GetProtocolVersion(context.Context, *pb.GetProtocolVersion_Request) (*pb.GetProtocolVersion_Response, error) { return &pb.GetProtocolVersion_Response{ Version: versions.SourceProtocolVersion, }, nil diff --git a/internal/versions/versions.go b/internal/versions/versions.go index 78269191b7..87e979c4ae 100644 --- a/internal/versions/versions.go +++ b/internal/versions/versions.go @@ -1,6 +1,6 @@ package versions const ( - SourceProtocolVersion = uint64(1) + SourceProtocolVersion = uint64(1) DestinationProtocolVersion = uint64(1) -) \ No newline at end of file +) From 2a0b36121f4f706cf77c9fb5c992950a0759d866 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Fri, 14 Oct 2022 17:18:03 +0300 Subject: [PATCH 3/4] fix tests and assume version 1 if not implemented --- clients/destination.go | 12 +++++++++++- clients/source.go | 12 +++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/clients/destination.go b/clients/destination.go index 1101ad0de9..226679c00e 100644 --- a/clients/destination.go +++ b/clients/destination.go @@ -20,7 +20,9 @@ import ( "github.com/cloudquery/plugin-sdk/specs" "github.com/rs/zerolog" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -183,7 +185,15 @@ func (c *DestinationClient) newManagedClient(ctx context.Context, path string) e func (c *DestinationClient) GetProtocolVersion(ctx context.Context) (uint64, error) { res, err := c.pbClient.GetProtocolVersion(ctx, &pb.GetProtocolVersion_Request{}) if err != nil { - return 0, fmt.Errorf("failed to call GetProtocolVersion: %w", err) + s, ok := status.FromError(err) + if !ok { + return 0, fmt.Errorf("failed to cal GetProtocolVersion: %w", err) + } + if s.Code() != codes.Unimplemented { + return 0, err + } + c.logger.Warn().Err(err).Msg("plugin does not support protocol version. assuming protocol verison 1") + return 1, nil } return res.Version, nil } diff --git a/clients/source.go b/clients/source.go index 56193c9c18..ff348fafa5 100644 --- a/clients/source.go +++ b/clients/source.go @@ -19,7 +19,9 @@ import ( "github.com/cloudquery/plugin-sdk/specs" "github.com/rs/zerolog" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" ) // SourceClient @@ -187,7 +189,15 @@ func (c *SourceClient) newManagedClient(ctx context.Context, path string) error func (c *SourceClient) GetProtocolVersion(ctx context.Context) (uint64, error) { res, err := c.pbClient.GetProtocolVersion(ctx, &pb.GetProtocolVersion_Request{}) if err != nil { - return 0, fmt.Errorf("failed to call GetProtocolVersion: %w", err) + s, ok := status.FromError(err) + if !ok { + return 0, fmt.Errorf("failed to cal GetProtocolVersion: %w", err) + } + if s.Code() != codes.Unimplemented { + return 0, err + } + c.logger.Warn().Err(err).Msg("plugin does not support protocol version. assuming protocol verison 1") + return 1, nil } return res.Version, nil } From 2382d18fcc79f12ffb63877ed5fdad2a34c94458 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Fri, 14 Oct 2022 17:51:54 +0300 Subject: [PATCH 4/4] fix lint --- clients/destination.go | 4 ++-- clients/source.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/clients/destination.go b/clients/destination.go index 226679c00e..1704d13505 100644 --- a/clients/destination.go +++ b/clients/destination.go @@ -192,7 +192,7 @@ func (c *DestinationClient) GetProtocolVersion(ctx context.Context) (uint64, err if s.Code() != codes.Unimplemented { return 0, err } - c.logger.Warn().Err(err).Msg("plugin does not support protocol version. assuming protocol verison 1") + c.logger.Warn().Err(err).Msg("plugin does not support protocol version. assuming protocol version 1") return 1, nil } return res.Version, nil @@ -242,7 +242,7 @@ func (c *DestinationClient) Migrate(ctx context.Context, tables []*schema.Table) // Write writes rows as they are received from the channel to the destination plugin. // resources is marshaled schema.Resource. We are not marshalling this inside the function -// because usually it is alreadun marshalled from the source plugin. +// because usually it is alreadun marshalled from the destination plugin. func (c *DestinationClient) Write(ctx context.Context, source string, syncTime time.Time, resources <-chan []byte) (uint64, error) { saveClient, err := c.pbClient.Write(ctx) if err != nil { diff --git a/clients/source.go b/clients/source.go index ff348fafa5..1166a5ef85 100644 --- a/clients/source.go +++ b/clients/source.go @@ -111,9 +111,9 @@ func NewSourceClient(ctx context.Context, registry specs.Registry, path string, return nil, err } if protocolVersion < versions.SourceProtocolVersion { - return nil, fmt.Errorf("destination plugin protocol version %d is lower than client version %d. Try updating client", protocolVersion, versions.SourceProtocolVersion) + return nil, fmt.Errorf("source plugin protocol version %d is lower than client version %d. Try updating client", protocolVersion, versions.SourceProtocolVersion) } else if protocolVersion > versions.SourceProtocolVersion { - return nil, fmt.Errorf("destination plugin protocol version %d is higher than client version %d. Try updating destination plugin", protocolVersion, versions.SourceProtocolVersion) + return nil, fmt.Errorf("source plugin protocol version %d is higher than client version %d. Try updating destination plugin", protocolVersion, versions.SourceProtocolVersion) } return c, nil @@ -196,7 +196,7 @@ func (c *SourceClient) GetProtocolVersion(ctx context.Context) (uint64, error) { if s.Code() != codes.Unimplemented { return 0, err } - c.logger.Warn().Err(err).Msg("plugin does not support protocol version. assuming protocol verison 1") + c.logger.Warn().Err(err).Msg("plugin does not support protocol version. assuming protocol version 1") return 1, nil } return res.Version, nil @@ -232,7 +232,7 @@ func (c *SourceClient) GetTables(ctx context.Context) ([]*schema.Table, error) { // Sync start syncing for the source client per the given spec and returning the results // in the given channel. res is marshaled schema.Resource. We are not unmarshalling this for performance reasons -// as usually this is sent over-the-wire anyway to a destination plugin +// as usually this is sent over-the-wire anyway to a source plugin func (c *SourceClient) Sync(ctx context.Context, spec specs.Source, res chan<- []byte) error { b, err := json.Marshal(spec) if err != nil {