diff --git a/internal/cmd/command.go b/internal/cmd/command.go index d221b4f739..06f0ee4a18 100644 --- a/internal/cmd/command.go +++ b/internal/cmd/command.go @@ -111,7 +111,7 @@ func NewConfluentCommand(cfg *v1.Config, isTest bool, ver *pversion.Version) *co } apiKeyCmd := apikey.New(prerunner, nil, flagResolver, analyticsClient) - connectCmd := connect.New(cfg, prerunner, analyticsClient) + connectCmd := connect.New(prerunner, analyticsClient) environmentCmd := environment.New(prerunner, analyticsClient) cli.AddCommand(admin.New(prerunner, isTest)) diff --git a/internal/cmd/connect/autocomplete.go b/internal/cmd/connect/autocomplete.go new file mode 100644 index 0000000000..bea194793e --- /dev/null +++ b/internal/cmd/connect/autocomplete.go @@ -0,0 +1,81 @@ +package connect + +import ( + "context" + + "github.com/c-bata/go-prompt" + schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" + opv1 "github.com/confluentinc/cc-structs/operator/v1" + "github.com/spf13/cobra" + + "github.com/confluentinc/cli/internal/pkg/shell/completer" +) + +func (c *command) Cmd() *cobra.Command { + return c.Command +} + +func (c *command) ServerCompletableChildren() []*cobra.Command { + return c.completableChildren +} + +func (c *command) ServerComplete() []prompt.Suggest { + var suggestions []prompt.Suggest + connectors, err := c.fetchConnectors() + if err != nil { + return suggestions + } + for _, conn := range connectors { + suggestions = append(suggestions, prompt.Suggest{ + Text: conn.Id.Id, + Description: conn.Info.Name, + }) + } + return suggestions +} + +func (c *command) fetchConnectors() (map[string]*opv1.ConnectorExpansion, error) { + kafkaCluster, err := c.Context.GetKafkaClusterForCommand(c.Command) + if err != nil { + return nil, err + } + connectors, err := c.Client.Connect.ListWithExpansions(context.Background(), &schedv1.Connector{AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID}, "status,info,id") + if err != nil { + return nil, err + } + return connectors, nil + +} + +func (c *command) ServerCompletableFlagChildren() map[string][]*cobra.Command { + return c.completableFlagChildren +} + +func (c *command) ServerFlagComplete() map[string]func() []prompt.Suggest { + return map[string]func() []prompt.Suggest{ + "cluster": completer.ClusterFlagServerCompleterFunc(c.Client, c.EnvironmentId()), + } +} + +func (c *pluginCommand) Cmd() *cobra.Command { + return c.Command +} + +func (c *pluginCommand) ServerComplete() []prompt.Suggest { + var suggestions []prompt.Suggest + plugins, err := c.getPlugins(c.Command) + if err != nil { + return suggestions + } + for _, conn := range plugins { + suggestions = append(suggestions, prompt.Suggest{ + Text: conn.Class, + Description: conn.Type, + }) + } + return suggestions +} + +func (c *pluginCommand) ServerCompletableChildren() []*cobra.Command { + return c.completableChildren +} diff --git a/internal/cmd/connect/command.go b/internal/cmd/connect/command.go index 47731c1b84..1a19732246 100644 --- a/internal/cmd/connect/command.go +++ b/internal/cmd/connect/command.go @@ -1,26 +1,10 @@ package connect import ( - "context" - "fmt" - "os" - - "github.com/c-bata/go-prompt" "github.com/spf13/cobra" - schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" - opv1 "github.com/confluentinc/cc-structs/operator/v1" - "github.com/confluentinc/go-printer" - "github.com/confluentinc/cli/internal/pkg/analytics" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - v1 "github.com/confluentinc/cli/internal/pkg/config/v1" - "github.com/confluentinc/cli/internal/pkg/errors" - "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/output" - "github.com/confluentinc/cli/internal/pkg/shell/completer" - "github.com/confluentinc/cli/internal/pkg/utils" - pversion "github.com/confluentinc/cli/internal/pkg/version" ) type command struct { @@ -39,433 +23,46 @@ type connectorDescribeDisplay struct { Trace string `json:"trace,omitempty" yaml:"trace,omitempty"` } -type taskDescribeDisplay struct { - TaskId int32 `json:"task_id" yaml:"task_id"` - State string `json:"state" yaml:"state"` -} -type configDescribeDisplay struct { - Config string `json:"config" yaml:"config"` - Value string `json:"value" yaml:"value"` -} - -type structuredDescribeDisplay struct { - Connector *connectorDescribeDisplay `json:"connector" yaml:"connector"` - Tasks []taskDescribeDisplay `json:"tasks" yaml:"task"` - Configs []configDescribeDisplay `json:"configs" yaml:"configs"` -} - var ( - describeRenames = map[string]string{} listFields = []string{"ID", "Name", "Status", "Type", "Trace"} listStructuredLabels = []string{"id", "name", "status", "type", "trace"} ) // New returns the default command object for interacting with Connect. -func New(cfg *v1.Config, prerunner pcmd.PreRunner, analyticsClient analytics.Client) *command { - cliCmd := pcmd.NewAuthenticatedStateFlagCommand( - &cobra.Command{ - Use: "connect", - Short: "Manage Kafka Connect.", - Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLoginOrOnPremLogin}, - }, prerunner, SubcommandFlags) - cmd := &command{ - AuthenticatedStateFlagCommand: cliCmd, - prerunner: prerunner, - analyticsClient: analyticsClient, - } - cmd.init() - return cmd -} - -func (c *command) init() { - describeCmd := &cobra.Command{ - Use: "describe ", - Short: "Describe a connector.", - Args: cobra.ExactArgs(1), - RunE: pcmd.NewCLIRunE(c.describe), - Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, - Example: examples.BuildExampleString( - examples.Example{ - Text: "Describe connector and task level details of a connector in the current or specified Kafka cluster context.", - Code: fmt.Sprintf("%s connect describe \n%s connect describe --cluster ", pversion.CLIName, pversion.CLIName), - }, - ), +func New(prerunner pcmd.PreRunner, analyticsClient analytics.Client) *command { + cmd := &cobra.Command{ + Use: "connect", + Short: "Manage Kafka Connect.", + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLoginOrOnPremLogin}, } - describeCmd.Flags().StringP(output.FlagName, output.ShortHandFlag, output.DefaultValue, output.Usage) - listCmd := &cobra.Command{ - Use: "list", - Short: "List connectors.", - Args: cobra.NoArgs, - RunE: pcmd.NewCLIRunE(c.list), - Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, - Example: examples.BuildExampleString( - examples.Example{ - Text: "List connectors in the current or specified Kafka cluster context.", - Code: fmt.Sprintf("%s connect list\n%s connect list --cluster ", pversion.CLIName, pversion.CLIName), - }, - ), - } - listCmd.Flags().StringP(output.FlagName, output.ShortHandFlag, output.DefaultValue, output.Usage) - - createCmd := &cobra.Command{ - Use: "create", - Short: "Create a connector.", - Args: cobra.NoArgs, - RunE: pcmd.NewCLIRunE(c.create), - Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, - Example: examples.BuildExampleString( - examples.Example{ - Text: "Create a connector in the current or specified Kafka cluster context.", - Code: fmt.Sprintf("%s connect create --config \n%s connect create --cluster --config ", pversion.CLIName, pversion.CLIName), - }, - ), - } - createCmd.Flags().String("config", "", "JSON connector config file.") - createCmd.Flags().StringP(output.FlagName, output.ShortHandFlag, output.DefaultValue, output.Usage) - panicOnError(createCmd.MarkFlagRequired("config")) - - deleteCmd := &cobra.Command{ - Use: "delete ", - Short: "Delete a connector.", - Args: cobra.ExactArgs(1), - RunE: pcmd.NewCLIRunE(c.delete), - Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, - Example: examples.BuildExampleString( - examples.Example{ - Text: "Delete a connector in the current or specified Kafka cluster context.", - Code: fmt.Sprintf("%s connect delete --config \n%s connect delete --cluster --config ", pversion.CLIName, pversion.CLIName), - }, - ), - } - - updateCmd := &cobra.Command{ - Use: "update ", - Short: "Update a connector configuration.", - Args: cobra.ExactArgs(1), - RunE: pcmd.NewCLIRunE(c.update), - Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, - } - updateCmd.Flags().String("config", "", "JSON connector config file.") - panicOnError(updateCmd.MarkFlagRequired("config")) - - pauseCmd := &cobra.Command{ - Use: "pause ", - Short: "Pause a connector.", - Args: cobra.ExactArgs(1), - RunE: pcmd.NewCLIRunE(c.pause), - Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, - Example: examples.BuildExampleString( - examples.Example{ - Text: "Pause a connector in the current or specified Kafka cluster context.", - Code: fmt.Sprintf("%s connect pause --config \n%s connect pause --cluster --config ", pversion.CLIName, pversion.CLIName), - }, - ), + c := &command{ + AuthenticatedStateFlagCommand: pcmd.NewAuthenticatedStateFlagCommand(cmd, prerunner, subcommandFlags), + prerunner: prerunner, + analyticsClient: analyticsClient, } - resumeCmd := &cobra.Command{ - Use: "resume ", - Short: "Resume a connector.", - Args: cobra.ExactArgs(1), - RunE: pcmd.NewCLIRunE(c.resume), - Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, - Example: examples.BuildExampleString( - examples.Example{ - Text: "Resume a connector in the current or specified Kafka cluster context.", - Code: fmt.Sprintf("%s connect resume --config \n%s connect resume --cluster --config ", pversion.CLIName, pversion.CLIName), - }, - ), - } + createCmd := c.newCreateCommand() + describeCmd := c.newDescribeCommand() + deleteCmd := c.newDeleteCommand() + listCmd := c.newListCommand() + pauseCmd := c.newPauseCommand() + resumeCmd := c.newResumeCommand() + updateCmd := c.newUpdateCommand() - c.AddCommand(NewClusterCommandOnPrem(c.prerunner)) + c.AddCommand(newClusterCommand(c.prerunner)) c.AddCommand(createCmd) c.AddCommand(deleteCmd) c.AddCommand(describeCmd) - c.AddCommand(NewEventCommand(c.prerunner)) + c.AddCommand(newEventCommand(c.prerunner)) c.AddCommand(listCmd) c.AddCommand(pauseCmd) - c.AddCommand(NewPluginCommand(c.prerunner)) + c.AddCommand(newPluginCommand(c.prerunner)) c.AddCommand(resumeCmd) c.AddCommand(updateCmd) c.completableChildren = []*cobra.Command{deleteCmd, describeCmd, pauseCmd, resumeCmd, updateCmd} - c.completableFlagChildren = map[string][]*cobra.Command{ - "cluster": {createCmd}, - } -} - -func (c *command) list(cmd *cobra.Command, _ []string) error { - kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) - if err != nil { - return err - } - connectors, err := c.Client.Connect.ListWithExpansions(context.Background(), &schedv1.Connector{AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID}, "status,info,id") - if err != nil { - return err - } - outputWriter, err := output.NewListOutputWriter(cmd, listFields, listFields, listStructuredLabels) - if err != nil { - return err - } - for name, connector := range connectors { - connector := &connectorDescribeDisplay{ - Name: name, - ID: connector.Id.Id, - Status: connector.Status.Connector.State, - Type: connector.Info.Type, - Trace: connector.Status.Connector.Trace, - } - outputWriter.AddElement(connector) - } - return outputWriter.Out() -} - -func (c *command) describe(cmd *cobra.Command, args []string) error { - kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) - if err != nil { - return err - } - connector, err := c.Client.Connect.GetExpansionById(context.Background(), &schedv1.Connector{AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID, Id: args[0]}) - if err != nil { - return err - } - - outputOption, err := cmd.Flags().GetString(output.FlagName) - if err != nil { - return err - } - if outputOption == output.Human.String() { - return printHumanDescribe(cmd, connector) - } else { - return printStructuredDescribe(connector, outputOption) - } -} - -func (c *command) create(cmd *cobra.Command, _ []string) error { - kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) - if err != nil { - return err - } - userConfigs, err := getConfig(cmd) - if err != nil { - return err - } - connector, err := c.Client.Connect.Create(context.Background(), &schedv1.ConnectorConfig{UserConfigs: *userConfigs, AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID, Name: (*userConfigs)["name"], Plugin: (*userConfigs)["connector.class"]}) - if err != nil { - return err - } - // Resolve Connector ID from Name of created connector - connectorExpansion, err := c.Client.Connect.GetExpansionByName(context.Background(), &schedv1.Connector{AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID, Name: connector.Name}) - if err != nil { - return err - } - outputFormat, err := cmd.Flags().GetString(output.FlagName) - if err != nil { - return err - } - trace := connectorExpansion.Status.Connector.Trace - if outputFormat == output.Human.String() { - utils.Printf(cmd, errors.CreatedConnectorMsg, connector.Name, connectorExpansion.Id.Id) - if trace != "" { - utils.Printf(cmd, "Error Trace: %s\n", trace) - } - } else { - return output.StructuredOutput(outputFormat, &struct { - ConnectorName string `json:"name" yaml:"name"` - Id string `json:"id" yaml:"id"` - Trace string `json:"error_trace,omitempty" yaml:"error_trace,omitempty"` - }{ - ConnectorName: connector.Name, - Id: connectorExpansion.Id.Id, - Trace: trace, - }) - } - c.analyticsClient.SetSpecialProperty(analytics.ResourceIDPropertiesKey, connectorExpansion.Id.Id) - return nil -} - -func (c *command) update(cmd *cobra.Command, args []string) error { - userConfigs, err := getConfig(cmd) - if err != nil { - return err - } - kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) - if err != nil { - return err - } - // Resolve Connector Name from ID - connector, err := c.Client.Connect.GetExpansionById(context.Background(), &schedv1.Connector{AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID, Id: args[0]}) - if err != nil { - return err - } - _, err = c.Client.Connect.Update(context.Background(), &schedv1.ConnectorConfig{UserConfigs: *userConfigs, AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID, Name: connector.Info.Name, Plugin: (*userConfigs)["connector.class"]}) - if err != nil { - return err - } - utils.Printf(cmd, errors.UpdatedConnectorMsg, args[0]) - return nil -} - -func (c *command) delete(cmd *cobra.Command, args []string) error { - kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) - if err != nil { - return err - } - connector, err := c.Client.Connect.GetExpansionById(context.Background(), &schedv1.Connector{AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID, Id: args[0]}) - if err != nil { - return err - } - err = c.Client.Connect.Delete(context.Background(), &schedv1.Connector{Name: connector.Info.Name, AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID}) - if err != nil { - return err - } - utils.Printf(cmd, errors.DeletedConnectorMsg, args[0]) - c.analyticsClient.SetSpecialProperty(analytics.ResourceIDPropertiesKey, connector.Id.Id) - return nil -} - -func (c *command) pause(cmd *cobra.Command, args []string) error { - kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) - if err != nil { - return err - } - connector, err := c.Client.Connect.GetExpansionById(context.Background(), &schedv1.Connector{AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID, Id: args[0]}) - if err != nil { - return err - } - err = c.Client.Connect.Pause(context.Background(), &schedv1.Connector{Name: connector.Info.Name, AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID}) - if err != nil { - return err - } - utils.Printf(cmd, errors.PausedConnectorMsg, args[0]) - return nil -} - -func (c *command) resume(cmd *cobra.Command, args []string) error { - kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) - if err != nil { - return err - } - connector, err := c.Client.Connect.GetExpansionById(context.Background(), &schedv1.Connector{AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID, Id: args[0]}) - if err != nil { - return err - } - err = c.Client.Connect.Resume(context.Background(), &schedv1.Connector{Name: connector.Info.Name, AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID}) - if err != nil { - return err - } - utils.Printf(cmd, errors.ResumedConnectorMsg, args[0]) - return nil -} - -func panicOnError(err error) { - if err != nil { - panic(err) - } -} - -func printHumanDescribe(cmd *cobra.Command, connector *opv1.ConnectorExpansion) error { - utils.Println(cmd, "Connector Details") - data := &connectorDescribeDisplay{ - Name: connector.Status.Name, - ID: connector.Id.Id, - Status: connector.Status.Connector.State, - Type: connector.Info.Type, - Trace: connector.Status.Connector.Trace, - } - _ = printer.RenderTableOut(data, listFields, describeRenames, os.Stdout) - utils.Println(cmd, "\n\nTask Level Details") - var tasks [][]string - titleRow := []string{"TaskId", "State"} - for _, task := range connector.Status.Tasks { - tasks = append(tasks, printer.ToRow(&taskDescribeDisplay{ - task.Id, - task.State, - }, titleRow)) - } - printer.RenderCollectionTable(tasks, titleRow) - utils.Println(cmd, "\n\nConfiguration Details") - var configs [][]string - titleRow = []string{"Config", "Value"} - for name, value := range connector.Info.Config { - configs = append(configs, printer.ToRow(&configDescribeDisplay{ - name, - value, - }, titleRow)) - } - printer.RenderCollectionTable(configs, titleRow) - return nil -} - -func printStructuredDescribe(connector *opv1.ConnectorExpansion, format string) error { - structuredDisplay := &structuredDescribeDisplay{ - Connector: &connectorDescribeDisplay{ - Name: connector.Status.Name, - ID: connector.Id.Id, - Status: connector.Status.Connector.State, - Type: connector.Info.Type, - Trace: connector.Status.Connector.Trace, - }, - Tasks: []taskDescribeDisplay{}, - Configs: []configDescribeDisplay{}, - } - for _, task := range connector.Status.Tasks { - structuredDisplay.Tasks = append(structuredDisplay.Tasks, taskDescribeDisplay{ - TaskId: task.Id, - State: task.State, - }) - } - for name, value := range connector.Info.Config { - structuredDisplay.Configs = append(structuredDisplay.Configs, configDescribeDisplay{ - Config: name, - Value: value, - }) - } - return output.StructuredOutput(format, structuredDisplay) -} - -func (c *command) Cmd() *cobra.Command { - return c.Command -} - -func (c *command) ServerCompletableChildren() []*cobra.Command { - return c.completableChildren -} + c.completableFlagChildren = map[string][]*cobra.Command{"cluster": {createCmd}} -func (c *command) ServerComplete() []prompt.Suggest { - var suggestions []prompt.Suggest - connectors, err := c.fetchConnectors() - if err != nil { - return suggestions - } - for _, conn := range connectors { - suggestions = append(suggestions, prompt.Suggest{ - Text: conn.Id.Id, - Description: conn.Info.Name, - }) - } - return suggestions -} - -func (c *command) fetchConnectors() (map[string]*opv1.ConnectorExpansion, error) { - kafkaCluster, err := c.Context.GetKafkaClusterForCommand(c.Command) - if err != nil { - return nil, err - } - connectors, err := c.Client.Connect.ListWithExpansions(context.Background(), &schedv1.Connector{AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID}, "status,info,id") - if err != nil { - return nil, err - } - return connectors, nil - -} - -func (c *command) ServerCompletableFlagChildren() map[string][]*cobra.Command { - return c.completableFlagChildren -} - -func (c *command) ServerFlagComplete() map[string]func() []prompt.Suggest { - return map[string]func() []prompt.Suggest{ - "cluster": completer.ClusterFlagServerCompleterFunc(c.Client, c.EnvironmentId()), - } + return c } diff --git a/internal/cmd/connect/command_cluster.go b/internal/cmd/connect/command_cluster.go new file mode 100644 index 0000000000..2bc58dd953 --- /dev/null +++ b/internal/cmd/connect/command_cluster.go @@ -0,0 +1,54 @@ +package connect + +import ( + "context" + + "github.com/antihax/optional" + mds "github.com/confluentinc/mds-sdk-go/mdsv1" + "github.com/spf13/cobra" + + print "github.com/confluentinc/cli/internal/pkg/cluster" + pcmd "github.com/confluentinc/cli/internal/pkg/cmd" + "github.com/confluentinc/cli/internal/pkg/output" +) + +const clusterType = "connect-cluster" + +type clusterCommand struct { + *pcmd.AuthenticatedStateFlagCommand + prerunner pcmd.PreRunner +} + +func newClusterCommand(prerunner pcmd.PreRunner) *cobra.Command { + cmd := &cobra.Command{ + Use: "cluster", + Short: "Manage Connect clusters.", + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireOnPremLogin}, + } + + c := &clusterCommand{ + AuthenticatedStateFlagCommand: pcmd.NewAuthenticatedWithMDSStateFlagCommand(cmd, prerunner, clusterSubcommandFlags), + prerunner: prerunner, + } + + c.AddCommand(c.newListCommand()) + + return c.Command +} + +func (c *clusterCommand) list(cmd *cobra.Command, _ []string) error { + ctx := context.WithValue(context.Background(), mds.ContextAccessToken, c.State.AuthToken) + opts := &mds.ClusterRegistryListOpts{ClusterType: optional.NewString(clusterType)} + + clusterInfos, response, err := c.MDSClient.ClusterRegistryApi.ClusterRegistryList(ctx, opts) + if err != nil { + return print.HandleClusterError(err, response) + } + + format, err := cmd.Flags().GetString(output.FlagName) + if err != nil { + return err + } + + return print.PrintCluster(clusterInfos, format) +} diff --git a/internal/cmd/connect/command_cluster_list.go b/internal/cmd/connect/command_cluster_list.go new file mode 100644 index 0000000000..b19ae2fb4e --- /dev/null +++ b/internal/cmd/connect/command_cluster_list.go @@ -0,0 +1,22 @@ +package connect + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/internal/pkg/cmd" + "github.com/confluentinc/cli/internal/pkg/output" +) + +func (c *clusterCommand) newListCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List registered Connect clusters.", + Long: "List Connect clusters that are registered with the MDS cluster registry.", + Args: cobra.NoArgs, + RunE: pcmd.NewCLIRunE(c.list), + } + + cmd.Flags().StringP(output.FlagName, output.ShortHandFlag, output.DefaultValue, output.Usage) + + return cmd +} diff --git a/internal/cmd/connect/command_cluster_onprem.go b/internal/cmd/connect/command_cluster_onprem.go deleted file mode 100644 index 85f288307f..0000000000 --- a/internal/cmd/connect/command_cluster_onprem.go +++ /dev/null @@ -1,68 +0,0 @@ -package connect - -import ( - "context" - - "github.com/antihax/optional" - mds "github.com/confluentinc/mds-sdk-go/mdsv1" - "github.com/spf13/cobra" - - print "github.com/confluentinc/cli/internal/pkg/cluster" - pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/output" -) - -var clusterType = "connect-cluster" - -type clusterCommandOnPrem struct { - *pcmd.AuthenticatedStateFlagCommand - prerunner pcmd.PreRunner -} - -// NewClusterCommand returns the Cobra command for Kafka cluster. -func NewClusterCommandOnPrem(prerunner pcmd.PreRunner) *cobra.Command { - cliCmd := pcmd.NewAuthenticatedWithMDSStateFlagCommand( - &cobra.Command{ - Use: "cluster", - Short: "Manage Connect clusters.", - Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireOnPremLogin}, - }, - prerunner, ClusterSubcommandFlags) - cmd := &clusterCommandOnPrem{ - AuthenticatedStateFlagCommand: cliCmd, - prerunner: prerunner, - } - cmd.init() - return cmd.Command -} - -func (c *clusterCommandOnPrem) init() { - listCmd := &cobra.Command{ - Use: "list", - Short: "List registered Connect clusters.", - Long: "List Connect clusters that are registered with the MDS cluster registry.", - Args: cobra.NoArgs, - RunE: pcmd.NewCLIRunE(c.list), - } - listCmd.Flags().StringP(output.FlagName, output.ShortHandFlag, output.DefaultValue, output.Usage) - c.AddCommand(listCmd) -} - -func (c *clusterCommandOnPrem) createContext() context.Context { - return context.WithValue(context.Background(), mds.ContextAccessToken, c.State.AuthToken) -} - -func (c *clusterCommandOnPrem) list(cmd *cobra.Command, _ []string) error { - connectClustertype := &mds.ClusterRegistryListOpts{ - ClusterType: optional.NewString(clusterType), - } - clusterInfos, response, err := c.MDSClient.ClusterRegistryApi.ClusterRegistryList(c.createContext(), connectClustertype) - if err != nil { - return print.HandleClusterError(err, response) - } - format, err := cmd.Flags().GetString(output.FlagName) - if err != nil { - return err - } - return print.PrintCluster(clusterInfos, format) -} diff --git a/internal/cmd/connect/command_create.go b/internal/cmd/connect/command_create.go new file mode 100644 index 0000000000..23487b6d97 --- /dev/null +++ b/internal/cmd/connect/command_create.go @@ -0,0 +1,104 @@ +package connect + +import ( + "context" + + schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" + "github.com/spf13/cobra" + + "github.com/confluentinc/cli/internal/pkg/analytics" + pcmd "github.com/confluentinc/cli/internal/pkg/cmd" + "github.com/confluentinc/cli/internal/pkg/errors" + "github.com/confluentinc/cli/internal/pkg/examples" + "github.com/confluentinc/cli/internal/pkg/output" + "github.com/confluentinc/cli/internal/pkg/utils" +) + +func (c *command) newCreateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "create", + Short: "Create a connector.", + Args: cobra.NoArgs, + RunE: pcmd.NewCLIRunE(c.create), + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + Example: examples.BuildExampleString( + examples.Example{ + Text: "Create a connector in the current or specified Kafka cluster context.", + Code: "confluent connect create --config config.json", + }, + examples.Example{ + Code: "confluent connect create --config config.json --cluster lkc-123456", + }, + ), + } + + cmd.Flags().String("config", "", "JSON connector config file.") + cmd.Flags().StringP(output.FlagName, output.ShortHandFlag, output.DefaultValue, output.Usage) + + _ = cmd.MarkFlagRequired("config") + + return cmd +} + +func (c *command) create(cmd *cobra.Command, _ []string) error { + kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) + if err != nil { + return err + } + + userConfigs, err := getConfig(cmd) + if err != nil { + return err + } + + connectorConfig := &schedv1.ConnectorConfig{ + UserConfigs: *userConfigs, + AccountId: c.EnvironmentId(), + KafkaClusterId: kafkaCluster.ID, + Name: (*userConfigs)["name"], + Plugin: (*userConfigs)["connector.class"], + } + + connectorInfo, err := c.Client.Connect.Create(context.Background(), connectorConfig) + if err != nil { + return err + } + + connector := &schedv1.Connector{ + AccountId: c.EnvironmentId(), + KafkaClusterId: kafkaCluster.ID, + Name: connectorInfo.Name, + } + + // Resolve Connector ID from name of created connector + connectorExpansion, err := c.Client.Connect.GetExpansionByName(context.Background(), connector) + if err != nil { + return err + } + + outputFormat, err := cmd.Flags().GetString(output.FlagName) + if err != nil { + return err + } + + trace := connectorExpansion.Status.Connector.Trace + if outputFormat == output.Human.String() { + utils.Printf(cmd, errors.CreatedConnectorMsg, connectorInfo.Name, connectorExpansion.Id.Id) + if trace != "" { + utils.Printf(cmd, "Error Trace: %s\n", trace) + } + } else { + return output.StructuredOutput(outputFormat, &struct { + ConnectorName string `json:"name" yaml:"name"` + Id string `json:"id" yaml:"id"` + Trace string `json:"error_trace,omitempty" yaml:"error_trace,omitempty"` + }{ + ConnectorName: connectorInfo.Name, + Id: connectorExpansion.Id.Id, + Trace: trace, + }) + } + + c.analyticsClient.SetSpecialProperty(analytics.ResourceIDPropertiesKey, connectorExpansion.Id.Id) + return nil +} diff --git a/internal/cmd/connect/command_delete.go b/internal/cmd/connect/command_delete.go new file mode 100644 index 0000000000..ca468827d9 --- /dev/null +++ b/internal/cmd/connect/command_delete.go @@ -0,0 +1,65 @@ +package connect + +import ( + "context" + + schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" + "github.com/spf13/cobra" + + "github.com/confluentinc/cli/internal/pkg/analytics" + pcmd "github.com/confluentinc/cli/internal/pkg/cmd" + "github.com/confluentinc/cli/internal/pkg/errors" + "github.com/confluentinc/cli/internal/pkg/examples" + "github.com/confluentinc/cli/internal/pkg/utils" +) + +func (c *command) newDeleteCommand() *cobra.Command { + return &cobra.Command{ + Use: "delete ", + Short: "Delete a connector.", + Args: cobra.ExactArgs(1), + RunE: pcmd.NewCLIRunE(c.delete), + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + Example: examples.BuildExampleString( + examples.Example{ + Text: "Delete a connector in the current or specified Kafka cluster context.", + Code: "confluent connect delete --config config.json", + }, + examples.Example{ + Code: "confluent connect delete --config config.json --cluster lkc-123456", + }, + ), + } +} + +func (c *command) delete(cmd *cobra.Command, args []string) error { + kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) + if err != nil { + return err + } + + connector := &schedv1.Connector{ + AccountId: c.EnvironmentId(), + KafkaClusterId: kafkaCluster.ID, + Id: args[0], + } + + connectorExpansion, err := c.Client.Connect.GetExpansionById(context.Background(), connector) + if err != nil { + return err + } + + connector = &schedv1.Connector{ + Name: connectorExpansion.Info.Name, + AccountId: c.EnvironmentId(), + KafkaClusterId: kafkaCluster.ID, + } + + if err := c.Client.Connect.Delete(context.Background(), connector); err != nil { + return err + } + + utils.Printf(cmd, errors.DeletedConnectorMsg, args[0]) + c.analyticsClient.SetSpecialProperty(analytics.ResourceIDPropertiesKey, connectorExpansion.Id.Id) + return nil +} diff --git a/internal/cmd/connect/command_describe.go b/internal/cmd/connect/command_describe.go new file mode 100644 index 0000000000..28860a3666 --- /dev/null +++ b/internal/cmd/connect/command_describe.go @@ -0,0 +1,145 @@ +package connect + +import ( + "context" + "os" + + schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" + opv1 "github.com/confluentinc/cc-structs/operator/v1" + "github.com/confluentinc/go-printer" + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/internal/pkg/cmd" + "github.com/confluentinc/cli/internal/pkg/examples" + "github.com/confluentinc/cli/internal/pkg/output" + "github.com/confluentinc/cli/internal/pkg/utils" +) + +type taskDescribeDisplay struct { + TaskId int32 `json:"task_id" yaml:"task_id"` + State string `json:"state" yaml:"state"` +} + +type configDescribeDisplay struct { + Config string `json:"config" yaml:"config"` + Value string `json:"value" yaml:"value"` +} + +type structuredDescribeDisplay struct { + Connector *connectorDescribeDisplay `json:"connector" yaml:"connector"` + Tasks []taskDescribeDisplay `json:"tasks" yaml:"task"` + Configs []configDescribeDisplay `json:"configs" yaml:"configs"` +} + +func (c *command) newDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe ", + Short: "Describe a connector.", + Args: cobra.ExactArgs(1), + RunE: pcmd.NewCLIRunE(c.describe), + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + Example: examples.BuildExampleString( + examples.Example{ + Text: "Describe connector and task level details of a connector in the current or specified Kafka cluster context.", + Code: "confluent connect describe lcc-123456", + }, + examples.Example{ + Code: "confluent connect describe lcc-123456 --cluster lkc-123456", + }, + ), + } + + cmd.Flags().StringP(output.FlagName, output.ShortHandFlag, output.DefaultValue, output.Usage) + + return cmd +} + +func (c *command) describe(cmd *cobra.Command, args []string) error { + kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) + if err != nil { + return err + } + + connector := &schedv1.Connector{ + AccountId: c.EnvironmentId(), + KafkaClusterId: kafkaCluster.ID, + Id: args[0], + } + + connectorExpansion, err := c.Client.Connect.GetExpansionById(context.Background(), connector) + if err != nil { + return err + } + + outputOption, err := cmd.Flags().GetString(output.FlagName) + if err != nil { + return err + } + + if outputOption == output.Human.String() { + return printHumanDescribe(cmd, connectorExpansion) + } else { + return printStructuredDescribe(connectorExpansion, outputOption) + } +} + +func printHumanDescribe(cmd *cobra.Command, connector *opv1.ConnectorExpansion) error { + utils.Println(cmd, "Connector Details") + data := &connectorDescribeDisplay{ + Name: connector.Status.Name, + ID: connector.Id.Id, + Status: connector.Status.Connector.State, + Type: connector.Info.Type, + Trace: connector.Status.Connector.Trace, + } + _ = printer.RenderTableOut(data, listFields, map[string]string{}, os.Stdout) + + utils.Println(cmd, "\n\nTask Level Details") + var tasks [][]string + titleRow := []string{"TaskId", "State"} + for _, task := range connector.Status.Tasks { + tasks = append(tasks, printer.ToRow(&taskDescribeDisplay{ + task.Id, + task.State, + }, titleRow)) + } + printer.RenderCollectionTable(tasks, titleRow) + utils.Println(cmd, "\n\nConfiguration Details") + var configs [][]string + titleRow = []string{"Config", "Value"} + for name, value := range connector.Info.Config { + configs = append(configs, printer.ToRow(&configDescribeDisplay{ + name, + value, + }, titleRow)) + } + printer.RenderCollectionTable(configs, titleRow) + return nil +} + +func printStructuredDescribe(connector *opv1.ConnectorExpansion, format string) error { + structuredDisplay := &structuredDescribeDisplay{ + Connector: &connectorDescribeDisplay{ + Name: connector.Status.Name, + ID: connector.Id.Id, + Status: connector.Status.Connector.State, + Type: connector.Info.Type, + Trace: connector.Status.Connector.Trace, + }, + Tasks: []taskDescribeDisplay{}, + Configs: []configDescribeDisplay{}, + } + for _, task := range connector.Status.Tasks { + structuredDisplay.Tasks = append(structuredDisplay.Tasks, taskDescribeDisplay{ + TaskId: task.Id, + State: task.State, + }) + } + for name, value := range connector.Info.Config { + structuredDisplay.Configs = append(structuredDisplay.Configs, configDescribeDisplay{ + Config: name, + Value: value, + }) + } + return output.StructuredOutput(format, structuredDisplay) +} diff --git a/internal/cmd/connect/command_event.go b/internal/cmd/connect/command_event.go index 83a5a5e4d7..9c0fd0a1cd 100644 --- a/internal/cmd/connect/command_event.go +++ b/internal/cmd/connect/command_event.go @@ -1,86 +1,25 @@ package connect import ( - "context" - "github.com/spf13/cobra" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/errors" - "github.com/confluentinc/cli/internal/pkg/output" ) type eventCommand struct { *pcmd.AuthenticatedCLICommand } -type connectLogEventsInfo struct { - ClusterId string - EnvironmentId string - ServiceAccountId string - TopicName string -} - -var ( - connectLogListFields = []string{"ClusterId", "EnvironmentId", "ServiceAccountId", "TopicName"} - humanLabelMap = map[string]string{ - "ClusterId": "Cluster", - "EnvironmentId": "Environment", - "ServiceAccountId": "Service Account", - "TopicName": "Topic Name", - } - structuredLabelMap = map[string]string{ - "ClusterId": "cluster_id", - "EnvironmentId": "environment_id", - "ServiceAccountId": "service_account_id", - "TopicName": "topic_name", - } -) - -// NewEventCommand returns the Cobra command for Connect log. -func NewEventCommand(prerunner pcmd.PreRunner) *cobra.Command { - cmd := &eventCommand{ - pcmd.NewAuthenticatedCLICommand( - &cobra.Command{ - Use: "event", - Short: "Manage Connect log events configuration.", - Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, - }, - prerunner, - ), - } - cmd.init() - return cmd.Command -} - -func (c *eventCommand) init() { - describeCmd := &cobra.Command{ - Use: "describe", - Short: "Describe the Connect log events configuration.", - Args: cobra.NoArgs, - RunE: pcmd.NewCLIRunE(c.describe), +func newEventCommand(prerunner pcmd.PreRunner) *cobra.Command { + cmd := &cobra.Command{ + Use: "event", + Short: "Manage Connect log events configuration.", + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, } - describeCmd.Flags().StringP(output.FlagName, output.ShortHandFlag, output.DefaultValue, output.Usage) - c.AddCommand(describeCmd) -} -func (c *eventCommand) describe(cmd *cobra.Command, _ []string) error { - if c.State.Auth == nil || c.State.Auth.Organization == nil || c.State.Auth.Organization.AuditLog == nil || - c.State.Auth.Organization.AuditLog.ClusterId == "" { - return errors.New(errors.ConnectLogEventsNotEnabledErrorMsg) - } - - auditLog := c.State.Auth.Organization.AuditLog + c := &eventCommand{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)} - serviceAccount, err := c.Client.User.GetServiceAccount(context.Background(), auditLog.ServiceAccountId) - if err != nil { - return err - } + c.AddCommand(c.newDescribeCommand()) - return output.DescribeObject(cmd, &connectLogEventsInfo{ - ClusterId: auditLog.ClusterId, - EnvironmentId: auditLog.AccountId, - ServiceAccountId: serviceAccount.ResourceId, - TopicName: "confluent-connect-log-events", - }, connectLogListFields, humanLabelMap, structuredLabelMap) + return c.Command } diff --git a/internal/cmd/connect/command_event_describe.go b/internal/cmd/connect/command_event_describe.go new file mode 100644 index 0000000000..a094675bb0 --- /dev/null +++ b/internal/cmd/connect/command_event_describe.go @@ -0,0 +1,70 @@ +package connect + +import ( + "context" + + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/internal/pkg/cmd" + "github.com/confluentinc/cli/internal/pkg/errors" + "github.com/confluentinc/cli/internal/pkg/output" +) + +var ( + connectLogListFields = []string{"ClusterId", "EnvironmentId", "ServiceAccountId", "TopicName"} + humanLabelMap = map[string]string{ + "ClusterId": "Cluster", + "EnvironmentId": "Environment", + "ServiceAccountId": "Service Account", + "TopicName": "Topic Name", + } + structuredLabelMap = map[string]string{ + "ClusterId": "cluster_id", + "EnvironmentId": "environment_id", + "ServiceAccountId": "service_account_id", + "TopicName": "topic_name", + } +) + +type connectLogEventsInfo struct { + ClusterId string + EnvironmentId string + ServiceAccountId string + TopicName string +} + +func (c *eventCommand) newDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe", + Short: "Describe the Connect log events configuration.", + Args: cobra.NoArgs, + RunE: pcmd.NewCLIRunE(c.describe), + } + + cmd.Flags().StringP(output.FlagName, output.ShortHandFlag, output.DefaultValue, output.Usage) + + return cmd +} + +func (c *eventCommand) describe(cmd *cobra.Command, _ []string) error { + if c.State.Auth == nil || c.State.Auth.Organization == nil || c.State.Auth.Organization.AuditLog == nil || + c.State.Auth.Organization.AuditLog.ClusterId == "" { + return errors.New(errors.ConnectLogEventsNotEnabledErrorMsg) + } + + auditLog := c.State.Auth.Organization.AuditLog + + serviceAccount, err := c.Client.User.GetServiceAccount(context.Background(), auditLog.ServiceAccountId) + if err != nil { + return err + } + + info := &connectLogEventsInfo{ + ClusterId: auditLog.ClusterId, + EnvironmentId: auditLog.AccountId, + ServiceAccountId: serviceAccount.ResourceId, + TopicName: "confluent-connect-log-events", + } + + return output.DescribeObject(cmd, info, connectLogListFields, humanLabelMap, structuredLabelMap) +} diff --git a/internal/cmd/connect/command_list.go b/internal/cmd/connect/command_list.go new file mode 100644 index 0000000000..07c9ea2d5c --- /dev/null +++ b/internal/cmd/connect/command_list.go @@ -0,0 +1,65 @@ +package connect + +import ( + "context" + + schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/internal/pkg/cmd" + "github.com/confluentinc/cli/internal/pkg/examples" + "github.com/confluentinc/cli/internal/pkg/output" +) + +func (c *command) newListCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List connectors.", + Args: cobra.NoArgs, + RunE: pcmd.NewCLIRunE(c.list), + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + Example: examples.BuildExampleString( + examples.Example{ + Text: "List connectors in the current or specified Kafka cluster context.", + Code: "confluent connect list", + }, + examples.Example{ + Code: "confluent connect list --cluster lkc-123456", + }, + ), + } + + cmd.Flags().StringP(output.FlagName, output.ShortHandFlag, output.DefaultValue, output.Usage) + + return cmd +} + +func (c *command) list(cmd *cobra.Command, _ []string) error { + kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) + if err != nil { + return err + } + + connectors, err := c.Client.Connect.ListWithExpansions(context.Background(), &schedv1.Connector{AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID}, "status,info,id") + if err != nil { + return err + } + + outputWriter, err := output.NewListOutputWriter(cmd, listFields, listFields, listStructuredLabels) + if err != nil { + return err + } + + for name, connector := range connectors { + connector := &connectorDescribeDisplay{ + Name: name, + ID: connector.Id.Id, + Status: connector.Status.Connector.State, + Type: connector.Info.Type, + Trace: connector.Status.Connector.Trace, + } + outputWriter.AddElement(connector) + } + + return outputWriter.Out() +} diff --git a/internal/cmd/connect/command_pause.go b/internal/cmd/connect/command_pause.go new file mode 100644 index 0000000000..4f23398c1b --- /dev/null +++ b/internal/cmd/connect/command_pause.go @@ -0,0 +1,63 @@ +package connect + +import ( + "context" + + schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/internal/pkg/cmd" + "github.com/confluentinc/cli/internal/pkg/errors" + "github.com/confluentinc/cli/internal/pkg/examples" + "github.com/confluentinc/cli/internal/pkg/utils" +) + +func (c *command) newPauseCommand() *cobra.Command { + return &cobra.Command{ + Use: "pause ", + Short: "Pause a connector.", + Args: cobra.ExactArgs(1), + RunE: pcmd.NewCLIRunE(c.pause), + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + Example: examples.BuildExampleString( + examples.Example{ + Text: "Pause a connector in the current or specified Kafka cluster context.", + Code: "confluent connect pause --config config.json", + }, + examples.Example{ + Code: "confluent connect pause --config config.json --cluster lkc-123456", + }, + ), + } +} + +func (c *command) pause(cmd *cobra.Command, args []string) error { + kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) + if err != nil { + return err + } + + connector := &schedv1.Connector{ + AccountId: c.EnvironmentId(), + KafkaClusterId: kafkaCluster.ID, + Id: args[0], + } + + connectorExpansion, err := c.Client.Connect.GetExpansionById(context.Background(), connector) + if err != nil { + return err + } + + connector = &schedv1.Connector{ + Name: connectorExpansion.Info.Name, + AccountId: c.EnvironmentId(), + KafkaClusterId: kafkaCluster.ID, + } + + if err := c.Client.Connect.Pause(context.Background(), connector); err != nil { + return err + } + + utils.Printf(cmd, errors.PausedConnectorMsg, args[0]) + return nil +} diff --git a/internal/cmd/connect/command_plugin.go b/internal/cmd/connect/command_plugin.go index ac7a2295f5..13318cbb8a 100644 --- a/internal/cmd/connect/command_plugin.go +++ b/internal/cmd/connect/command_plugin.go @@ -2,18 +2,12 @@ package connect import ( "context" - "fmt" - "github.com/c-bata/go-prompt" schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" + opv1 "github.com/confluentinc/cc-structs/operator/v1" "github.com/spf13/cobra" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/errors" - "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/output" - "github.com/confluentinc/cli/internal/pkg/utils" - "github.com/confluentinc/cli/internal/pkg/version" ) type pluginCommand struct { @@ -21,154 +15,30 @@ type pluginCommand struct { completableChildren []*cobra.Command } -type pluginDisplay struct { - PluginName string - Type string -} +func newPluginCommand(prerunner pcmd.PreRunner) *cobra.Command { + cmd := &cobra.Command{ + Use: "plugin", + Short: "Show plugins and their configurations.", + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + } -var ( - pluginFields = []string{"PluginName", "Type"} - pluginHumanFields = []string{"Plugin Name", "Type"} - pluginStructureLabels = []string{"plugin_name", "type"} -) + c := &pluginCommand{AuthenticatedStateFlagCommand: pcmd.NewAuthenticatedStateFlagCommand(cmd, prerunner, subcommandFlags)} -// New returns the default command object for interacting with Connect. -func NewPluginCommand(prerunner pcmd.PreRunner) *cobra.Command { - c := &pluginCommand{ - AuthenticatedStateFlagCommand: pcmd.NewAuthenticatedStateFlagCommand(&cobra.Command{ - Use: "plugin", - Short: "Show plugins and their configurations.", - Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, - }, prerunner, SubcommandFlags), - } - c.init() - return c.Command -} + describeCmd := c.newDescribeCommand() -func (c *pluginCommand) init() { - describeCmd := &cobra.Command{ - Use: "describe ", - Short: "Describe a connector plugin type.", - Args: cobra.ExactArgs(1), - RunE: pcmd.NewCLIRunE(c.describe), - Example: examples.BuildExampleString( - examples.Example{ - Text: "Describe required connector configuration parameters for a specific connector plugin.", - Code: fmt.Sprintf("%s connect plugin describe ", version.CLIName), - }, - ), - } - describeCmd.Flags().StringP(output.FlagName, output.ShortHandFlag, output.DefaultValue, output.Usage) c.AddCommand(describeCmd) + c.AddCommand(c.newListCommand()) - listCmd := &cobra.Command{ - Use: "list", - Short: "List connector plugin types.", - Args: cobra.NoArgs, - RunE: pcmd.NewCLIRunE(c.list), - Example: examples.BuildExampleString( - examples.Example{ - Text: "List connectors in the current or specified Kafka cluster context.", - Code: fmt.Sprintf("%s connect plugin list", version.CLIName), - }, - ), - } - listCmd.Flags().StringP(output.FlagName, output.ShortHandFlag, output.DefaultValue, output.Usage) - c.AddCommand(listCmd) c.completableChildren = []*cobra.Command{describeCmd} -} -func (c *pluginCommand) list(cmd *cobra.Command, _ []string) error { - outputWriter, err := output.NewListOutputWriter(cmd, pluginFields, pluginHumanFields, pluginStructureLabels) - if err != nil { - return err - } - plugins, err := c.getPlugins(cmd) - if err != nil { - return err - } - for _, conn := range plugins { - outputWriter.AddElement(conn) - } - outputWriter.StableSort() - return outputWriter.Out() + return c.Command } -func (c *pluginCommand) getPlugins(cmd *cobra.Command) ([]*pluginDisplay, error) { +func (c *pluginCommand) getPlugins(cmd *cobra.Command) ([]*opv1.ConnectorPluginInfo, error) { kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) if err != nil { return nil, err } - connectorInfo, err := c.Client.Connect.GetPlugins(context.Background(), &schedv1.Connector{AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID}, "") - if err != nil { - return nil, err - } - var plugins []*pluginDisplay - for _, conn := range connectorInfo { - plugins = append(plugins, &pluginDisplay{ - PluginName: conn.Class, - Type: conn.Type, - }) - } - return plugins, nil -} - -func (c *pluginCommand) describe(cmd *cobra.Command, args []string) error { - kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) - if err != nil { - return err - } - if len(args) == 0 { - return errors.Errorf(errors.PluginNameNotPassedErrorMsg) - } - config := map[string]string{"connector.class": args[0]} - - reply, err := c.Client.Connect.Validate(context.Background(), - &schedv1.ConnectorConfig{ - UserConfigs: config, - AccountId: c.EnvironmentId(), - KafkaClusterId: kafkaCluster.ID, - Plugin: args[0]}) - if reply != nil && err != nil { - outputFormat, flagErr := cmd.Flags().GetString(output.FlagName) - if flagErr != nil { - return flagErr - } - if outputFormat == output.Human.String() { - utils.Println(cmd, "Following are the required configs: \nconnector.class: "+args[0]+"\n"+err.Error()) - } else { - - for _, c := range reply.Configs { - if len(c.Value.Errors) > 0 { - config[c.Value.Name] = fmt.Sprintf("%s ", c.Value.Errors[0]) - } - } - return output.StructuredOutput(outputFormat, &config) - } - return nil - } - return errors.Errorf(errors.InvalidCloudErrorMsg) -} - -func (c *pluginCommand) Cmd() *cobra.Command { - return c.Command -} - -func (c *pluginCommand) ServerComplete() []prompt.Suggest { - var suggestions []prompt.Suggest - plugins, err := c.getPlugins(c.Command) - if err != nil { - return suggestions - } - for _, conn := range plugins { - suggestions = append(suggestions, prompt.Suggest{ - Text: conn.PluginName, - Description: conn.Type, - }) - } - return suggestions -} -func (c *pluginCommand) ServerCompletableChildren() []*cobra.Command { - return c.completableChildren + return c.Client.Connect.GetPlugins(context.Background(), &schedv1.Connector{AccountId: c.EnvironmentId(), KafkaClusterId: kafkaCluster.ID}, "") } diff --git a/internal/cmd/connect/command_plugin_describe.go b/internal/cmd/connect/command_plugin_describe.go new file mode 100644 index 0000000000..49b148dd32 --- /dev/null +++ b/internal/cmd/connect/command_plugin_describe.go @@ -0,0 +1,77 @@ +package connect + +import ( + "context" + "fmt" + + schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/internal/pkg/cmd" + "github.com/confluentinc/cli/internal/pkg/errors" + "github.com/confluentinc/cli/internal/pkg/examples" + "github.com/confluentinc/cli/internal/pkg/output" + "github.com/confluentinc/cli/internal/pkg/utils" + "github.com/confluentinc/cli/internal/pkg/version" +) + +func (c *pluginCommand) newDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe ", + Short: "Describe a connector plugin.", + Args: cobra.ExactArgs(1), + RunE: pcmd.NewCLIRunE(c.describe), + Example: examples.BuildExampleString( + examples.Example{ + Text: `Describe the required connector configuration parameters for connector plugin "MySource".`, + Code: fmt.Sprintf("%s connect plugin describe MySource", version.CLIName), + }, + ), + } + + cmd.Flags().StringP(output.FlagName, output.ShortHandFlag, output.DefaultValue, output.Usage) + + return cmd +} + +func (c *pluginCommand) describe(cmd *cobra.Command, args []string) error { + if len(args) == 0 { + return errors.Errorf(errors.PluginNameNotPassedErrorMsg) + } + + kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) + if err != nil { + return err + } + + config := map[string]string{"connector.class": args[0]} + connectorConfig := &schedv1.ConnectorConfig{ + UserConfigs: config, + AccountId: c.EnvironmentId(), + KafkaClusterId: kafkaCluster.ID, + Plugin: args[0], + } + + reply, err := c.Client.Connect.Validate(context.Background(), connectorConfig) + if reply != nil && err != nil { + outputFormat, flagErr := cmd.Flags().GetString(output.FlagName) + if flagErr != nil { + return flagErr + } + + if outputFormat == output.Human.String() { + utils.Println(cmd, "The following are required configs:") + utils.Print(cmd, "connector.class : "+args[0]+"\n"+err.Error()) + return nil + } + + for _, c := range reply.Configs { + if len(c.Value.Errors) > 0 { + config[c.Value.Name] = fmt.Sprintf("%s ", c.Value.Errors[0]) + } + } + return output.StructuredOutput(outputFormat, &config) + } + + return errors.Errorf(errors.InvalidCloudErrorMsg) +} diff --git a/internal/cmd/connect/command_plugin_list.go b/internal/cmd/connect/command_plugin_list.go new file mode 100644 index 0000000000..9bac01dfa7 --- /dev/null +++ b/internal/cmd/connect/command_plugin_list.go @@ -0,0 +1,56 @@ +package connect + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/internal/pkg/cmd" + "github.com/confluentinc/cli/internal/pkg/examples" + "github.com/confluentinc/cli/internal/pkg/output" +) + +var ( + pluginFields = []string{"Class", "Type"} + pluginHumanFields = []string{"Plugin Name", "Type"} + pluginStructureLabels = []string{"plugin_name", "type"} +) + +func (c *pluginCommand) newListCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List connector plugin types.", + Args: cobra.NoArgs, + RunE: pcmd.NewCLIRunE(c.list), + Example: examples.BuildExampleString( + examples.Example{ + Text: "List connectors in the current or specified Kafka cluster context.", + Code: "confluent connect plugin list", + }, + examples.Example{ + Code: "confluent connect plugin list --cluster lkc-123456", + }, + ), + } + + cmd.Flags().StringP(output.FlagName, output.ShortHandFlag, output.DefaultValue, output.Usage) + + return cmd +} + +func (c *pluginCommand) list(cmd *cobra.Command, _ []string) error { + plugins, err := c.getPlugins(cmd) + if err != nil { + return err + } + + outputWriter, err := output.NewListOutputWriter(cmd, pluginFields, pluginHumanFields, pluginStructureLabels) + if err != nil { + return err + } + + for _, conn := range plugins { + outputWriter.AddElement(conn) + } + outputWriter.StableSort() + + return outputWriter.Out() +} diff --git a/internal/cmd/connect/command_resume.go b/internal/cmd/connect/command_resume.go new file mode 100644 index 0000000000..99340254c9 --- /dev/null +++ b/internal/cmd/connect/command_resume.go @@ -0,0 +1,63 @@ +package connect + +import ( + "context" + + schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/internal/pkg/cmd" + "github.com/confluentinc/cli/internal/pkg/errors" + "github.com/confluentinc/cli/internal/pkg/examples" + "github.com/confluentinc/cli/internal/pkg/utils" +) + +func (c *command) newResumeCommand() *cobra.Command { + return &cobra.Command{ + Use: "resume ", + Short: "Resume a connector.", + Args: cobra.ExactArgs(1), + RunE: pcmd.NewCLIRunE(c.resume), + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + Example: examples.BuildExampleString( + examples.Example{ + Text: "Resume a connector in the current or specified Kafka cluster context.", + Code: "confluent connect resume --config config.json", + }, + examples.Example{ + Code: "confluent connect resume --config config.json --cluster lkc-123456", + }, + ), + } +} + +func (c *command) resume(cmd *cobra.Command, args []string) error { + kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) + if err != nil { + return err + } + + connector := &schedv1.Connector{ + AccountId: c.EnvironmentId(), + KafkaClusterId: kafkaCluster.ID, + Id: args[0], + } + + connectorExpansion, err := c.Client.Connect.GetExpansionById(context.Background(), connector) + if err != nil { + return err + } + + connector = &schedv1.Connector{ + Name: connectorExpansion.Info.Name, + AccountId: c.EnvironmentId(), + KafkaClusterId: kafkaCluster.ID, + } + + if err := c.Client.Connect.Resume(context.Background(), connector); err != nil { + return err + } + + utils.Printf(cmd, errors.ResumedConnectorMsg, args[0]) + return nil +} diff --git a/internal/cmd/connect/command_test.go b/internal/cmd/connect/command_test.go index 2d900f24fa..032744c8d4 100644 --- a/internal/cmd/connect/command_test.go +++ b/internal/cmd/connect/command_test.go @@ -133,8 +133,7 @@ func (suite *ConnectTestSuite) SetupTest() { func (suite *ConnectTestSuite) newCmd() *command { prerunner := cliMock.NewPreRunnerMock(&ccloud.Client{Connect: suite.connectMock, Kafka: suite.kafkaMock}, nil, nil, suite.conf) - cmd := New(suite.conf, prerunner, suite.analyticsClient) - return cmd + return New(prerunner, suite.analyticsClient) } func (suite *ConnectTestSuite) TestPauseConnector() { diff --git a/internal/cmd/connect/command_update.go b/internal/cmd/connect/command_update.go new file mode 100644 index 0000000000..4b342386a8 --- /dev/null +++ b/internal/cmd/connect/command_update.go @@ -0,0 +1,67 @@ +package connect + +import ( + "context" + + schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/internal/pkg/cmd" + "github.com/confluentinc/cli/internal/pkg/errors" + "github.com/confluentinc/cli/internal/pkg/utils" +) + +func (c *command) newUpdateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "update ", + Short: "Update a connector configuration.", + Args: cobra.ExactArgs(1), + RunE: pcmd.NewCLIRunE(c.update), + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + } + + cmd.Flags().String("config", "", "JSON connector config file.") + + _ = cmd.MarkFlagRequired("config") + + return cmd +} + +func (c *command) update(cmd *cobra.Command, args []string) error { + userConfigs, err := getConfig(cmd) + if err != nil { + return err + } + + kafkaCluster, err := c.Context.GetKafkaClusterForCommand(cmd) + if err != nil { + return err + } + + connector := &schedv1.Connector{ + AccountId: c.EnvironmentId(), + KafkaClusterId: kafkaCluster.ID, + Id: args[0], + } + + // Resolve Connector Name from ID + connectorExpansion, err := c.Client.Connect.GetExpansionById(context.Background(), connector) + if err != nil { + return err + } + + connectorConfig := &schedv1.ConnectorConfig{ + UserConfigs: *userConfigs, + AccountId: c.EnvironmentId(), + KafkaClusterId: kafkaCluster.ID, + Name: connectorExpansion.Info.Name, + Plugin: (*userConfigs)["connector.class"], + } + + if _, err := c.Client.Connect.Update(context.Background(), connectorConfig); err != nil { + return err + } + + utils.Printf(cmd, errors.UpdatedConnectorMsg, args[0]) + return nil +} diff --git a/internal/cmd/connect/state_flags.go b/internal/cmd/connect/state_flags.go index 0654862d9b..37c4db48df 100644 --- a/internal/cmd/connect/state_flags.go +++ b/internal/cmd/connect/state_flags.go @@ -6,11 +6,11 @@ import ( "github.com/confluentinc/cli/internal/pkg/cmd" ) -var ClusterSubcommandFlags = map[string]*pflag.FlagSet{ +var clusterSubcommandFlags = map[string]*pflag.FlagSet{ "list": cmd.ContextSet(), } -var SubcommandFlags = map[string]*pflag.FlagSet{ +var subcommandFlags = map[string]*pflag.FlagSet{ "describe": cmd.ClusterEnvironmentContextSet(), "list": cmd.ClusterEnvironmentContextSet(), "create": cmd.ClusterEnvironmentContextSet(), diff --git a/internal/cmd/connect/utils.go b/internal/cmd/connect/utils.go index 228f708ba8..92350071b0 100644 --- a/internal/cmd/connect/utils.go +++ b/internal/cmd/connect/utils.go @@ -10,26 +10,28 @@ import ( ) const ( - Config = "config" - Name = "name" - ConnectorClass = "connector.class" + config = "config" + name = "name" + connectorClass = "connector.class" ) func getConfig(cmd *cobra.Command) (*map[string]string, error) { - fileName, err := cmd.Flags().GetString(Config) + fileName, err := cmd.Flags().GetString(config) if err != nil { return nil, errors.Wrap(err, "error reading --config as string") } - var options map[string]string - options, err = parseConfigFile(fileName) + + options, err := parseConfigFile(fileName) if err != nil { return nil, errors.Wrapf(err, "unable to parse config %s", fileName) } - _, nameExists := options[Name] - _, classExists := options[ConnectorClass] + + _, nameExists := options[name] + _, classExists := options[connectorClass] if !nameExists || !classExists { return nil, errors.Errorf(errors.MissingRequiredConfigsErrorMsg, fileName) } + return &options, nil } @@ -45,27 +47,28 @@ func parseConfigFile(fileName string) (map[string]string, error) { kvPairs := make(map[string]string) var options map[string]interface{} - err = json.Unmarshal(jsonFile, &options) - - if err != nil { + if err := json.Unmarshal(jsonFile, &options); err != nil { return nil, errors.Wrapf(err, errors.ParseConfigErrorMsg, fileName) } + for key, val := range options { if val2, ok := val.(string); ok { kvPairs[key] = val2 } else { // We support object-as-a-value only for "config" key. - if key != Config { - return nil, errors.Errorf("Only string value is permitted for the configuration : %s", key) + if key != config { + return nil, errors.Errorf(`only string values are permitted for the configuration "%s"`, key) } + configMap, ok := val.(map[string]interface{}) if !ok { - return nil, errors.Errorf("Value for the configuration : %s is malformed", Config) + return nil, errors.Errorf(`value for the configuration "%s" is malformed`, config) } + for configKey, configVal := range configMap { value, isString := configVal.(string) if !isString { - return nil, errors.Errorf("Only string value is permitted for the configuration : %s", configKey) + return nil, errors.Errorf(`only string values are permitted for the configuration "%s"`, configKey) } kvPairs[configKey] = value } diff --git a/test/fixtures/output/connect/connect-plugin-describe.golden b/test/fixtures/output/connect/connect-plugin-describe.golden index 228551b07e..ba58f490d8 100644 --- a/test/fixtures/output/connect/connect-plugin-describe.golden +++ b/test/fixtures/output/connect/connect-plugin-describe.golden @@ -1,5 +1,5 @@ -Following are the required configs: -connector.class: GcsSink +The following are required configs: +connector.class : GcsSink kafka.api.key : ["kafka.api.key" is required] kafka.api.secret : ["kafka.api.secret" is required] topics : ["topics" is required] @@ -8,4 +8,3 @@ gcs.credentials.config : ["gcs.credentials.config" is required] gcs.bucket.name : ["gcs.bucket.name" is required] time.interval : ["data.format" is required Value "null" doesn't belong to the property's "time.interval" enum] tasks.max : ["tasks.max" is required] - diff --git a/test/fixtures/output/connect/connect-plugin-help.golden b/test/fixtures/output/connect/connect-plugin-help.golden index 712b4c36a4..99e4204692 100644 --- a/test/fixtures/output/connect/connect-plugin-help.golden +++ b/test/fixtures/output/connect/connect-plugin-help.golden @@ -4,7 +4,7 @@ Usage: confluent connect plugin [command] Available Commands: - describe Describe a connector plugin type. + describe Describe a connector plugin. list List connector plugin types. Global Flags: