From 09bbf3113bd9076e976659dc680a5bb51691bd93 Mon Sep 17 00:00:00 2001 From: "Cody A. Ray" Date: Wed, 9 May 2018 17:35:16 -0500 Subject: [PATCH] Add "connect delete " CLI command --- command/common/cli.go | 2 +- command/connect/command.go | 12 +++-- command/connect/grpc.go | 13 ++++++ command/connect/interface.go | 1 + http/client.go | 2 +- http/connect.go | 13 ++++++ plugin/confluent-connect-plugin/main.go | 25 +++++++++++ shared/connect/connect.pb.go | 60 +++++++++++++++++++------ shared/connect/connect.proto | 1 + shared/errors.go | 6 +-- 10 files changed, 113 insertions(+), 22 deletions(-) diff --git a/command/common/cli.go b/command/common/cli.go index 337a7649f3..3319f1592f 100644 --- a/command/common/cli.go +++ b/command/common/cli.go @@ -19,7 +19,7 @@ func HandleError(err error) error { case shared.ErrNotImplemented: fmt.Println("Sorry, this functionality is not yet available in the CLI.") case shared.ErrNotFound: - fmt.Println("Kafka cluster not found.") + fmt.Println("Kafka cluster not found.") // TODO: parametrize ErrNotFound for better error messaging default: return err } diff --git a/command/connect/command.go b/command/connect/command.go index 2fc5242c05..f270834639 100644 --- a/command/connect/command.go +++ b/command/connect/command.go @@ -21,8 +21,8 @@ import ( var ( listFields = []string{"Name", "ServiceProvider", "Region", "Durability", "Status"} listLabels = []string{"Name", "Provider", "Region", "Durability", "Status"} - describeFields = []string{"Name", "KafkaClusterId", "ServiceProvider", "Region", "Status", "Endpoint", "PricePerHour"} - describeLabels = []string{"Name", "Kafka", "Provider", "Region", "Status", "Endpoint", "PricePerHour"} + describeFields = []string{"Name", "KafkaClusterId", "ServiceProvider", "Region", "Status", "PricePerHour"} + describeLabels = []string{"Name", "Kafka", "Provider", "Region", "Status", "PricePerHour"} ) type Command struct { @@ -197,7 +197,13 @@ func (c *Command) update(cmd *cobra.Command, args []string) error { } func (c *Command) delete(cmd *cobra.Command, args []string) error { - return common.HandleError(shared.ErrNotImplemented) + req := &schedv1.ConnectCluster{AccountId: c.config.Auth.Account.Id, Name: args[0]} + err := c.connect.Delete(context.Background(), req) + if err != nil { + return common.HandleError(err) + } + fmt.Println("Your connect cluster has been deleted!") + return nil } func (c *Command) auth(cmd *cobra.Command, args []string) error { diff --git a/command/connect/grpc.go b/command/connect/grpc.go index f1438d940d..bc7886e85e 100644 --- a/command/connect/grpc.go +++ b/command/connect/grpc.go @@ -37,6 +37,14 @@ func (c *GRPCClient) CreateS3Sink(ctx context.Context, config *schedv1.ConnectS3 return resp.Cluster, nil } +func (c *GRPCClient) Delete(ctx context.Context, cluster *schedv1.ConnectCluster) error { + _, err := c.client.Delete(ctx, &schedv1.DeleteConnectClusterRequest{Cluster: cluster}) + if err != nil { + return shared.ConvertGRPCError(err) + } + return nil +} + // The gRPC server the GPRClient talks to. Plugin authors implement this if they're using Go. type GRPCServer struct { Impl Connect @@ -56,3 +64,8 @@ func (s *GRPCServer) CreateS3Sink(ctx context.Context, req *schedv1.CreateConnec r, err := s.Impl.CreateS3Sink(ctx, req.Config) return &schedv1.CreateConnectS3SinkClusterReply{Cluster: r}, err } + +func (s *GRPCServer) Delete(ctx context.Context, req *schedv1.DeleteConnectClusterRequest) (*schedv1.DeleteConnectClusterReply, error) { + err := s.Impl.Delete(ctx, req.Cluster) + return &schedv1.DeleteConnectClusterReply{}, err +} diff --git a/command/connect/interface.go b/command/connect/interface.go index 173f966f6f..d9ae647d89 100644 --- a/command/connect/interface.go +++ b/command/connect/interface.go @@ -15,6 +15,7 @@ type Connect interface { List(ctx context.Context, cluster *schedv1.ConnectCluster) ([]*schedv1.ConnectCluster, error) Describe(ctx context.Context, cluster *schedv1.ConnectCluster) (*schedv1.ConnectCluster, error) CreateS3Sink(ctx context.Context, config *schedv1.ConnectS3SinkClusterConfig) (*schedv1.ConnectS3SinkCluster, error) + Delete(ctx context.Context, cluster *schedv1.ConnectCluster) error } type Plugin struct { diff --git a/http/client.go b/http/client.go index 0936c2cf1d..2ad6c367c1 100644 --- a/http/client.go +++ b/http/client.go @@ -18,7 +18,7 @@ const ( var ( BaseClient = &http.Client{Timeout: timeout} - errNotFound = &corev1.Error{Code: 404, Message: "cluster not found"} // matches gateway response + errNotFound = &corev1.Error{Code: http.StatusNotFound, Message: "cluster not found"} // matches gateway response ) type Client struct { diff --git a/http/connect.go b/http/connect.go index b89d74e13f..a4174b5df8 100644 --- a/http/connect.go +++ b/http/connect.go @@ -68,3 +68,16 @@ func (s *ConnectService) CreateS3Sink(config *schedv1.ConnectS3SinkClusterConfig } return reply.Cluster, resp, nil } + +// Delete destroys a given connect cluster. +func (s *ConnectService) Delete(cluster *schedv1.ConnectCluster) (*http.Response, error) { + reply := new(schedv1.DeleteConnectClusterReply) + resp, err := s.sling.New().Delete("/api/connectors/"+cluster.Id).QueryStruct(cluster).Receive(reply, reply) + if err != nil { + return resp, errors.Wrap(err, "unable to delete connector") + } + if reply.Error != nil { + return resp, errors.Wrap(reply.Error, "error deleting connector") + } + return resp, nil +} diff --git a/plugin/confluent-connect-plugin/main.go b/plugin/confluent-connect-plugin/main.go index 02ee2d0c87..132672ae45 100644 --- a/plugin/confluent-connect-plugin/main.go +++ b/plugin/confluent-connect-plugin/main.go @@ -110,6 +110,31 @@ func (c *Connect) CreateS3Sink(ctx context.Context, cfg *schedv1.ConnectS3SinkCl return ret, nil } +func (c *Connect) Delete(ctx context.Context, cluster *schedv1.ConnectCluster) error { + c.Logger.Log("msg", "connect.Delete()") + cluster, err := c.resolveClusterID(ctx, cluster) + if err != nil { + return shared.ConvertAPIError(err) + } + _, err = c.Client.Connect.Delete(cluster) + if err != nil { + return shared.ConvertAPIError(err) + } + return nil +} + +// resolveClusterID resolves a connect cluster by name/account into a cluster with an ID +func (c *Connect) resolveClusterID(ctx context.Context, cluster *schedv1.ConnectCluster) (*schedv1.ConnectCluster, error) { + if cluster.Id != "" { + return cluster, nil + } + cluster, err := c.Describe(ctx, cluster) + if err != nil { + return nil, err + } + return cluster, nil +} + func check(err error) { if err != nil { golog.Fatal(err) diff --git a/shared/connect/connect.pb.go b/shared/connect/connect.pb.go index e63314372e..357aeb1a7a 100644 --- a/shared/connect/connect.pb.go +++ b/shared/connect/connect.pb.go @@ -44,6 +44,7 @@ type ConnectClient interface { List(ctx context.Context, in *kafka_scheduler_v1.GetConnectClustersRequest, opts ...grpc.CallOption) (*kafka_scheduler_v1.GetConnectClustersReply, error) Describe(ctx context.Context, in *kafka_scheduler_v1.GetConnectClusterRequest, opts ...grpc.CallOption) (*kafka_scheduler_v1.GetConnectClusterReply, error) CreateS3Sink(ctx context.Context, in *kafka_scheduler_v1.CreateConnectS3SinkClusterRequest, opts ...grpc.CallOption) (*kafka_scheduler_v1.CreateConnectS3SinkClusterReply, error) + Delete(ctx context.Context, in *kafka_scheduler_v1.DeleteConnectClusterRequest, opts ...grpc.CallOption) (*kafka_scheduler_v1.DeleteConnectClusterReply, error) } type connectClient struct { @@ -81,12 +82,22 @@ func (c *connectClient) CreateS3Sink(ctx context.Context, in *kafka_scheduler_v1 return out, nil } +func (c *connectClient) Delete(ctx context.Context, in *kafka_scheduler_v1.DeleteConnectClusterRequest, opts ...grpc.CallOption) (*kafka_scheduler_v1.DeleteConnectClusterReply, error) { + out := new(kafka_scheduler_v1.DeleteConnectClusterReply) + err := grpc.Invoke(ctx, "/connect.Connect/Delete", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // Server API for Connect service type ConnectServer interface { List(context.Context, *kafka_scheduler_v1.GetConnectClustersRequest) (*kafka_scheduler_v1.GetConnectClustersReply, error) Describe(context.Context, *kafka_scheduler_v1.GetConnectClusterRequest) (*kafka_scheduler_v1.GetConnectClusterReply, error) CreateS3Sink(context.Context, *kafka_scheduler_v1.CreateConnectS3SinkClusterRequest) (*kafka_scheduler_v1.CreateConnectS3SinkClusterReply, error) + Delete(context.Context, *kafka_scheduler_v1.DeleteConnectClusterRequest) (*kafka_scheduler_v1.DeleteConnectClusterReply, error) } func RegisterConnectServer(s *grpc.Server, srv ConnectServer) { @@ -147,6 +158,24 @@ func _Connect_CreateS3Sink_Handler(srv interface{}, ctx context.Context, dec fun return interceptor(ctx, in, info, handler) } +func _Connect_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(kafka_scheduler_v1.DeleteConnectClusterRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ConnectServer).Delete(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/connect.Connect/Delete", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ConnectServer).Delete(ctx, req.(*kafka_scheduler_v1.DeleteConnectClusterRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _Connect_serviceDesc = grpc.ServiceDesc{ ServiceName: "connect.Connect", HandlerType: (*ConnectServer)(nil), @@ -163,6 +192,10 @@ var _Connect_serviceDesc = grpc.ServiceDesc{ MethodName: "CreateS3Sink", Handler: _Connect_CreateS3Sink_Handler, }, + { + MethodName: "Delete", + Handler: _Connect_Delete_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "connect.proto", @@ -171,17 +204,18 @@ var _Connect_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("connect.proto", fileDescriptorConnect) } var fileDescriptorConnect = []byte{ - // 186 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4d, 0xce, 0xcf, 0xcb, - 0x4b, 0x4d, 0x2e, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x87, 0x72, 0xa5, 0x94, 0xb2, - 0x13, 0xd3, 0xb2, 0x13, 0xf5, 0x8b, 0x93, 0x33, 0x52, 0x53, 0x4a, 0x73, 0x52, 0x8b, 0xf4, 0xcb, - 0x0c, 0x11, 0x1c, 0x88, 0x62, 0xa3, 0xab, 0x4c, 0x5c, 0xec, 0xce, 0x10, 0xf5, 0x42, 0x29, 0x5c, - 0x2c, 0x3e, 0x99, 0xc5, 0x25, 0x42, 0xba, 0x7a, 0x60, 0x8d, 0x7a, 0x08, 0xb5, 0x65, 0x86, 0x7a, - 0xee, 0xa9, 0x25, 0x50, 0x75, 0xce, 0x39, 0xa5, 0xc5, 0x25, 0xa9, 0x45, 0xc5, 0x41, 0xa9, 0x85, - 0xa5, 0xa9, 0xc5, 0x25, 0x52, 0xda, 0xc4, 0x2a, 0x2f, 0xc8, 0xa9, 0x54, 0x62, 0x10, 0x4a, 0xe3, - 0xe2, 0x70, 0x49, 0x2d, 0x4e, 0x2e, 0xca, 0x4c, 0x4a, 0x15, 0xd2, 0x21, 0x4a, 0x2b, 0xcc, 0x22, - 0x2d, 0x22, 0x55, 0x43, 0xec, 0xa9, 0xe1, 0xe2, 0x71, 0x2e, 0x4a, 0x4d, 0x2c, 0x49, 0x0d, 0x36, - 0x0e, 0xce, 0xcc, 0xcb, 0x16, 0x32, 0xc5, 0xa6, 0x1b, 0xa2, 0x02, 0x6a, 0x00, 0x44, 0x21, 0x9a, - 0xa5, 0xc6, 0xa4, 0x6a, 0x03, 0xdb, 0x9e, 0xc4, 0x06, 0x0e, 0x5e, 0x63, 0x40, 0x00, 0x00, 0x00, - 0xff, 0xff, 0x70, 0xd9, 0x24, 0x3c, 0x9c, 0x01, 0x00, 0x00, + // 206 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x91, 0xc1, 0x0a, 0x82, 0x40, + 0x10, 0x40, 0x83, 0x42, 0x63, 0xa9, 0xcb, 0x1e, 0x3d, 0x7a, 0xac, 0x5c, 0x31, 0xe9, 0x0b, 0x14, + 0xba, 0x74, 0xca, 0x2f, 0xd0, 0x75, 0x24, 0x73, 0x51, 0xdb, 0x1d, 0x85, 0xa0, 0x9f, 0xeb, 0xcf, + 0x02, 0xd7, 0x10, 0xc2, 0xc3, 0x76, 0x1c, 0xf6, 0xbd, 0x79, 0x2c, 0x43, 0xb6, 0xbc, 0xa9, 0x6b, + 0xe0, 0xc8, 0x5a, 0xd9, 0x60, 0x43, 0xed, 0x71, 0x74, 0xdc, 0x2a, 0x2d, 0xaa, 0xd4, 0x57, 0xfc, + 0x06, 0x79, 0x27, 0x40, 0xfa, 0x7d, 0x30, 0x0d, 0x1a, 0x3e, 0xbe, 0x97, 0xc4, 0x8e, 0x34, 0x4f, + 0x73, 0xb2, 0xba, 0x94, 0x0a, 0xa9, 0xc7, 0x06, 0x91, 0x4d, 0x6c, 0x1f, 0xb0, 0x33, 0xe0, 0xc8, + 0x45, 0xa2, 0x53, 0x08, 0x52, 0x5d, 0xe1, 0xd1, 0x81, 0x42, 0x67, 0x6f, 0x8a, 0xb7, 0xe2, 0xe9, + 0x2e, 0x68, 0x41, 0xd6, 0x31, 0x28, 0x2e, 0xcb, 0x0c, 0xe8, 0xc1, 0x48, 0xfd, 0x86, 0x76, 0x86, + 0xb4, 0xee, 0xbc, 0xc8, 0x26, 0x92, 0x90, 0x22, 0x24, 0x61, 0x52, 0xd6, 0x15, 0x3d, 0xcd, 0xd9, + 0x9a, 0x18, 0x17, 0x68, 0xf0, 0x27, 0x1a, 0xfe, 0xab, 0xe9, 0xfa, 0x9d, 0x58, 0x31, 0x08, 0x40, + 0xa0, 0xfe, 0xdc, 0x02, 0xfd, 0x36, 0xff, 0x4d, 0xcf, 0x5c, 0x18, 0x5a, 0x99, 0x35, 0x9c, 0x32, + 0xfc, 0x04, 0x00, 0x00, 0xff, 0xff, 0x73, 0x1b, 0x6f, 0x9f, 0x08, 0x02, 0x00, 0x00, } diff --git a/shared/connect/connect.proto b/shared/connect/connect.proto index c24d369c4c..5df4f5c46d 100644 --- a/shared/connect/connect.proto +++ b/shared/connect/connect.proto @@ -9,5 +9,6 @@ service Connect { rpc List(kafka.scheduler.v1.GetConnectClustersRequest) returns (kafka.scheduler.v1.GetConnectClustersReply) {} rpc Describe(kafka.scheduler.v1.GetConnectClusterRequest) returns (kafka.scheduler.v1.GetConnectClusterReply) {} rpc CreateS3Sink(kafka.scheduler.v1.CreateConnectS3SinkClusterRequest) returns (kafka.scheduler.v1.CreateConnectS3SinkClusterReply) {} + rpc Delete(kafka.scheduler.v1.DeleteConnectClusterRequest) returns (kafka.scheduler.v1.DeleteConnectClusterReply) {} // ... } diff --git a/shared/errors.go b/shared/errors.go index 6f0514d4ad..8b5859146c 100644 --- a/shared/errors.go +++ b/shared/errors.go @@ -32,8 +32,7 @@ var ( ) func ConvertAPIError(err error) error { - err = errors.Cause(err) - if e, ok := err.(*corev1.Error); ok { + if e, ok := errors.Cause(err).(*corev1.Error); ok { switch e.Message { // these messages are returned by the API itself case "token is expired": @@ -54,9 +53,8 @@ func ConvertAPIError(err error) error { func ConvertGRPCError(err error) error { if s, ok := status.FromError(err); ok { - // these messages are from the error constants at the top of this file switch s.Message() { - case ErrMalformedToken.Error(): + case ErrExpiredToken.Error(): return ErrExpiredToken case ErrMalformedToken.Error(): return ErrMalformedToken