Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion command/common/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func HandleError(err error) error {
case shared.ErrNotImplemented:
fmt.Println("Sorry, this functionality is not yet available in the CLI.")
case shared.ErrNotFound:
fmt.Println("Kafka cluster not found.")
fmt.Println("Kafka cluster not found.") // TODO: parametrize ErrNotFound for better error messaging
default:
return err
}
Expand Down
12 changes: 9 additions & 3 deletions command/connect/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
var (
listFields = []string{"Name", "ServiceProvider", "Region", "Durability", "Status"}
listLabels = []string{"Name", "Provider", "Region", "Durability", "Status"}
describeFields = []string{"Name", "KafkaClusterId", "ServiceProvider", "Region", "Status", "Endpoint", "PricePerHour"}
describeLabels = []string{"Name", "Kafka", "Provider", "Region", "Status", "Endpoint", "PricePerHour"}
describeFields = []string{"Name", "KafkaClusterId", "ServiceProvider", "Region", "Status", "PricePerHour"}
describeLabels = []string{"Name", "Kafka", "Provider", "Region", "Status", "PricePerHour"}
)

type Command struct {
Expand Down Expand Up @@ -197,7 +197,13 @@ func (c *Command) update(cmd *cobra.Command, args []string) error {
}

func (c *Command) delete(cmd *cobra.Command, args []string) error {
return common.HandleError(shared.ErrNotImplemented)
req := &schedv1.ConnectCluster{AccountId: c.config.Auth.Account.Id, Name: args[0]}
err := c.connect.Delete(context.Background(), req)
if err != nil {
return common.HandleError(err)
}
fmt.Println("Your connect cluster has been deleted!")
return nil
}

func (c *Command) auth(cmd *cobra.Command, args []string) error {
Expand Down
13 changes: 13 additions & 0 deletions command/connect/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ func (c *GRPCClient) CreateS3Sink(ctx context.Context, config *schedv1.ConnectS3
return resp.Cluster, nil
}

func (c *GRPCClient) Delete(ctx context.Context, cluster *schedv1.ConnectCluster) error {
_, err := c.client.Delete(ctx, &schedv1.DeleteConnectClusterRequest{Cluster: cluster})
if err != nil {
return shared.ConvertGRPCError(err)
}
return nil
}

// The gRPC server the GPRClient talks to. Plugin authors implement this if they're using Go.
type GRPCServer struct {
Impl Connect
Expand All @@ -56,3 +64,8 @@ func (s *GRPCServer) CreateS3Sink(ctx context.Context, req *schedv1.CreateConnec
r, err := s.Impl.CreateS3Sink(ctx, req.Config)
return &schedv1.CreateConnectS3SinkClusterReply{Cluster: r}, err
}

func (s *GRPCServer) Delete(ctx context.Context, req *schedv1.DeleteConnectClusterRequest) (*schedv1.DeleteConnectClusterReply, error) {
err := s.Impl.Delete(ctx, req.Cluster)
return &schedv1.DeleteConnectClusterReply{}, err
}
1 change: 1 addition & 0 deletions command/connect/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Connect interface {
List(ctx context.Context, cluster *schedv1.ConnectCluster) ([]*schedv1.ConnectCluster, error)
Describe(ctx context.Context, cluster *schedv1.ConnectCluster) (*schedv1.ConnectCluster, error)
CreateS3Sink(ctx context.Context, config *schedv1.ConnectS3SinkClusterConfig) (*schedv1.ConnectS3SinkCluster, error)
Delete(ctx context.Context, cluster *schedv1.ConnectCluster) error
}

type Plugin struct {
Expand Down
2 changes: 1 addition & 1 deletion http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (

var (
BaseClient = &http.Client{Timeout: timeout}
errNotFound = &corev1.Error{Code: 404, Message: "cluster not found"} // matches gateway response
errNotFound = &corev1.Error{Code: http.StatusNotFound, Message: "cluster not found"} // matches gateway response
)

type Client struct {
Expand Down
13 changes: 13 additions & 0 deletions http/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,16 @@ func (s *ConnectService) CreateS3Sink(config *schedv1.ConnectS3SinkClusterConfig
}
return reply.Cluster, resp, nil
}

// Delete destroys a given connect cluster.
func (s *ConnectService) Delete(cluster *schedv1.ConnectCluster) (*http.Response, error) {
reply := new(schedv1.DeleteConnectClusterReply)
resp, err := s.sling.New().Delete("/api/connectors/"+cluster.Id).QueryStruct(cluster).Receive(reply, reply)
if err != nil {
return resp, errors.Wrap(err, "unable to delete connector")
}
if reply.Error != nil {
return resp, errors.Wrap(reply.Error, "error deleting connector")
}
return resp, nil
}
25 changes: 25 additions & 0 deletions plugin/confluent-connect-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,31 @@ func (c *Connect) CreateS3Sink(ctx context.Context, cfg *schedv1.ConnectS3SinkCl
return ret, nil
}

func (c *Connect) Delete(ctx context.Context, cluster *schedv1.ConnectCluster) error {
c.Logger.Log("msg", "connect.Delete()")
cluster, err := c.resolveClusterID(ctx, cluster)
if err != nil {
return shared.ConvertAPIError(err)
}
_, err = c.Client.Connect.Delete(cluster)
if err != nil {
return shared.ConvertAPIError(err)
}
return nil
}

// resolveClusterID resolves a connect cluster by name/account into a cluster with an ID
func (c *Connect) resolveClusterID(ctx context.Context, cluster *schedv1.ConnectCluster) (*schedv1.ConnectCluster, error) {
if cluster.Id != "" {
return cluster, nil
}
cluster, err := c.Describe(ctx, cluster)
if err != nil {
return nil, err
}
return cluster, nil
}

func check(err error) {
if err != nil {
golog.Fatal(err)
Expand Down
60 changes: 47 additions & 13 deletions shared/connect/connect.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions shared/connect/connect.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ service Connect {
rpc List(kafka.scheduler.v1.GetConnectClustersRequest) returns (kafka.scheduler.v1.GetConnectClustersReply) {}
rpc Describe(kafka.scheduler.v1.GetConnectClusterRequest) returns (kafka.scheduler.v1.GetConnectClusterReply) {}
rpc CreateS3Sink(kafka.scheduler.v1.CreateConnectS3SinkClusterRequest) returns (kafka.scheduler.v1.CreateConnectS3SinkClusterReply) {}
rpc Delete(kafka.scheduler.v1.DeleteConnectClusterRequest) returns (kafka.scheduler.v1.DeleteConnectClusterReply) {}
// ...
}
6 changes: 2 additions & 4 deletions shared/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ var (
)

func ConvertAPIError(err error) error {
err = errors.Cause(err)
if e, ok := err.(*corev1.Error); ok {
if e, ok := errors.Cause(err).(*corev1.Error); ok {
switch e.Message {
// these messages are returned by the API itself
case "token is expired":
Expand All @@ -54,9 +53,8 @@ func ConvertAPIError(err error) error {

func ConvertGRPCError(err error) error {
if s, ok := status.FromError(err); ok {
// these messages are from the error constants at the top of this file
switch s.Message() {
case ErrMalformedToken.Error():
case ErrExpiredToken.Error():
return ErrExpiredToken
case ErrMalformedToken.Error():
return ErrMalformedToken
Expand Down