Skip to content

Commit

Permalink
Add kafka topic list/create/delete
Browse files Browse the repository at this point in the history
  • Loading branch information
codyaray committed Aug 3, 2018
1 parent 4c50b37 commit 66a90a8
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 12 deletions.
52 changes: 51 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@
name = "github.com/hashicorp/hcl"
revision = "ef8a98b0bbce4a65b5aa4c368430a80ddc533168"

[[constraint]]
name = "github.com/Shopify/sarama"
revision = "e7238b119b7daab993720f0153eafb88e2b0ac1f"

[prune]
go-tests = true
unused-packages = true
4 changes: 4 additions & 0 deletions command/common/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func HandleError(err error) error {
switch err.(type) {
case editor.ErrEditing:
fmt.Println(err)
case shared.ErrNotAuthenticated:
fmt.Println(err)
case shared.ErrKafka:
fmt.Println(err)
default:
return err
}
Expand Down
2 changes: 1 addition & 1 deletion command/kafka/command_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (c *clusterCommand) auth(cmd *cobra.Command, args []string) error {
c.config.Platforms[cfg.Platform].KafkaClusters = map[string]shared.KafkaCluster{}
}
c.config.Platforms[cfg.Platform].KafkaClusters[cfg.Kafka] = shared.KafkaCluster{
Bootstrap: kc.Endpoint,
Bootstrap: strings.TrimPrefix(kc.Endpoint, "SASL_SSL://"),
APIKey: key,
APISecret: secret,
}
Expand Down
71 changes: 63 additions & 8 deletions command/kafka/command_topic.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package kafka

import (
"fmt"
"strings"

"github.com/Shopify/sarama"
"github.com/spf13/cobra"

"github.com/confluentinc/cli/command/common"
"github.com/confluentinc/cli/shared"
)

Expand All @@ -16,8 +21,8 @@ type topicCommand struct {
func NewTopicCommand(config *shared.Config, kafka Kafka) *cobra.Command {
cmd := &topicCommand{
Command: &cobra.Command{
Use: "cluster",
Short: "Manage kafka clusters.",
Use: "topic",
Short: "Manage kafka topics.",
},
config: config,
kafka: kafka,
Expand All @@ -31,13 +36,20 @@ func (c *topicCommand) init() error {
Use: "list",
Short: "List Kafka topics.",
RunE: c.list,
Args: cobra.NoArgs,
Args: cobra.NoArgs,
})
c.AddCommand(&cobra.Command{

createCmd := &cobra.Command{
Use: "create TOPIC",
Short: "Create a Kafka topic.",
RunE: c.create,
})
Args: cobra.ExactArgs(1),
}
createCmd.Flags().Int32("partitions", 12, "Number of topic partitions.")
createCmd.Flags().Int16("replication-factor", 3, "Replication factor.")
createCmd.Flags().StringSlice("config", nil, "A comma separated list of topic configuration (key=value) overrides for the topic being created.")
c.AddCommand(createCmd)

c.AddCommand(&cobra.Command{
Use: "describe TOPIC",
Short: "Describe a Kafka topic.",
Expand Down Expand Up @@ -72,11 +84,49 @@ func (c *topicCommand) init() error {
}

func (c *topicCommand) list(cmd *cobra.Command, args []string) error {
return shared.ErrNotImplemented
client, err := NewSaramaKafkaForConfig(c.config)
if err != nil {
return common.HandleError(shared.ErrKafka(err))
}
topics, err := client.Topics()
if err != nil {
return common.HandleError(shared.ErrKafka(err))
}
for _, topic := range topics {
fmt.Println(topic)
}
return nil
}

func (c *topicCommand) create(cmd *cobra.Command, args []string) error {
return shared.ErrNotImplemented
partitions, err := cmd.Flags().GetInt32("partitions")
if err != nil {
return common.HandleError(err)
}
replicationFactor, err := cmd.Flags().GetInt16("replication-factor")
if err != nil {
return common.HandleError(err)
}
configs, err := cmd.Flags().GetStringSlice("config")
if err != nil {
return common.HandleError(err)
}
client, err := NewSaramaAdminForConfig(c.config)
if err != nil {
return common.HandleError(shared.ErrKafka(err))
}
entries := map[string]*string{}
for _, config := range configs {
pair := strings.SplitN(config, "=", 2)
entries[pair[0]] = &pair[1]
}
config := &sarama.TopicDetail{
NumPartitions: partitions,
ReplicationFactor: replicationFactor,
ConfigEntries: entries,
}
err = client.CreateTopic(args[0], config, false)
return common.HandleError(shared.ErrKafka(err))
}

func (c *topicCommand) describe(cmd *cobra.Command, args []string) error {
Expand All @@ -88,7 +138,12 @@ func (c *topicCommand) update(cmd *cobra.Command, args []string) error {
}

func (c *topicCommand) delete(cmd *cobra.Command, args []string) error {
return shared.ErrNotImplemented
client, err := NewSaramaAdminForConfig(c.config)
if err != nil {
return common.HandleError(shared.ErrKafka(err))
}
err = client.DeleteTopic(args[0])
return common.HandleError(shared.ErrKafka(err))
}

func (c *topicCommand) produce(cmd *cobra.Command, args []string) error {
Expand Down
57 changes: 57 additions & 0 deletions command/kafka/sarama.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package kafka

import (
"fmt"
"strings"

"github.com/Shopify/sarama"

"github.com/confluentinc/cli/shared"
)

func NewSaramaKafka(kafka shared.KafkaCluster) (sarama.Client, error) {

This comment has been minimized.

Copy link
@travisjeffery

travisjeffery Aug 3, 2018

Contributor

Create a nested pkg for sarama if you want get rid of the Sarama prefix on all these apis

return sarama.NewClient(strings.Split(kafka.Bootstrap, ","), saramaConf(kafka))
}

func NewSaramaAdmin(kafka shared.KafkaCluster) (sarama.ClusterAdmin, error) {
return sarama.NewClusterAdmin(strings.Split(kafka.Bootstrap, ","), saramaConf(kafka))
}

func NewSaramaKafkaForConfig(config *shared.Config) (sarama.Client, error) {
cluster, err := kafkaCluster(config)
if err != nil {
return nil, err
}
return NewSaramaKafka(cluster)
}

func NewSaramaAdminForConfig(config *shared.Config) (sarama.ClusterAdmin, error) {
cluster, err := kafkaCluster(config)
if err != nil {
return nil, err
}
return NewSaramaAdmin(cluster)
}

func kafkaCluster(config *shared.Config) (shared.KafkaCluster, error) {
cfg, err := config.Context()
if err != nil {
return shared.KafkaCluster{}, err
}
cluster, found := config.Platforms[cfg.Platform].KafkaClusters[cfg.Kafka]
if !found {
e := fmt.Errorf("No auth found for Kafka %s. Please run `confluent kafka cluster auth` first.\n", cfg.Kafka)
return shared.KafkaCluster{}, shared.ErrNotAuthenticated(e)
}
return cluster, nil
}

func saramaConf(kafka shared.KafkaCluster) *sarama.Config {
saramaConf := sarama.NewConfig()
saramaConf.Version = sarama.V1_1_0_0
saramaConf.Net.TLS.Enable = true
saramaConf.Net.SASL.Enable = true
saramaConf.Net.SASL.User = kafka.APIKey
saramaConf.Net.SASL.Password = kafka.APISecret
return saramaConf
}
7 changes: 5 additions & 2 deletions shared/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
* - Commands call ConvertGRPCError() to transform these back into HTTP Error constants
*/

type ErrNotAuthenticated error
type ErrKafka error

var (
ErrNotImplemented = fmt.Errorf("not implemented")
ErrIncorrectAuth = fmt.Errorf("incorrect auth")
Expand All @@ -47,7 +50,7 @@ func ConvertAPIError(err error) error {
// except this one.. its the special case of errNotFound from http/client.go
case "cluster not found":
return ErrNotFound
// TODO: assert invariant for default case: we're missing an corev1.Error -> HTTP Error constant mapping
// TODO: assert invariant for default case: we're missing an corev1.Error -> HTTP Error constant mapping
}
}
return err
Expand All @@ -65,7 +68,7 @@ func ConvertGRPCError(err error) error {
return ErrUnauthorized
case ErrNotFound.Error():
return ErrNotFound
// TODO: assert invariant for default case: we're missing a GRPC -> HTTP Error constant mapping
// TODO: assert invariant for default case: we're missing a GRPC -> HTTP Error constant mapping
}
return fmt.Errorf(s.Message())
}
Expand Down

0 comments on commit 66a90a8

Please sign in to comment.