diff --git a/README.md b/README.md index 3850186..6f5c359 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ Some reasons why you might be interested: * Fast start up time. * No buffering of output. * Binary keys and payloads can be passed and presented in base64 or hex encoding +* Support for tls authentication [![Build Status](https://travis-ci.org/fgeller/kt.svg?branch=master)](https://travis-ci.org/fgeller/kt) @@ -38,6 +39,12 @@ Produce messages: "partition": 0, "startOffset": 0 } + $ echo 'Bob wins Oscar' | kt produce -tlsca myca.pem -tlscert myclientcert.pem -tlscertkey mycertkey.pem -topic actor-news -literal + { + "count": 1, + "partition": 0, + "startOffset": 0 + } $ for i in {6..9} ; do echo Bourne sequel $i in production. | kt produce -topic actor-news -literal ;done { "count": 1, diff --git a/common.go b/common.go index 9d71dcf..8f49171 100644 --- a/common.go +++ b/common.go @@ -2,9 +2,12 @@ package main import ( "bufio" + "crypto/tls" + "crypto/x509" "encoding/json" "fmt" "io" + "io/ioutil" "math/rand" "os" "os/signal" @@ -163,3 +166,30 @@ func randomString(length int) string { r.Read(buf) return fmt.Sprintf("%x", buf)[:length] } + +// setupCerts takes the paths to a tls certificate, CA, and certificate key in +// a PEM format and returns a constructed tls.Config object. +func setupCerts(certPath, caPath, keyPath string) (*tls.Config, error) { + caString, err := ioutil.ReadFile(caPath) + if err != nil { + return nil, err + } + + caPool := x509.NewCertPool() + ok := caPool.AppendCertsFromPEM(caString) + if !ok { + failf("unable to add ca at %s to certificate pool", caPath) + } + + clientCert, err := tls.LoadX509KeyPair(certPath, keyPath) + if err != nil { + return nil, err + } + + bundle := &tls.Config{ + RootCAs: caPool, + Certificates: []tls.Certificate{clientCert}, + } + bundle.BuildNameToCertificate() + return bundle, nil +} diff --git a/consume.go b/consume.go index 0f41b6e..e478b32 100644 --- a/consume.go +++ b/consume.go @@ -20,6 +20,9 @@ import ( type consumeCmd struct { topic string brokers []string + tlsCA string + tlsCert string + tlsCertKey string offsets map[int32]interval timeout time.Duration verbose bool @@ -71,6 +74,9 @@ type interval struct { type consumeArgs struct { topic string brokers string + tlsCA string + tlsCert string + tlsCertKey string timeout time.Duration offsets string verbose bool @@ -199,6 +205,9 @@ func (cmd *consumeCmd) parseArgs(as []string) { args.topic = envTopic } cmd.topic = args.topic + cmd.tlsCA = args.tlsCA + cmd.tlsCert = args.tlsCert + cmd.tlsCertKey = args.tlsCertKey cmd.timeout = args.timeout cmd.verbose = args.verbose cmd.pretty = args.pretty @@ -242,6 +251,9 @@ func (cmd *consumeCmd) parseFlags(as []string) consumeArgs { flags := flag.NewFlagSet("consume", flag.ExitOnError) flags.StringVar(&args.topic, "topic", "", "Topic to consume (required).") flags.StringVar(&args.brokers, "brokers", "", "Comma separated list of brokers. Port defaults to 9092 when omitted (defaults to localhost:9092).") + flags.StringVar(&args.tlsCA, "tlsca", "", "Path to the tls certificate authority file") + flags.StringVar(&args.tlsCert, "tlscert", "", "Path to the tls client certificate file") + flags.StringVar(&args.tlsCertKey, "tlscertkey", "", "Path to the tls client certificate key file") flags.StringVar(&args.offsets, "offsets", "", "Specifies what messages to read by partition and offset range (defaults to all).") flags.DurationVar(&args.timeout, "timeout", time.Duration(0), "Timeout after not reading messages (default 0 to disable).") flags.BoolVar(&args.verbose, "verbose", false, "More verbose logging to stderr.") @@ -275,6 +287,18 @@ func (cmd *consumeCmd) setupClient() { if cmd.verbose { fmt.Fprintf(os.Stderr, "sarama client configuration %#v\n", cfg) } + // AFAIK kafka authentication only works when the ca, cert and certkey are + // presented. + if cmd.tlsCert != "" && cmd.tlsCA != "" && cmd.tlsCertKey != "" { + tlsConfig, err := setupCerts(cmd.tlsCert, cmd.tlsCA, cmd.tlsCertKey) + if err != nil { + failf("failed to setup certificates as consumer err=%v", err) + } + if tlsConfig != nil { + cfg.Net.TLS.Enable = true + cfg.Net.TLS.Config = tlsConfig + } + } if cmd.client, err = sarama.NewClient(cmd.brokers, cfg); err != nil { failf("failed to create client err=%v", err) diff --git a/group.go b/group.go index 1c4f584..87ae072 100644 --- a/group.go +++ b/group.go @@ -17,6 +17,9 @@ import ( type groupCmd struct { brokers []string + tlsCA string + tlsCert string + tlsCertKey string group string filterGroups *regexp.Regexp filterTopics *regexp.Regexp @@ -323,6 +326,18 @@ func (cmd *groupCmd) saramaConfig() *sarama.Config { fmt.Fprintf(os.Stderr, "Failed to read current user err=%v", err) } cfg.ClientID = "kt-group-" + sanitizeUsername(usr.Username) + // AFAIK kafka authentication only works when the ca, cert and certkey are + // presented. + if cmd.tlsCert != "" && cmd.tlsCA != "" && cmd.tlsCertKey != "" { + tlsConfig, err := setupCerts(cmd.tlsCert, cmd.tlsCA, cmd.tlsCertKey) + if err != nil { + failf("failed to setup certificates as group err=%v", err) + } + if tlsConfig != nil { + cfg.Net.TLS.Enable = true + cfg.Net.TLS.Config = tlsConfig + } + } return cfg } @@ -344,6 +359,9 @@ func (cmd *groupCmd) parseArgs(as []string) { } cmd.topic = args.topic + cmd.tlsCA = args.tlsCA + cmd.tlsCert = args.tlsCert + cmd.tlsCertKey = args.tlsCertKey cmd.group = args.group cmd.verbose = args.verbose cmd.pretty = args.pretty @@ -417,6 +435,9 @@ func (cmd *groupCmd) parseArgs(as []string) { type groupArgs struct { topic string brokers string + tlsCA string + tlsCert string + tlsCertKey string partitions string group string filterGroups string @@ -433,6 +454,9 @@ func (cmd *groupCmd) parseFlags(as []string) groupArgs { flags := flag.NewFlagSet("group", flag.ExitOnError) flags.StringVar(&args.topic, "topic", "", "Topic to consume (required).") flags.StringVar(&args.brokers, "brokers", "", "Comma separated list of brokers. Port defaults to 9092 when omitted (defaults to localhost:9092).") + flags.StringVar(&args.tlsCA, "tlsca", "", "Path to the tls certificate authority file") + flags.StringVar(&args.tlsCert, "tlscert", "", "Path to the tls client certificate file") + flags.StringVar(&args.tlsCertKey, "tlscertkey", "", "Path to the tls client certificate key file") flags.StringVar(&args.group, "group", "", "Consumer group name.") flags.StringVar(&args.filterGroups, "filter-groups", "", "Regex to filter groups.") flags.StringVar(&args.filterTopics, "filter-topics", "", "Regex to filter topics.") diff --git a/produce.go b/produce.go index 9ca9c8d..55d4646 100644 --- a/produce.go +++ b/produce.go @@ -19,6 +19,9 @@ type produceArgs struct { topic string partition int brokers string + tlsCA string + tlsCert string + tlsCertKey string batch int timeout time.Duration verbose bool @@ -44,6 +47,9 @@ func (cmd *produceCmd) read(as []string) produceArgs { flags.StringVar(&args.topic, "topic", "", "Topic to produce to (required).") flags.IntVar(&args.partition, "partition", 0, "Partition to produce to (defaults to 0).") flags.StringVar(&args.brokers, "brokers", "", "Comma separated list of brokers. Port defaults to 9092 when omitted (defaults to localhost:9092).") + flags.StringVar(&args.tlsCA, "tlsca", "", "Path to the tls certificate authority file") + flags.StringVar(&args.tlsCert, "tlscert", "", "Path to the tls client certificate file") + flags.StringVar(&args.tlsCertKey, "tlscertkey", "", "Path to the tls client certificate key file") flags.IntVar(&args.batch, "batch", 1, "Max size of a batch before sending it off") flags.DurationVar(&args.timeout, "timeout", 50*time.Millisecond, "Duration to wait for batch to be filled before sending it off") flags.BoolVar(&args.verbose, "verbose", false, "Verbose output") @@ -87,6 +93,9 @@ func (cmd *produceCmd) parseArgs(as []string) { } } cmd.topic = args.topic + cmd.tlsCA = args.tlsCA + cmd.tlsCert = args.tlsCert + cmd.tlsCertKey = args.tlsCertKey envBrokers := os.Getenv("KT_BROKERS") if args.brokers == "" { @@ -162,6 +171,18 @@ func (cmd *produceCmd) findLeaders() { if cmd.verbose { fmt.Fprintf(os.Stderr, "sarama client configuration %#v\n", cfg) } + // AFAIK kafka authentication only works when the ca, cert and certkey are + // presented. + if cmd.tlsCert != "" && cmd.tlsCA != "" && cmd.tlsCertKey != "" { + tlsConfig, err := setupCerts(cmd.tlsCert, cmd.tlsCA, cmd.tlsCertKey) + if err != nil { + failf("failed to setup certificates as producer err=%v", err) + } + if tlsConfig != nil { + cfg.Net.TLS.Enable = true + cfg.Net.TLS.Config = tlsConfig + } + } loop: for _, addr := range cmd.brokers { @@ -219,6 +240,9 @@ loop: type produceCmd struct { topic string brokers []string + tlsCA string + tlsCert string + tlsCertKey string batch int timeout time.Duration verbose bool diff --git a/topic.go b/topic.go index 76e77f4..f7fb113 100644 --- a/topic.go +++ b/topic.go @@ -15,6 +15,9 @@ import ( type topicArgs struct { brokers string + tlsCA string + tlsCert string + tlsCertKey string filter string partitions bool leaders bool @@ -26,6 +29,9 @@ type topicArgs struct { type topicCmd struct { brokers []string + tlsCA string + tlsCert string + tlsCertKey string filter *regexp.Regexp partitions bool leaders bool @@ -58,6 +64,9 @@ func (cmd *topicCmd) parseFlags(as []string) topicArgs { ) flags.StringVar(&args.brokers, "brokers", "", "Comma separated list of brokers. Port defaults to 9092 when omitted.") + flags.StringVar(&args.tlsCA, "tlsca", "", "Path to the tls certificate authority file") + flags.StringVar(&args.tlsCert, "tlscert", "", "Path to the tls client certificate file") + flags.StringVar(&args.tlsCertKey, "tlscertkey", "", "Path to the tls client certificate key file") flags.BoolVar(&args.partitions, "partitions", false, "Include information per partition.") flags.BoolVar(&args.leaders, "leaders", false, "Include leader information per partition.") flags.BoolVar(&args.replicas, "replicas", false, "Include replica ids per partition.") @@ -102,6 +111,9 @@ func (cmd *topicCmd) parseArgs(as []string) { failf("invalid regex for filter err=%s", err) } + cmd.tlsCA = args.tlsCA + cmd.tlsCert = args.tlsCert + cmd.tlsCertKey = args.tlsCertKey cmd.filter = re cmd.partitions = args.partitions cmd.leaders = args.leaders @@ -127,6 +139,18 @@ func (cmd *topicCmd) connect() { if cmd.verbose { fmt.Fprintf(os.Stderr, "sarama client configuration %#v\n", cfg) } + // AFAIK kafka authentication only works when the ca, cert and certkey are + // presented. + if cmd.tlsCert != "" && cmd.tlsCA != "" && cmd.tlsCertKey != "" { + tlsConfig, err := setupCerts(cmd.tlsCert, cmd.tlsCA, cmd.tlsCertKey) + if err != nil { + failf("failed to setup certificates as topic viewer err=%v", err) + } + if tlsConfig != nil { + cfg.Net.TLS.Enable = true + cfg.Net.TLS.Config = tlsConfig + } + } if cmd.client, err = sarama.NewClient(cmd.brokers, cfg); err != nil { failf("failed to create client err=%v", err)