diff --git a/clients/destination.go b/clients/destination.go index 80d94d4905..1704d13505 100644 --- a/clients/destination.go +++ b/clients/destination.go @@ -15,11 +15,14 @@ import ( "time" "github.com/cloudquery/plugin-sdk/internal/pb" + "github.com/cloudquery/plugin-sdk/internal/versions" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" "github.com/rs/zerolog" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -78,7 +81,9 @@ func NewDestinationClient(ctx context.Context, registry specs.Registry, path str } return c, nil case specs.RegistryLocal: - return c.newManagedClient(ctx, path) + if err := c.newManagedClient(ctx, path); err != nil { + return nil, err + } case specs.RegistryGithub: pathSplit := strings.Split(path, "/") if len(pathSplit) != 2 { @@ -90,26 +95,40 @@ func NewDestinationClient(ctx context.Context, registry specs.Registry, path str if err := DownloadPluginFromGithub(ctx, localPath, org, name, version, PluginTypeDestination); err != nil { return nil, err } - return c.newManagedClient(ctx, localPath) + if err := c.newManagedClient(ctx, localPath); err != nil { + return nil, err + } default: return nil, fmt.Errorf("unsupported registry %s", registry) } + protocolVersion, err := c.GetProtocolVersion(ctx) + if err != nil { + return nil, err + } + + if protocolVersion < versions.DestinationProtocolVersion { + return nil, fmt.Errorf("destination plugin protocol version %d is lower than client version %d. Try updating client", protocolVersion, versions.DestinationProtocolVersion) + } else if protocolVersion > versions.DestinationProtocolVersion { + return nil, fmt.Errorf("destination plugin protocol version %d is higher than client version %d. Try updating destination plugin", protocolVersion, versions.DestinationProtocolVersion) + } + + return c, nil } // newManagedClient starts a new destination plugin process from local file, connects to it via gRPC server // and returns a new DestinationClient -func (c *DestinationClient) newManagedClient(ctx context.Context, path string) (*DestinationClient, error) { +func (c *DestinationClient) newManagedClient(ctx context.Context, path string) error { c.grpcSocketName = generateRandomUnixSocketName() // spawn the plugin first and then connect cmd := exec.CommandContext(ctx, path, "serve", "--network", "unix", "--address", c.grpcSocketName, "--log-level", c.logger.GetLevel().String(), "--log-format", "json") reader, err := cmd.StdoutPipe() if err != nil { - return nil, fmt.Errorf("failed to get stdout pipe: %w", err) + return fmt.Errorf("failed to get stdout pipe: %w", err) } cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("failed to start plugin %s: %w", path, err) + return fmt.Errorf("failed to start plugin %s: %w", path, err) } c.wg.Add(1) @@ -157,10 +176,26 @@ func (c *DestinationClient) newManagedClient(ctx context.Context, path string) ( if err := cmd.Process.Kill(); err != nil { c.logger.Error().Err(err).Msg("failed to kill plugin process") } - return c, err + return err } c.pbClient = pb.NewDestinationClient(c.conn) - return c, nil + return nil +} + +func (c *DestinationClient) GetProtocolVersion(ctx context.Context) (uint64, error) { + res, err := c.pbClient.GetProtocolVersion(ctx, &pb.GetProtocolVersion_Request{}) + if err != nil { + s, ok := status.FromError(err) + if !ok { + return 0, fmt.Errorf("failed to cal GetProtocolVersion: %w", err) + } + if s.Code() != codes.Unimplemented { + return 0, err + } + c.logger.Warn().Err(err).Msg("plugin does not support protocol version. assuming protocol version 1") + return 1, nil + } + return res.Version, nil } func (c *DestinationClient) Name(ctx context.Context) (string, error) { @@ -207,7 +242,7 @@ func (c *DestinationClient) Migrate(ctx context.Context, tables []*schema.Table) // Write writes rows as they are received from the channel to the destination plugin. // resources is marshaled schema.Resource. We are not marshalling this inside the function -// because usually it is alreadun marshalled from the source plugin. +// because usually it is alreadun marshalled from the destination plugin. func (c *DestinationClient) Write(ctx context.Context, source string, syncTime time.Time, resources <-chan []byte) (uint64, error) { saveClient, err := c.pbClient.Write(ctx) if err != nil { diff --git a/clients/source.go b/clients/source.go index a135684d36..1166a5ef85 100644 --- a/clients/source.go +++ b/clients/source.go @@ -14,11 +14,14 @@ import ( "sync" "github.com/cloudquery/plugin-sdk/internal/pb" + "github.com/cloudquery/plugin-sdk/internal/versions" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" "github.com/rs/zerolog" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" ) // SourceClient @@ -82,7 +85,9 @@ func NewSourceClient(ctx context.Context, registry specs.Registry, path string, } return c, nil case specs.RegistryLocal: - return c.newManagedClient(ctx, path) + if err := c.newManagedClient(ctx, path); err != nil { + return nil, err + } case specs.RegistryGithub: pathSplit := strings.Split(path, "/") if len(pathSplit) != 2 { @@ -94,26 +99,40 @@ func NewSourceClient(ctx context.Context, registry specs.Registry, path string, if err := DownloadPluginFromGithub(ctx, localPath, org, name, version, PluginTypeSource); err != nil { return nil, err } - return c.newManagedClient(ctx, localPath) + if err := c.newManagedClient(ctx, localPath); err != nil { + return nil, err + } default: return nil, fmt.Errorf("unsupported registry %s", registry) } + + protocolVersion, err := c.GetProtocolVersion(ctx) + if err != nil { + return nil, err + } + if protocolVersion < versions.SourceProtocolVersion { + return nil, fmt.Errorf("source plugin protocol version %d is lower than client version %d. Try updating client", protocolVersion, versions.SourceProtocolVersion) + } else if protocolVersion > versions.SourceProtocolVersion { + return nil, fmt.Errorf("source plugin protocol version %d is higher than client version %d. Try updating destination plugin", protocolVersion, versions.SourceProtocolVersion) + } + + return c, nil } // newManagedClient starts a new source plugin process from local path, connects to it via gRPC server // and returns a new SourceClient -func (c *SourceClient) newManagedClient(ctx context.Context, path string) (*SourceClient, error) { +func (c *SourceClient) newManagedClient(ctx context.Context, path string) error { c.grpcSocketName = generateRandomUnixSocketName() // spawn the plugin first and then connect cmd := exec.CommandContext(ctx, path, "serve", "--network", "unix", "--address", c.grpcSocketName, "--log-level", c.logger.GetLevel().String(), "--log-format", "json") reader, err := cmd.StdoutPipe() if err != nil { - return nil, fmt.Errorf("failed to get stdout pipe: %w", err) + return fmt.Errorf("failed to get stdout pipe: %w", err) } cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("failed to start plugin %s: %w", path, err) + return fmt.Errorf("failed to start plugin %s: %w", path, err) } c.wg.Add(1) @@ -161,10 +180,26 @@ func (c *SourceClient) newManagedClient(ctx context.Context, path string) (*Sour if err := cmd.Process.Kill(); err != nil { c.logger.Error().Err(err).Msg("failed to kill plugin process") } - return c, err + return err } c.pbClient = pb.NewSourceClient(c.conn) - return c, nil + return nil +} + +func (c *SourceClient) GetProtocolVersion(ctx context.Context) (uint64, error) { + res, err := c.pbClient.GetProtocolVersion(ctx, &pb.GetProtocolVersion_Request{}) + if err != nil { + s, ok := status.FromError(err) + if !ok { + return 0, fmt.Errorf("failed to cal GetProtocolVersion: %w", err) + } + if s.Code() != codes.Unimplemented { + return 0, err + } + c.logger.Warn().Err(err).Msg("plugin does not support protocol version. assuming protocol version 1") + return 1, nil + } + return res.Version, nil } func (c *SourceClient) Name(ctx context.Context) (string, error) { @@ -197,7 +232,7 @@ func (c *SourceClient) GetTables(ctx context.Context) ([]*schema.Table, error) { // Sync start syncing for the source client per the given spec and returning the results // in the given channel. res is marshaled schema.Resource. We are not unmarshalling this for performance reasons -// as usually this is sent over-the-wire anyway to a destination plugin +// as usually this is sent over-the-wire anyway to a source plugin func (c *SourceClient) Sync(ctx context.Context, spec specs.Source, res chan<- []byte) error { b, err := json.Marshal(spec) if err != nil { diff --git a/internal/pb/base.pb.go b/internal/pb/base.pb.go index b3a77b7ac1..2856bb43e9 100644 --- a/internal/pb/base.pb.go +++ b/internal/pb/base.pb.go @@ -96,6 +96,44 @@ func (*GetVersion) Descriptor() ([]byte, []int) { return file_internal_pb_base_proto_rawDescGZIP(), []int{1} } +type GetProtocolVersion struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetProtocolVersion) Reset() { + *x = GetProtocolVersion{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pb_base_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetProtocolVersion) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetProtocolVersion) ProtoMessage() {} + +func (x *GetProtocolVersion) ProtoReflect() protoreflect.Message { + mi := &file_internal_pb_base_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetProtocolVersion.ProtoReflect.Descriptor instead. +func (*GetProtocolVersion) Descriptor() ([]byte, []int) { + return file_internal_pb_base_proto_rawDescGZIP(), []int{2} +} + type Configure struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -105,7 +143,7 @@ type Configure struct { func (x *Configure) Reset() { *x = Configure{} if protoimpl.UnsafeEnabled { - mi := &file_internal_pb_base_proto_msgTypes[2] + mi := &file_internal_pb_base_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -118,7 +156,7 @@ func (x *Configure) String() string { func (*Configure) ProtoMessage() {} func (x *Configure) ProtoReflect() protoreflect.Message { - mi := &file_internal_pb_base_proto_msgTypes[2] + mi := &file_internal_pb_base_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -131,7 +169,7 @@ func (x *Configure) ProtoReflect() protoreflect.Message { // Deprecated: Use Configure.ProtoReflect.Descriptor instead. func (*Configure) Descriptor() ([]byte, []int) { - return file_internal_pb_base_proto_rawDescGZIP(), []int{2} + return file_internal_pb_base_proto_rawDescGZIP(), []int{3} } type GetName_Request struct { @@ -143,7 +181,7 @@ type GetName_Request struct { func (x *GetName_Request) Reset() { *x = GetName_Request{} if protoimpl.UnsafeEnabled { - mi := &file_internal_pb_base_proto_msgTypes[3] + mi := &file_internal_pb_base_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -156,7 +194,7 @@ func (x *GetName_Request) String() string { func (*GetName_Request) ProtoMessage() {} func (x *GetName_Request) ProtoReflect() protoreflect.Message { - mi := &file_internal_pb_base_proto_msgTypes[3] + mi := &file_internal_pb_base_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -183,7 +221,7 @@ type GetName_Response struct { func (x *GetName_Response) Reset() { *x = GetName_Response{} if protoimpl.UnsafeEnabled { - mi := &file_internal_pb_base_proto_msgTypes[4] + mi := &file_internal_pb_base_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -196,7 +234,7 @@ func (x *GetName_Response) String() string { func (*GetName_Response) ProtoMessage() {} func (x *GetName_Response) ProtoReflect() protoreflect.Message { - mi := &file_internal_pb_base_proto_msgTypes[4] + mi := &file_internal_pb_base_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -228,7 +266,7 @@ type GetVersion_Request struct { func (x *GetVersion_Request) Reset() { *x = GetVersion_Request{} if protoimpl.UnsafeEnabled { - mi := &file_internal_pb_base_proto_msgTypes[5] + mi := &file_internal_pb_base_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -241,7 +279,7 @@ func (x *GetVersion_Request) String() string { func (*GetVersion_Request) ProtoMessage() {} func (x *GetVersion_Request) ProtoReflect() protoreflect.Message { - mi := &file_internal_pb_base_proto_msgTypes[5] + mi := &file_internal_pb_base_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -268,7 +306,7 @@ type GetVersion_Response struct { func (x *GetVersion_Response) Reset() { *x = GetVersion_Response{} if protoimpl.UnsafeEnabled { - mi := &file_internal_pb_base_proto_msgTypes[6] + mi := &file_internal_pb_base_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -281,7 +319,7 @@ func (x *GetVersion_Response) String() string { func (*GetVersion_Response) ProtoMessage() {} func (x *GetVersion_Response) ProtoReflect() protoreflect.Message { - mi := &file_internal_pb_base_proto_msgTypes[6] + mi := &file_internal_pb_base_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -304,6 +342,91 @@ func (x *GetVersion_Response) GetVersion() string { return "" } +type GetProtocolVersion_Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetProtocolVersion_Request) Reset() { + *x = GetProtocolVersion_Request{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pb_base_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetProtocolVersion_Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetProtocolVersion_Request) ProtoMessage() {} + +func (x *GetProtocolVersion_Request) ProtoReflect() protoreflect.Message { + mi := &file_internal_pb_base_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetProtocolVersion_Request.ProtoReflect.Descriptor instead. +func (*GetProtocolVersion_Request) Descriptor() ([]byte, []int) { + return file_internal_pb_base_proto_rawDescGZIP(), []int{2, 0} +} + +type GetProtocolVersion_Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Version uint64 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` +} + +func (x *GetProtocolVersion_Response) Reset() { + *x = GetProtocolVersion_Response{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pb_base_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetProtocolVersion_Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetProtocolVersion_Response) ProtoMessage() {} + +func (x *GetProtocolVersion_Response) ProtoReflect() protoreflect.Message { + mi := &file_internal_pb_base_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetProtocolVersion_Response.ProtoReflect.Descriptor instead. +func (*GetProtocolVersion_Response) Descriptor() ([]byte, []int) { + return file_internal_pb_base_proto_rawDescGZIP(), []int{2, 1} +} + +func (x *GetProtocolVersion_Response) GetVersion() uint64 { + if x != nil { + return x.Version + } + return 0 +} + type Configure_Request struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -317,7 +440,7 @@ type Configure_Request struct { func (x *Configure_Request) Reset() { *x = Configure_Request{} if protoimpl.UnsafeEnabled { - mi := &file_internal_pb_base_proto_msgTypes[7] + mi := &file_internal_pb_base_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -330,7 +453,7 @@ func (x *Configure_Request) String() string { func (*Configure_Request) ProtoMessage() {} func (x *Configure_Request) ProtoReflect() protoreflect.Message { - mi := &file_internal_pb_base_proto_msgTypes[7] + mi := &file_internal_pb_base_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -343,7 +466,7 @@ func (x *Configure_Request) ProtoReflect() protoreflect.Message { // Deprecated: Use Configure_Request.ProtoReflect.Descriptor instead. func (*Configure_Request) Descriptor() ([]byte, []int) { - return file_internal_pb_base_proto_rawDescGZIP(), []int{2, 0} + return file_internal_pb_base_proto_rawDescGZIP(), []int{3, 0} } func (x *Configure_Request) GetConfig() []byte { @@ -364,7 +487,7 @@ type Configure_Response struct { func (x *Configure_Response) Reset() { *x = Configure_Response{} if protoimpl.UnsafeEnabled { - mi := &file_internal_pb_base_proto_msgTypes[8] + mi := &file_internal_pb_base_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -377,7 +500,7 @@ func (x *Configure_Response) String() string { func (*Configure_Response) ProtoMessage() {} func (x *Configure_Response) ProtoReflect() protoreflect.Message { - mi := &file_internal_pb_base_proto_msgTypes[8] + mi := &file_internal_pb_base_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -390,7 +513,7 @@ func (x *Configure_Response) ProtoReflect() protoreflect.Message { // Deprecated: Use Configure_Response.ProtoReflect.Descriptor instead. func (*Configure_Response) Descriptor() ([]byte, []int) { - return file_internal_pb_base_proto_rawDescGZIP(), []int{2, 1} + return file_internal_pb_base_proto_rawDescGZIP(), []int{3, 1} } func (x *Configure_Response) GetError() string { @@ -412,13 +535,17 @@ var file_internal_pb_base_proto_rawDesc = []byte{ 0x69, 0x6f, 0x6e, 0x1a, 0x09, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x50, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, - 0x65, 0x1a, 0x21, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, - 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x63, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x1a, 0x20, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x45, 0x0a, 0x12, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x1a, 0x09, 0x0a, 0x07, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x50, 0x0a, 0x09, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x1a, 0x21, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x1a, 0x20, 0x0a, 0x08, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x05, 0x5a, + 0x03, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -433,17 +560,20 @@ func file_internal_pb_base_proto_rawDescGZIP() []byte { return file_internal_pb_base_proto_rawDescData } -var file_internal_pb_base_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_internal_pb_base_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_internal_pb_base_proto_goTypes = []interface{}{ - (*GetName)(nil), // 0: proto.GetName - (*GetVersion)(nil), // 1: proto.GetVersion - (*Configure)(nil), // 2: proto.Configure - (*GetName_Request)(nil), // 3: proto.GetName.Request - (*GetName_Response)(nil), // 4: proto.GetName.Response - (*GetVersion_Request)(nil), // 5: proto.GetVersion.Request - (*GetVersion_Response)(nil), // 6: proto.GetVersion.Response - (*Configure_Request)(nil), // 7: proto.Configure.Request - (*Configure_Response)(nil), // 8: proto.Configure.Response + (*GetName)(nil), // 0: proto.GetName + (*GetVersion)(nil), // 1: proto.GetVersion + (*GetProtocolVersion)(nil), // 2: proto.GetProtocolVersion + (*Configure)(nil), // 3: proto.Configure + (*GetName_Request)(nil), // 4: proto.GetName.Request + (*GetName_Response)(nil), // 5: proto.GetName.Response + (*GetVersion_Request)(nil), // 6: proto.GetVersion.Request + (*GetVersion_Response)(nil), // 7: proto.GetVersion.Response + (*GetProtocolVersion_Request)(nil), // 8: proto.GetProtocolVersion.Request + (*GetProtocolVersion_Response)(nil), // 9: proto.GetProtocolVersion.Response + (*Configure_Request)(nil), // 10: proto.Configure.Request + (*Configure_Response)(nil), // 11: proto.Configure.Response } var file_internal_pb_base_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type @@ -484,7 +614,7 @@ func file_internal_pb_base_proto_init() { } } file_internal_pb_base_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Configure); i { + switch v := v.(*GetProtocolVersion); i { case 0: return &v.state case 1: @@ -496,7 +626,7 @@ func file_internal_pb_base_proto_init() { } } file_internal_pb_base_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GetName_Request); i { + switch v := v.(*Configure); i { case 0: return &v.state case 1: @@ -508,7 +638,7 @@ func file_internal_pb_base_proto_init() { } } file_internal_pb_base_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GetName_Response); i { + switch v := v.(*GetName_Request); i { case 0: return &v.state case 1: @@ -520,7 +650,7 @@ func file_internal_pb_base_proto_init() { } } file_internal_pb_base_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GetVersion_Request); i { + switch v := v.(*GetName_Response); i { case 0: return &v.state case 1: @@ -532,7 +662,7 @@ func file_internal_pb_base_proto_init() { } } file_internal_pb_base_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GetVersion_Response); i { + switch v := v.(*GetVersion_Request); i { case 0: return &v.state case 1: @@ -544,7 +674,7 @@ func file_internal_pb_base_proto_init() { } } file_internal_pb_base_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Configure_Request); i { + switch v := v.(*GetVersion_Response); i { case 0: return &v.state case 1: @@ -556,6 +686,42 @@ func file_internal_pb_base_proto_init() { } } file_internal_pb_base_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetProtocolVersion_Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_pb_base_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetProtocolVersion_Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_pb_base_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Configure_Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_pb_base_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Configure_Response); i { case 0: return &v.state @@ -574,7 +740,7 @@ func file_internal_pb_base_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_internal_pb_base_proto_rawDesc, NumEnums: 0, - NumMessages: 9, + NumMessages: 12, NumExtensions: 0, NumServices: 0, }, diff --git a/internal/pb/base.proto b/internal/pb/base.proto index b6e0994002..8c4690e8e5 100644 --- a/internal/pb/base.proto +++ b/internal/pb/base.proto @@ -16,6 +16,13 @@ message GetVersion { } } +message GetProtocolVersion { + message Request {} + message Response { + uint64 version = 1; + } +} + message Configure { message Request { // Holds information such as credentials, regions, accounts, etc' diff --git a/internal/pb/destination.pb.go b/internal/pb/destination.pb.go index d1f5e97270..2177899f2b 100644 --- a/internal/pb/destination.pb.go +++ b/internal/pb/destination.pb.go @@ -611,36 +611,42 @@ var file_internal_pb_destination_proto_rawDesc = []byte{ 0x1a, 0x31, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x44, 0x65, 0x6c, 0x65, - 0x74, 0x65, 0x73, 0x32, 0xc2, 0x03, 0x0a, 0x0b, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x12, 0x3a, 0x0a, 0x07, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x16, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, - 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x43, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, - 0x65, 0x12, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x75, 0x72, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x2e, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, - 0x65, 0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, - 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x2e, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x36, 0x0a, 0x05, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x14, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x12, 0x34, 0x0a, 0x05, 0x43, 0x6c, - 0x6f, 0x73, 0x65, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6c, 0x6f, 0x73, - 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x46, 0x0a, 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x12, - 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, - 0x61, 0x6c, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x2e, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x65, 0x73, 0x32, 0x9f, 0x04, 0x0a, 0x0b, 0x44, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x12, 0x5b, 0x0a, 0x12, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, + 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, + 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x3a, 0x0a, 0x07, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, + 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x43, 0x0a, 0x0a, + 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, + 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x40, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x12, 0x18, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, + 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x12, 0x16, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x2e, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4d, + 0x69, 0x67, 0x72, 0x61, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x36, 0x0a, 0x05, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x12, 0x34, 0x0a, 0x05, 0x43, 0x6c, 0x6f, 0x73, 0x65, + 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x2e, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, + 0x6c, 0x6f, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, + 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x12, 0x1a, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, + 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x6c, 0x65, 0x2e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -657,45 +663,49 @@ func file_internal_pb_destination_proto_rawDescGZIP() []byte { var file_internal_pb_destination_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_internal_pb_destination_proto_goTypes = []interface{}{ - (*Migrate)(nil), // 0: proto.Migrate - (*Write)(nil), // 1: proto.Write - (*Close)(nil), // 2: proto.Close - (*DeleteStale)(nil), // 3: proto.DeleteStale - (*Migrate_Request)(nil), // 4: proto.Migrate.Request - (*Migrate_Response)(nil), // 5: proto.Migrate.Response - (*Write_Request)(nil), // 6: proto.Write.Request - (*Write_Response)(nil), // 7: proto.Write.Response - (*Close_Request)(nil), // 8: proto.Close.Request - (*Close_Response)(nil), // 9: proto.Close.Response - (*DeleteStale_Request)(nil), // 10: proto.DeleteStale.Request - (*DeleteStale_Response)(nil), // 11: proto.DeleteStale.Response - (*timestamppb.Timestamp)(nil), // 12: google.protobuf.Timestamp - (*GetName_Request)(nil), // 13: proto.GetName.Request - (*GetVersion_Request)(nil), // 14: proto.GetVersion.Request - (*Configure_Request)(nil), // 15: proto.Configure.Request - (*GetName_Response)(nil), // 16: proto.GetName.Response - (*GetVersion_Response)(nil), // 17: proto.GetVersion.Response - (*Configure_Response)(nil), // 18: proto.Configure.Response + (*Migrate)(nil), // 0: proto.Migrate + (*Write)(nil), // 1: proto.Write + (*Close)(nil), // 2: proto.Close + (*DeleteStale)(nil), // 3: proto.DeleteStale + (*Migrate_Request)(nil), // 4: proto.Migrate.Request + (*Migrate_Response)(nil), // 5: proto.Migrate.Response + (*Write_Request)(nil), // 6: proto.Write.Request + (*Write_Response)(nil), // 7: proto.Write.Response + (*Close_Request)(nil), // 8: proto.Close.Request + (*Close_Response)(nil), // 9: proto.Close.Response + (*DeleteStale_Request)(nil), // 10: proto.DeleteStale.Request + (*DeleteStale_Response)(nil), // 11: proto.DeleteStale.Response + (*timestamppb.Timestamp)(nil), // 12: google.protobuf.Timestamp + (*GetProtocolVersion_Request)(nil), // 13: proto.GetProtocolVersion.Request + (*GetName_Request)(nil), // 14: proto.GetName.Request + (*GetVersion_Request)(nil), // 15: proto.GetVersion.Request + (*Configure_Request)(nil), // 16: proto.Configure.Request + (*GetProtocolVersion_Response)(nil), // 17: proto.GetProtocolVersion.Response + (*GetName_Response)(nil), // 18: proto.GetName.Response + (*GetVersion_Response)(nil), // 19: proto.GetVersion.Response + (*Configure_Response)(nil), // 20: proto.Configure.Response } var file_internal_pb_destination_proto_depIdxs = []int32{ 12, // 0: proto.Write.Request.timestamp:type_name -> google.protobuf.Timestamp 12, // 1: proto.DeleteStale.Request.timestamp:type_name -> google.protobuf.Timestamp - 13, // 2: proto.Destination.GetName:input_type -> proto.GetName.Request - 14, // 3: proto.Destination.GetVersion:input_type -> proto.GetVersion.Request - 15, // 4: proto.Destination.Configure:input_type -> proto.Configure.Request - 4, // 5: proto.Destination.Migrate:input_type -> proto.Migrate.Request - 6, // 6: proto.Destination.Write:input_type -> proto.Write.Request - 8, // 7: proto.Destination.Close:input_type -> proto.Close.Request - 10, // 8: proto.Destination.DeleteStale:input_type -> proto.DeleteStale.Request - 16, // 9: proto.Destination.GetName:output_type -> proto.GetName.Response - 17, // 10: proto.Destination.GetVersion:output_type -> proto.GetVersion.Response - 18, // 11: proto.Destination.Configure:output_type -> proto.Configure.Response - 5, // 12: proto.Destination.Migrate:output_type -> proto.Migrate.Response - 7, // 13: proto.Destination.Write:output_type -> proto.Write.Response - 9, // 14: proto.Destination.Close:output_type -> proto.Close.Response - 11, // 15: proto.Destination.DeleteStale:output_type -> proto.DeleteStale.Response - 9, // [9:16] is the sub-list for method output_type - 2, // [2:9] is the sub-list for method input_type + 13, // 2: proto.Destination.GetProtocolVersion:input_type -> proto.GetProtocolVersion.Request + 14, // 3: proto.Destination.GetName:input_type -> proto.GetName.Request + 15, // 4: proto.Destination.GetVersion:input_type -> proto.GetVersion.Request + 16, // 5: proto.Destination.Configure:input_type -> proto.Configure.Request + 4, // 6: proto.Destination.Migrate:input_type -> proto.Migrate.Request + 6, // 7: proto.Destination.Write:input_type -> proto.Write.Request + 8, // 8: proto.Destination.Close:input_type -> proto.Close.Request + 10, // 9: proto.Destination.DeleteStale:input_type -> proto.DeleteStale.Request + 17, // 10: proto.Destination.GetProtocolVersion:output_type -> proto.GetProtocolVersion.Response + 18, // 11: proto.Destination.GetName:output_type -> proto.GetName.Response + 19, // 12: proto.Destination.GetVersion:output_type -> proto.GetVersion.Response + 20, // 13: proto.Destination.Configure:output_type -> proto.Configure.Response + 5, // 14: proto.Destination.Migrate:output_type -> proto.Migrate.Response + 7, // 15: proto.Destination.Write:output_type -> proto.Write.Response + 9, // 16: proto.Destination.Close:output_type -> proto.Close.Response + 11, // 17: proto.Destination.DeleteStale:output_type -> proto.DeleteStale.Response + 10, // [10:18] is the sub-list for method output_type + 2, // [2:10] is the sub-list for method input_type 2, // [2:2] is the sub-list for extension type_name 2, // [2:2] is the sub-list for extension extendee 0, // [0:2] is the sub-list for field type_name diff --git a/internal/pb/destination.proto b/internal/pb/destination.proto index f03b8f7913..da5a9ec9a1 100644 --- a/internal/pb/destination.proto +++ b/internal/pb/destination.proto @@ -5,6 +5,10 @@ import "internal/pb/base.proto"; import "google/protobuf/timestamp.proto"; service Destination { + // Get the current protocol version of the plugin. This helps + // get the right message about upgrade/downgrade of cli and/or plugin. + // Also, on the cli side it can try to upgrade/downgrade the protocol if cli supports it. + rpc GetProtocolVersion(GetProtocolVersion.Request) returns (GetProtocolVersion.Response); // Get the name of the plugin rpc GetName(GetName.Request) returns (GetName.Response); // Get the current version of the plugin diff --git a/internal/pb/destination_grpc.pb.go b/internal/pb/destination_grpc.pb.go index b2e68dd670..d427a036a4 100644 --- a/internal/pb/destination_grpc.pb.go +++ b/internal/pb/destination_grpc.pb.go @@ -22,6 +22,10 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type DestinationClient interface { + // Get the current protocol version of the plugin. This helps + // get the right message about upgrade/downgrade of cli and/or plugin. + // Also, on the cli side it can try to upgrade/downgrade the protocol if cli supports it. + GetProtocolVersion(ctx context.Context, in *GetProtocolVersion_Request, opts ...grpc.CallOption) (*GetProtocolVersion_Response, error) // Get the name of the plugin GetName(ctx context.Context, in *GetName_Request, opts ...grpc.CallOption) (*GetName_Response, error) // Get the current version of the plugin @@ -47,6 +51,15 @@ func NewDestinationClient(cc grpc.ClientConnInterface) DestinationClient { return &destinationClient{cc} } +func (c *destinationClient) GetProtocolVersion(ctx context.Context, in *GetProtocolVersion_Request, opts ...grpc.CallOption) (*GetProtocolVersion_Response, error) { + out := new(GetProtocolVersion_Response) + err := c.cc.Invoke(ctx, "/proto.Destination/GetProtocolVersion", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *destinationClient) GetName(ctx context.Context, in *GetName_Request, opts ...grpc.CallOption) (*GetName_Response, error) { out := new(GetName_Response) err := c.cc.Invoke(ctx, "/proto.Destination/GetName", in, out, opts...) @@ -139,6 +152,10 @@ func (c *destinationClient) DeleteStale(ctx context.Context, in *DeleteStale_Req // All implementations must embed UnimplementedDestinationServer // for forward compatibility type DestinationServer interface { + // Get the current protocol version of the plugin. This helps + // get the right message about upgrade/downgrade of cli and/or plugin. + // Also, on the cli side it can try to upgrade/downgrade the protocol if cli supports it. + GetProtocolVersion(context.Context, *GetProtocolVersion_Request) (*GetProtocolVersion_Response, error) // Get the name of the plugin GetName(context.Context, *GetName_Request) (*GetName_Response, error) // Get the current version of the plugin @@ -161,6 +178,9 @@ type DestinationServer interface { type UnimplementedDestinationServer struct { } +func (UnimplementedDestinationServer) GetProtocolVersion(context.Context, *GetProtocolVersion_Request) (*GetProtocolVersion_Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetProtocolVersion not implemented") +} func (UnimplementedDestinationServer) GetName(context.Context, *GetName_Request) (*GetName_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetName not implemented") } @@ -195,6 +215,24 @@ func RegisterDestinationServer(s grpc.ServiceRegistrar, srv DestinationServer) { s.RegisterService(&Destination_ServiceDesc, srv) } +func _Destination_GetProtocolVersion_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetProtocolVersion_Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DestinationServer).GetProtocolVersion(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/proto.Destination/GetProtocolVersion", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DestinationServer).GetProtocolVersion(ctx, req.(*GetProtocolVersion_Request)) + } + return interceptor(ctx, in, info, handler) +} + func _Destination_GetName_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(GetName_Request) if err := dec(in); err != nil { @@ -336,6 +374,10 @@ var Destination_ServiceDesc = grpc.ServiceDesc{ ServiceName: "proto.Destination", HandlerType: (*DestinationServer)(nil), Methods: []grpc.MethodDesc{ + { + MethodName: "GetProtocolVersion", + Handler: _Destination_GetProtocolVersion_Handler, + }, { MethodName: "GetName", Handler: _Destination_GetName_Handler, diff --git a/internal/pb/source.pb.go b/internal/pb/source.pb.go index 7673fab13c..36caaf169a 100644 --- a/internal/pb/source.pb.go +++ b/internal/pb/source.pb.go @@ -455,29 +455,35 @@ var file_internal_pb_source_proto_rawDesc = []byte{ 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, - 0x73, 0x32, 0xd1, 0x02, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x3a, 0x0a, 0x07, - 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, - 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x43, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x56, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, - 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, - 0x09, 0x47, 0x65, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x18, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x2e, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, - 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x4f, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, - 0x79, 0x12, 0x1d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x79, 0x6e, - 0x63, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63, - 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x33, 0x0a, 0x04, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x32, 0xae, 0x03, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x5b, 0x0a, 0x12, + 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, + 0x6f, 0x6e, 0x12, 0x21, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, + 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, + 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, + 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x47, 0x65, 0x74, + 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, + 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x2e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x43, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, + 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, + 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, 0x09, 0x47, 0x65, + 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x47, 0x65, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x61, 0x62, + 0x6c, 0x65, 0x73, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4f, 0x0a, 0x0e, + 0x47, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x12, 0x1d, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x75, + 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x75, 0x6d, + 0x6d, 0x61, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x33, 0x0a, + 0x04, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x79, + 0x6e, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x30, 0x01, 0x42, 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -494,35 +500,39 @@ func file_internal_pb_source_proto_rawDescGZIP() []byte { var file_internal_pb_source_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_internal_pb_source_proto_goTypes = []interface{}{ - (*Sync)(nil), // 0: proto.Sync - (*GetSyncSummary)(nil), // 1: proto.GetSyncSummary - (*GetTables)(nil), // 2: proto.GetTables - (*Sync_Request)(nil), // 3: proto.Sync.Request - (*Sync_Response)(nil), // 4: proto.Sync.Response - (*GetSyncSummary_Request)(nil), // 5: proto.GetSyncSummary.Request - (*GetSyncSummary_Response)(nil), // 6: proto.GetSyncSummary.Response - (*GetTables_Request)(nil), // 7: proto.GetTables.Request - (*GetTables_Response)(nil), // 8: proto.GetTables.Response - (*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp - (*GetName_Request)(nil), // 10: proto.GetName.Request - (*GetVersion_Request)(nil), // 11: proto.GetVersion.Request - (*GetName_Response)(nil), // 12: proto.GetName.Response - (*GetVersion_Response)(nil), // 13: proto.GetVersion.Response + (*Sync)(nil), // 0: proto.Sync + (*GetSyncSummary)(nil), // 1: proto.GetSyncSummary + (*GetTables)(nil), // 2: proto.GetTables + (*Sync_Request)(nil), // 3: proto.Sync.Request + (*Sync_Response)(nil), // 4: proto.Sync.Response + (*GetSyncSummary_Request)(nil), // 5: proto.GetSyncSummary.Request + (*GetSyncSummary_Response)(nil), // 6: proto.GetSyncSummary.Response + (*GetTables_Request)(nil), // 7: proto.GetTables.Request + (*GetTables_Response)(nil), // 8: proto.GetTables.Response + (*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp + (*GetProtocolVersion_Request)(nil), // 10: proto.GetProtocolVersion.Request + (*GetName_Request)(nil), // 11: proto.GetName.Request + (*GetVersion_Request)(nil), // 12: proto.GetVersion.Request + (*GetProtocolVersion_Response)(nil), // 13: proto.GetProtocolVersion.Response + (*GetName_Response)(nil), // 14: proto.GetName.Response + (*GetVersion_Response)(nil), // 15: proto.GetVersion.Response } var file_internal_pb_source_proto_depIdxs = []int32{ 9, // 0: proto.Sync.Request.timestamp:type_name -> google.protobuf.Timestamp - 10, // 1: proto.Source.GetName:input_type -> proto.GetName.Request - 11, // 2: proto.Source.GetVersion:input_type -> proto.GetVersion.Request - 7, // 3: proto.Source.GetTables:input_type -> proto.GetTables.Request - 5, // 4: proto.Source.GetSyncSummary:input_type -> proto.GetSyncSummary.Request - 3, // 5: proto.Source.Sync:input_type -> proto.Sync.Request - 12, // 6: proto.Source.GetName:output_type -> proto.GetName.Response - 13, // 7: proto.Source.GetVersion:output_type -> proto.GetVersion.Response - 8, // 8: proto.Source.GetTables:output_type -> proto.GetTables.Response - 6, // 9: proto.Source.GetSyncSummary:output_type -> proto.GetSyncSummary.Response - 4, // 10: proto.Source.Sync:output_type -> proto.Sync.Response - 6, // [6:11] is the sub-list for method output_type - 1, // [1:6] is the sub-list for method input_type + 10, // 1: proto.Source.GetProtocolVersion:input_type -> proto.GetProtocolVersion.Request + 11, // 2: proto.Source.GetName:input_type -> proto.GetName.Request + 12, // 3: proto.Source.GetVersion:input_type -> proto.GetVersion.Request + 7, // 4: proto.Source.GetTables:input_type -> proto.GetTables.Request + 5, // 5: proto.Source.GetSyncSummary:input_type -> proto.GetSyncSummary.Request + 3, // 6: proto.Source.Sync:input_type -> proto.Sync.Request + 13, // 7: proto.Source.GetProtocolVersion:output_type -> proto.GetProtocolVersion.Response + 14, // 8: proto.Source.GetName:output_type -> proto.GetName.Response + 15, // 9: proto.Source.GetVersion:output_type -> proto.GetVersion.Response + 8, // 10: proto.Source.GetTables:output_type -> proto.GetTables.Response + 6, // 11: proto.Source.GetSyncSummary:output_type -> proto.GetSyncSummary.Response + 4, // 12: proto.Source.Sync:output_type -> proto.Sync.Response + 7, // [7:13] is the sub-list for method output_type + 1, // [1:7] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name 1, // [1:1] is the sub-list for extension extendee 0, // [0:1] is the sub-list for field type_name diff --git a/internal/pb/source.proto b/internal/pb/source.proto index 658231ec97..33ed8a6375 100644 --- a/internal/pb/source.proto +++ b/internal/pb/source.proto @@ -5,6 +5,10 @@ import "internal/pb/base.proto"; import "google/protobuf/timestamp.proto"; service Source { + // Get the current protocol version of the plugin. This helps + // get the right message about upgrade/downgrade of cli and/or plugin. + // Also, on the cli side it can try to upgrade/downgrade the protocol if cli supports it. + rpc GetProtocolVersion(GetProtocolVersion.Request) returns (GetProtocolVersion.Response); // Get the name of the plugin rpc GetName(GetName.Request) returns (GetName.Response); // Get the current version of the plugin diff --git a/internal/pb/source_grpc.pb.go b/internal/pb/source_grpc.pb.go index 72bddd378b..8b8b5f7cc4 100644 --- a/internal/pb/source_grpc.pb.go +++ b/internal/pb/source_grpc.pb.go @@ -22,6 +22,10 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type SourceClient interface { + // Get the current protocol version of the plugin. This helps + // get the right message about upgrade/downgrade of cli and/or plugin. + // Also, on the cli side it can try to upgrade/downgrade the protocol if cli supports it. + GetProtocolVersion(ctx context.Context, in *GetProtocolVersion_Request, opts ...grpc.CallOption) (*GetProtocolVersion_Response, error) // Get the name of the plugin GetName(ctx context.Context, in *GetName_Request, opts ...grpc.CallOption) (*GetName_Response, error) // Get the current version of the plugin @@ -43,6 +47,15 @@ func NewSourceClient(cc grpc.ClientConnInterface) SourceClient { return &sourceClient{cc} } +func (c *sourceClient) GetProtocolVersion(ctx context.Context, in *GetProtocolVersion_Request, opts ...grpc.CallOption) (*GetProtocolVersion_Response, error) { + out := new(GetProtocolVersion_Response) + err := c.cc.Invoke(ctx, "/proto.Source/GetProtocolVersion", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *sourceClient) GetName(ctx context.Context, in *GetName_Request, opts ...grpc.CallOption) (*GetName_Response, error) { out := new(GetName_Response) err := c.cc.Invoke(ctx, "/proto.Source/GetName", in, out, opts...) @@ -115,6 +128,10 @@ func (x *sourceSyncClient) Recv() (*Sync_Response, error) { // All implementations must embed UnimplementedSourceServer // for forward compatibility type SourceServer interface { + // Get the current protocol version of the plugin. This helps + // get the right message about upgrade/downgrade of cli and/or plugin. + // Also, on the cli side it can try to upgrade/downgrade the protocol if cli supports it. + GetProtocolVersion(context.Context, *GetProtocolVersion_Request) (*GetProtocolVersion_Response, error) // Get the name of the plugin GetName(context.Context, *GetName_Request) (*GetName_Response, error) // Get the current version of the plugin @@ -133,6 +150,9 @@ type SourceServer interface { type UnimplementedSourceServer struct { } +func (UnimplementedSourceServer) GetProtocolVersion(context.Context, *GetProtocolVersion_Request) (*GetProtocolVersion_Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetProtocolVersion not implemented") +} func (UnimplementedSourceServer) GetName(context.Context, *GetName_Request) (*GetName_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetName not implemented") } @@ -161,6 +181,24 @@ func RegisterSourceServer(s grpc.ServiceRegistrar, srv SourceServer) { s.RegisterService(&Source_ServiceDesc, srv) } +func _Source_GetProtocolVersion_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetProtocolVersion_Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SourceServer).GetProtocolVersion(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/proto.Source/GetProtocolVersion", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SourceServer).GetProtocolVersion(ctx, req.(*GetProtocolVersion_Request)) + } + return interceptor(ctx, in, info, handler) +} + func _Source_GetName_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(GetName_Request) if err := dec(in); err != nil { @@ -261,6 +299,10 @@ var Source_ServiceDesc = grpc.ServiceDesc{ ServiceName: "proto.Source", HandlerType: (*SourceServer)(nil), Methods: []grpc.MethodDesc{ + { + MethodName: "GetProtocolVersion", + Handler: _Source_GetProtocolVersion_Handler, + }, { MethodName: "GetName", Handler: _Source_GetName_Handler, diff --git a/internal/servers/destinations.go b/internal/servers/destinations.go index ebccdfdd90..bfcfecd385 100644 --- a/internal/servers/destinations.go +++ b/internal/servers/destinations.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/cloudquery/plugin-sdk/internal/pb" + "github.com/cloudquery/plugin-sdk/internal/versions" "github.com/cloudquery/plugin-sdk/plugins" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" @@ -22,6 +23,12 @@ type DestinationServer struct { Logger zerolog.Logger } +func (*DestinationServer) GetProtocolVersion(context.Context, *pb.GetProtocolVersion_Request) (*pb.GetProtocolVersion_Response, error) { + return &pb.GetProtocolVersion_Response{ + Version: versions.DestinationProtocolVersion, + }, nil +} + func (s *DestinationServer) Configure(ctx context.Context, req *pb.Configure_Request) (*pb.Configure_Response, error) { var spec specs.Destination if err := json.Unmarshal(req.Config, &spec); err != nil { diff --git a/internal/servers/source.go b/internal/servers/source.go index eb4309fa80..1256988302 100644 --- a/internal/servers/source.go +++ b/internal/servers/source.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/cloudquery/plugin-sdk/internal/pb" + "github.com/cloudquery/plugin-sdk/internal/versions" "github.com/cloudquery/plugin-sdk/plugins" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" @@ -22,6 +23,12 @@ type SourceServer struct { summary *schema.SyncSummary } +func (*SourceServer) GetProtocolVersion(context.Context, *pb.GetProtocolVersion_Request) (*pb.GetProtocolVersion_Response, error) { + return &pb.GetProtocolVersion_Response{ + Version: versions.SourceProtocolVersion, + }, nil +} + func (s *SourceServer) GetTables(context.Context, *pb.GetTables_Request) (*pb.GetTables_Response, error) { b, err := json.Marshal(s.Plugin.Tables()) if err != nil { diff --git a/internal/versions/versions.go b/internal/versions/versions.go new file mode 100644 index 0000000000..87e979c4ae --- /dev/null +++ b/internal/versions/versions.go @@ -0,0 +1,6 @@ +package versions + +const ( + SourceProtocolVersion = uint64(1) + DestinationProtocolVersion = uint64(1) +) diff --git a/serve/source_test.go b/serve/source_test.go index 45b6b2515c..acab7ac3b0 100644 --- a/serve/source_test.go +++ b/serve/source_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/cloudquery/plugin-sdk/clients" + "github.com/cloudquery/plugin-sdk/internal/versions" "github.com/cloudquery/plugin-sdk/plugins" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" @@ -115,6 +116,14 @@ func TestServeSource(t *testing.T) { } }() + protocolVersion, err := c.GetProtocolVersion(ctx) + if err != nil { + t.Fatal(err) + } + if versions.SourceProtocolVersion != protocolVersion { + t.Fatalf("expected protocol version %d, got %d", versions.SourceProtocolVersion, protocolVersion) + } + name, err := c.Name(ctx) if err != nil { t.Fatal(err)