diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000..0d929bf --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,37 @@ +name: Continuous Integration + +on: + workflow_dispatch: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + + test: + name: kt Test + runs-on: ubuntu-latest + steps: + + - name: Set up Go 1.x + uses: actions/setup-go@v2 + with: + go-version: ^1.21 + + - name: Check out code into the Go module directory + uses: actions/checkout@v2 + + - name: Get dependencies + run: | + go get -v -t -d ./... + if [ -f Gopkg.toml ]; then + curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh + dep ensure + fi + + - name: Bring up containers + run: docker compose -f test-dependencies.yml up -d + + - name: Test + run: make test diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 66bc8a7..0000000 --- a/.travis.yml +++ /dev/null @@ -1,20 +0,0 @@ -sudo: required -dist: trusty -language: go -services: - - docker - -go: - - 1.14 - -install: -- sudo docker version -- sudo pip install docker-compose -- docker-compose version - -before_script: -- docker-compose -f $TRAVIS_BUILD_DIR/test-dependencies.yml up -d -- sleep 15 - -script: -- make test diff --git a/Makefile b/Makefile index 0255826..8942c46 100644 --- a/Makefile +++ b/Makefile @@ -19,16 +19,19 @@ release-darwin: release: testing clean release-linux release-darwin dep-up: - docker-compose -f ./test-dependencies.yml up -d --remove-orphan - sleep 4 + docker compose -f ./test-dependencies.yml up -d dep-down: - docker-compose -f ./test-dependencies.yml down + docker compose -f ./test-dependencies.yml down testing: dep-up test dep-down test: clean - go test -v -vet=all -failfast + go test -v -vet=all -failfast -race + +.PHONY: test-secrets +test-secrets: + cd test-secrets ; /usr/bin/env bash create-certs.sh clean: rm -f kt diff --git a/README.md b/README.md index d89c680..523605a 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# kt - a Kafka tool that likes JSON [![Build Status](https://travis-ci.org/fgeller/kt.svg?branch=master)](https://travis-ci.org/fgeller/kt) +# kt - a Kafka tool that likes JSON [![Continuous Integration](https://github.com/fgeller/kt/actions/workflows/go.yml/badge.svg)](https://github.com/fgeller/kt/actions/workflows/go.yml) Some reasons why you might be interested: @@ -7,14 +7,15 @@ Some reasons why you might be interested: * Modify consumer group offsets (e.g., resetting or manually setting offsets per topic and per partition). * JSON output for easy consumption with tools like [kp](https://github.com/echojc/kp) or [jq](https://stedolan.github.io/jq/). * JSON input to facilitate automation via tools like [jsonify](https://github.com/fgeller/jsonify). -* Configure brokers, topic and authentication via environment variables `KT_BROKERS`, `KT_TOPIC` and `KT_AUTH`. +* Configure brokers, topic, Kafka version and authentication via environment variables `KT_BROKERS`, `KT_TOPIC`, `KT_KAFKA_VERSION` and `KT_AUTH`. * Fast start up time. * No buffering of output. * Binary keys and payloads can be passed and presented in base64 or hex encoding. * Support for TLS authentication. * Basic cluster admin functions: Create & delete topics. -I'm not using kt actively myself anymore, so if you think it's lacking some feature - please let me know by creating an issue! +> [!NOTE] +> I'm not using kt actively myself anymore, so if you think it's lacking some feature - please let me know by creating an issue. ## Examples @@ -213,7 +214,7 @@ You can download kt via the [Releases](https://github.com/fgeller/kt/releases) s Alternatively, the usual way via the go tool, for example: - $ go get -u github.com/fgeller/kt + $ go install github.com/fgeller/kt/v14@latest Or via Homebrew on OSX: @@ -283,11 +284,16 @@ Required fields: - `mode`: This needs to be set to `TLS-1way` +Optional fields: + + - `ca-certificate`: Path to your CA certificate + + Example: { - "mode": "TLS-1way", + "mode": "TLS-1way" } ### Other modes diff --git a/admin.go b/admin.go index 9459ffb..1d8ed20 100644 --- a/admin.go +++ b/admin.go @@ -11,12 +11,13 @@ import ( "strings" "time" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" ) type adminCmd struct { + baseCmd + brokers []string - verbose bool version sarama.KafkaVersion timeout *time.Duration auth authConfig @@ -45,14 +46,25 @@ type adminArgs struct { func (cmd *adminCmd) parseArgs(as []string) { var ( args = cmd.parseFlags(as) + err error ) cmd.verbose = args.verbose - cmd.version = kafkaVersion(args.version) + cmd.version, err = chooseKafkaVersion(args.version, os.Getenv(ENV_KAFKA_VERSION)) + if err != nil { + failf("failed to read kafka version err=%v", err) + } + + cmd.timeout, err = parseTimeout(os.Getenv(ENV_ADMIN_TIMEOUT)) + if err != nil { + failf("failed to read timeout from env var err=%v", err) + } - cmd.timeout = parseTimeout(os.Getenv(ENV_ADMIN_TIMEOUT)) if args.timeout != "" { - cmd.timeout = parseTimeout(args.timeout) + cmd.timeout, err = parseTimeout(args.timeout) + if err != nil { + failf("failed to read timeout from args err=%v", err) + } } readAuthFile(args.auth, os.Getenv(ENV_AUTH), &cmd.auth) @@ -90,6 +102,18 @@ func (cmd *adminCmd) parseArgs(as []string) { } } +func parseTimeout(s string) (*time.Duration, error) { + if s == "" { + return nil, nil + } + + v, err := time.ParseDuration(s) + if err != nil { + return nil, err + } + return &v, nil +} + func (cmd *adminCmd) run(args []string) { var err error @@ -105,7 +129,6 @@ func (cmd *adminCmd) run(args []string) { if cmd.createTopic != "" { cmd.runCreateTopic() - } else if cmd.deleteTopic != "" { cmd.runDeleteTopic() } else { @@ -136,9 +159,10 @@ func (cmd *adminCmd) saramaConfig() *sarama.Config { cfg.Version = cmd.version if usr, err = user.Current(); err != nil { - fmt.Fprintf(os.Stderr, "Failed to read current user err=%v", err) + cmd.infof("Failed to read current user err=%v", err) } cfg.ClientID = "kt-admin-" + sanitizeUsername(usr.Username) + cmd.infof("sarama client configuration %#v\n", cfg) if cmd.timeout != nil { cfg.Admin.Timeout = *cmd.timeout @@ -167,9 +191,9 @@ func (cmd *adminCmd) parseFlags(as []string) adminArgs { flags.StringVar(&args.deleteTopic, "deletetopic", "", "Name of the topic that should be deleted.") flags.Usage = func() { - fmt.Fprintln(os.Stderr, "Usage of admin:") + warnf("Usage of admin:") flags.PrintDefaults() - fmt.Fprintln(os.Stderr, adminDocString) + warnf(adminDocString + "\n") } err := flags.Parse(as) @@ -189,7 +213,7 @@ The value supplied on the command line wins over the environment variable value. If both -createtopic and deletetopic are supplied, -createtopic wins. The topic details should be passed via a JSON file that represents a sarama.TopicDetail struct. -cf https://godoc.org/github.com/Shopify/sarama#TopicDetail +cf https://godoc.org/github.com/IBM/sarama#TopicDetail A simple way to pass a JSON file is to use a tool like https://github.com/fgeller/jsonify and shell's process substition: diff --git a/common.go b/common.go index 1470032..62f1198 100644 --- a/common.go +++ b/common.go @@ -1,7 +1,6 @@ package main import ( - "bufio" "crypto/tls" "crypto/x509" "encoding/json" @@ -10,14 +9,14 @@ import ( "io/ioutil" "math/rand" "os" - "os/signal" + "path/filepath" "regexp" "strings" "syscall" "time" "unicode/utf16" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" "golang.org/x/crypto/ssh/terminal" ) @@ -26,49 +25,47 @@ const ( ENV_ADMIN_TIMEOUT = "KT_ADMIN_TIMEOUT" ENV_BROKERS = "KT_BROKERS" ENV_TOPIC = "KT_TOPIC" + ENV_KAFKA_VERSION = "KT_KAFKA_VERSION" ) -var ( - invalidClientIDCharactersRegExp = regexp.MustCompile(`[^a-zA-Z0-9_-]`) -) +var invalidClientIDCharactersRegExp = regexp.MustCompile(`[^a-zA-Z0-9_-]`) -func listenForInterrupt(q chan struct{}) { - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Kill, os.Interrupt) - sig := <-signals - fmt.Fprintf(os.Stderr, "received signal %s\n", sig) - close(q) +type command interface { + run(args []string) } -func kafkaVersion(s string) sarama.KafkaVersion { - if s == "" { - return sarama.V2_0_0_0 - } - - v, err := sarama.ParseKafkaVersion(strings.TrimPrefix(s, "v")) - if err != nil { - failf(err.Error()) - } - - return v +type baseCmd struct { + verbose bool } -func parseTimeout(s string) *time.Duration { - if s == "" { - return nil +func (b *baseCmd) infof(msg string, args ...interface{}) { + if b.verbose { + warnf(msg, args...) } +} - v, err := time.ParseDuration(s) - if err != nil { - failf(err.Error()) - } +func warnf(msg string, args ...interface{}) { + fmt.Fprintf(os.Stderr, msg, args...) +} - return &v +func outf(msg string, args ...interface{}) { + fmt.Fprintf(os.Stdout, msg, args...) } func logClose(name string, c io.Closer) { if err := c.Close(); err != nil { - fmt.Fprintf(os.Stderr, "failed to close %#v err=%v", name, err) + warnf("failed to close %#v err=%v", name, err) + } +} + +func chooseKafkaVersion(arg, env string) (sarama.KafkaVersion, error) { + switch { + case arg != "": + return sarama.ParseKafkaVersion(strings.TrimPrefix(arg, "v")) + case env != "": + return sarama.ParseKafkaVersion(strings.TrimPrefix(env, "v")) + default: + return sarama.V3_0_0_0, nil } } @@ -109,27 +106,13 @@ func failf(msg string, args ...interface{}) { func exitf(code int, msg string, args ...interface{}) { if code == 0 { - fmt.Fprintf(os.Stdout, msg+"\n", args...) + outf(msg+"\n", args...) } else { - fmt.Fprintf(os.Stderr, msg+"\n", args...) + warnf(msg+"\n", args...) } os.Exit(code) } -func readStdinLines(max int, out chan string) { - scanner := bufio.NewScanner(os.Stdin) - scanner.Buffer(make([]byte, max), max) - - for scanner.Scan() { - out <- scanner.Text() - } - - if err := scanner.Err(); err != nil { - fmt.Fprintf(os.Stderr, "scanning input failed err=%v\n", err) - } - close(out) -} - // hashCode imitates the behavior of the JDK's String#hashCode method. // https://docs.oracle.com/javase/7/docs/api/java/lang/String.html#hashCode() // @@ -219,10 +202,12 @@ func setupCerts(certPath, caPath, keyPath string) (*tls.Config, error) { } type authConfig struct { - Mode string `json:"mode"` - CACert string `json:"ca-certificate"` - ClientCert string `json:"client-certificate"` - ClientCertKey string `json:"client-certificate-key"` + Mode string `json:"mode"` + CACert string `json:"ca-certificate"` + ClientCert string `json:"client-certificate"` + ClientCertKey string `json:"client-certificate-key"` + SASLPlainUser string `json:"sasl_plain_user"` + SASLPlainPassword string `json:"sasl_plain_password"` } func setupAuth(auth authConfig, saramaCfg *sarama.Config) error { @@ -235,14 +220,43 @@ func setupAuth(auth authConfig, saramaCfg *sarama.Config) error { return setupAuthTLS(auth, saramaCfg) case "TLS-1way": return setupAuthTLS1Way(auth, saramaCfg) + case "SASL": + return setupSASL(auth, saramaCfg) default: return fmt.Errorf("unsupport auth mode: %#v", auth.Mode) } } +func setupSASL(auth authConfig, saramaCfg *sarama.Config) error { + saramaCfg.Net.SASL.Enable = true + saramaCfg.Net.SASL.User = auth.SASLPlainUser + saramaCfg.Net.SASL.Password = auth.SASLPlainPassword + return nil +} + func setupAuthTLS1Way(auth authConfig, saramaCfg *sarama.Config) error { saramaCfg.Net.TLS.Enable = true saramaCfg.Net.TLS.Config = &tls.Config{} + + if auth.CACert == "" { + return nil + } + + caString, err := ioutil.ReadFile(auth.CACert) + if err != nil { + return fmt.Errorf("failed to read ca-certificate err=%v", err) + } + + caPool := x509.NewCertPool() + ok := caPool.AppendCertsFromPEM(caString) + if !ok { + failf("unable to add ca-certificate at %s to certificate pool", auth.CACert) + } + + tlsCfg := &tls.Config{RootCAs: caPool} + tlsCfg.BuildNameToCertificate() + + saramaCfg.Net.TLS.Config = tlsCfg return nil } @@ -276,6 +290,12 @@ func setupAuthTLS(auth authConfig, saramaCfg *sarama.Config) error { return nil } +func qualifyPath(argFN string, target *string) { + if *target != "" && !filepath.IsAbs(*target) && filepath.Dir(*target) == "." { + *target = filepath.Join(filepath.Dir(argFN), *target) + } +} + func readAuthFile(argFN string, envFN string, target *authConfig) { if argFN == "" && envFN == "" { return @@ -294,4 +314,8 @@ func readAuthFile(argFN string, envFN string, target *authConfig) { if err := json.Unmarshal(byts, target); err != nil { failf("failed to unmarshal auth file err=%v", err) } + + qualifyPath(fn, &target.CACert) + qualifyPath(fn, &target.ClientCert) + qualifyPath(fn, &target.ClientCertKey) } diff --git a/common_test.go b/common_test.go new file mode 100644 index 0000000..c45c466 --- /dev/null +++ b/common_test.go @@ -0,0 +1,49 @@ +package main + +import ( + "fmt" + "testing" + + "github.com/IBM/sarama" + "github.com/stretchr/testify/require" +) + +func TestChooseKafkaVersion(t *testing.T) { + td := map[string]struct { + arg string + env string + err error + expected sarama.KafkaVersion + }{ + "default": { + expected: sarama.V3_0_0_0, + }, + "arg v1": { + arg: "v1.0.0", + expected: sarama.V1_0_0_0, + }, + "env v2": { + env: "v2.0.0", + expected: sarama.V2_0_0_0, + }, + "arg v1 wins over env v2": { + arg: "v1.0.0", + env: "v2.0.0", + expected: sarama.V1_0_0_0, + }, + "invalid": { + arg: "234", + err: fmt.Errorf("invalid version `234`"), + }, + } + + for tn, tc := range td { + actual, err := chooseKafkaVersion(tc.arg, tc.env) + if tc.err == nil { + require.Equal(t, tc.expected, actual, tn) + require.NoError(t, err) + } else { + require.Equal(t, tc.err, err) + } + } +} diff --git a/consume.go b/consume.go index a8b8c91..be34be4 100644 --- a/consume.go +++ b/consume.go @@ -13,18 +13,18 @@ import ( "sync" "time" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" ) type consumeCmd struct { sync.Mutex + baseCmd topic string brokers []string auth authConfig offsets map[int32]interval timeout time.Duration - verbose bool version sarama.KafkaVersion encodeValue string encodeKey string @@ -97,7 +97,7 @@ type consumeArgs struct { } func (cmd *consumeCmd) failStartup(msg string) { - fmt.Fprintln(os.Stderr, msg) + warnf(msg) failf("use \"kt consume -help\" for more information") } @@ -119,9 +119,13 @@ func (cmd *consumeCmd) parseArgs(as []string) { cmd.timeout = args.timeout cmd.verbose = args.verbose cmd.pretty = args.pretty - cmd.version = kafkaVersion(args.version) cmd.group = args.group + cmd.version, err = chooseKafkaVersion(args.version, os.Getenv(ENV_KAFKA_VERSION)) + if err != nil { + failf("failed to read kafka version err=%v", err) + } + readAuthFile(args.auth, os.Getenv(ENV_AUTH), &cmd.auth) if args.encodeValue != "string" && args.encodeValue != "hex" && args.encodeValue != "base64" { @@ -393,12 +397,10 @@ func (cmd *consumeCmd) setupClient() { ) cfg.Version = cmd.version if usr, err = user.Current(); err != nil { - fmt.Fprintf(os.Stderr, "Failed to read current user err=%v", err) + cmd.infof("Failed to read current user err=%v", err) } cfg.ClientID = "kt-consume-" + sanitizeUsername(usr.Username) - if cmd.verbose { - fmt.Fprintf(os.Stderr, "sarama client configuration %#v\n", cfg) - } + cmd.infof("sarama client configuration %#v\n", cfg) if err = setupAuth(cmd.auth, cfg); err != nil { failf("failed to setup auth err=%v", err) @@ -476,17 +478,17 @@ func (cmd *consumeCmd) consumePartition(out chan printContext, partition int32) } if start, err = cmd.resolveOffset(offsets.start, partition); err != nil { - fmt.Fprintf(os.Stderr, "Failed to read start offset for partition %v err=%v\n", partition, err) + warnf("Failed to read start offset for partition %v err=%v\n", partition, err) return } if end, err = cmd.resolveOffset(offsets.end, partition); err != nil { - fmt.Fprintf(os.Stderr, "Failed to read end offset for partition %v err=%v\n", partition, err) + warnf("Failed to read end offset for partition %v err=%v\n", partition, err) return } if pcon, err = cmd.consumer.ConsumePartition(cmd.topic, partition, start); err != nil { - fmt.Fprintf(os.Stderr, "Failed to consume partition %v err=%v\n", partition, err) + warnf("Failed to consume partition %v err=%v\n", partition, err) return } @@ -538,7 +540,7 @@ func (cmd *consumeCmd) closePOMs() { cmd.Lock() for p, pom := range cmd.poms { if err := pom.Close(); err != nil { - fmt.Fprintf(os.Stderr, "failed to close partition offset manager for partition %v err=%v", p, err) + warnf("failed to close partition offset manager for partition %v err=%v", p, err) } } cmd.Unlock() @@ -588,14 +590,14 @@ func (cmd *consumeCmd) partitionLoop(out chan printContext, pc sarama.PartitionC select { case <-timeout: - fmt.Fprintf(os.Stderr, "consuming from partition %v timed out after %s\n", p, cmd.timeout) + warnf("consuming from partition %v timed out after %s\n", p, cmd.timeout) return case err := <-pc.Errors(): - fmt.Fprintf(os.Stderr, "partition %v consumer encountered err %s", p, err) + warnf("partition %v consumer encountered err %s", p, err) return case msg, ok := <-pc.Messages(): if !ok { - fmt.Fprintf(os.Stderr, "unexpected closed messages chan") + warnf("unexpected closed messages chan") return } diff --git a/consume_test.go b/consume_test.go index 308819f..e45746f 100644 --- a/consume_test.go +++ b/consume_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" ) func TestParseOffsets(t *testing.T) { @@ -21,7 +21,7 @@ func TestParseOffsets(t *testing.T) { testName: "empty", input: "", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{relative: true, start: sarama.OffsetOldest}, end: offset{start: 1<<63 - 1}, }, @@ -31,7 +31,7 @@ func TestParseOffsets(t *testing.T) { testName: "single-comma", input: ",", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{relative: true, start: sarama.OffsetOldest}, end: offset{start: 1<<63 - 1}, }, @@ -41,7 +41,7 @@ func TestParseOffsets(t *testing.T) { testName: "all", input: "all", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{relative: true, start: sarama.OffsetOldest}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -51,7 +51,7 @@ func TestParseOffsets(t *testing.T) { testName: "oldest", input: "oldest", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{relative: true, start: sarama.OffsetOldest}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -61,7 +61,7 @@ func TestParseOffsets(t *testing.T) { testName: "resume", input: "resume", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{relative: true, start: offsetResume}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -71,7 +71,7 @@ func TestParseOffsets(t *testing.T) { testName: "all-with-space", input: " all ", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{relative: true, start: sarama.OffsetOldest}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -81,7 +81,7 @@ func TestParseOffsets(t *testing.T) { testName: "all-with-zero-initial-offset", input: "all=+0:", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{relative: true, start: sarama.OffsetOldest, diff: 0}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -91,15 +91,15 @@ func TestParseOffsets(t *testing.T) { testName: "several-partitions", input: "1,2,4", expected: map[int32]interval{ - 1: interval{ + 1: { start: offset{relative: true, start: sarama.OffsetOldest}, end: offset{relative: false, start: 1<<63 - 1}, }, - 2: interval{ + 2: { start: offset{relative: true, start: sarama.OffsetOldest}, end: offset{relative: false, start: 1<<63 - 1}, }, - 4: interval{ + 4: { start: offset{relative: true, start: sarama.OffsetOldest}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -109,7 +109,7 @@ func TestParseOffsets(t *testing.T) { testName: "one-partition,empty-offsets", input: "0=", expected: map[int32]interval{ - 0: interval{ + 0: { start: offset{relative: true, start: sarama.OffsetOldest}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -119,7 +119,7 @@ func TestParseOffsets(t *testing.T) { testName: "one-partition,one-offset", input: "0=1", expected: map[int32]interval{ - 0: interval{ + 0: { start: offset{relative: false, start: 1}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -129,7 +129,7 @@ func TestParseOffsets(t *testing.T) { testName: "one-partition,empty-after-colon", input: "0=1:", expected: map[int32]interval{ - 0: interval{ + 0: { start: offset{relative: false, start: 1}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -139,15 +139,15 @@ func TestParseOffsets(t *testing.T) { testName: "multiple-partitions", input: "0=4:,2=1:10,6", expected: map[int32]interval{ - 0: interval{ + 0: { start: offset{relative: false, start: 4}, end: offset{relative: false, start: 1<<63 - 1}, }, - 2: interval{ + 2: { start: offset{relative: false, start: 1}, end: offset{relative: false, start: 10}, }, - 6: interval{ + 6: { start: offset{relative: true, start: sarama.OffsetOldest}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -157,7 +157,7 @@ func TestParseOffsets(t *testing.T) { testName: "newest-relative", input: "0=-1", expected: map[int32]interval{ - 0: interval{ + 0: { start: offset{relative: true, start: sarama.OffsetNewest, diff: -1}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -167,7 +167,7 @@ func TestParseOffsets(t *testing.T) { testName: "newest-relative,empty-after-colon", input: "0=-1:", expected: map[int32]interval{ - 0: interval{ + 0: { start: offset{relative: true, start: sarama.OffsetNewest, diff: -1}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -177,7 +177,7 @@ func TestParseOffsets(t *testing.T) { testName: "resume-relative", input: "0=resume-10", expected: map[int32]interval{ - 0: interval{ + 0: { start: offset{relative: true, start: offsetResume, diff: -10}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -187,7 +187,7 @@ func TestParseOffsets(t *testing.T) { testName: "oldest-relative", input: "0=+1", expected: map[int32]interval{ - 0: interval{ + 0: { start: offset{relative: true, start: sarama.OffsetOldest, diff: 1}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -197,7 +197,7 @@ func TestParseOffsets(t *testing.T) { testName: "oldest-relative,empty-after-colon", input: "0=+1:", expected: map[int32]interval{ - 0: interval{ + 0: { start: offset{relative: true, start: sarama.OffsetOldest, diff: 1}, end: offset{relative: false, start: 1<<63 - 1}, }, @@ -207,7 +207,7 @@ func TestParseOffsets(t *testing.T) { testName: "oldest-relative-to-newest-relative", input: "0=+1:-1", expected: map[int32]interval{ - 0: interval{ + 0: { start: offset{relative: true, start: sarama.OffsetOldest, diff: 1}, end: offset{relative: true, start: sarama.OffsetNewest, diff: -1}, }, @@ -217,11 +217,11 @@ func TestParseOffsets(t *testing.T) { testName: "specific-partition-with-all-partitions", input: "0=+1:-1,all=1:10", expected: map[int32]interval{ - 0: interval{ + 0: { start: offset{relative: true, start: sarama.OffsetOldest, diff: 1}, end: offset{relative: true, start: sarama.OffsetNewest, diff: -1}, }, - -1: interval{ + -1: { start: offset{relative: false, start: 1, diff: 0}, end: offset{relative: false, start: 10, diff: 0}, }, @@ -231,7 +231,7 @@ func TestParseOffsets(t *testing.T) { testName: "oldest-to-newest", input: "0=oldest:newest", expected: map[int32]interval{ - 0: interval{ + 0: { start: offset{relative: true, start: sarama.OffsetOldest, diff: 0}, end: offset{relative: true, start: sarama.OffsetNewest, diff: 0}, }, @@ -241,7 +241,7 @@ func TestParseOffsets(t *testing.T) { testName: "oldest-to-newest-with-offsets", input: "0=oldest+10:newest-10", expected: map[int32]interval{ - 0: interval{ + 0: { start: offset{relative: true, start: sarama.OffsetOldest, diff: 10}, end: offset{relative: true, start: sarama.OffsetNewest, diff: -10}, }, @@ -251,7 +251,7 @@ func TestParseOffsets(t *testing.T) { testName: "newest", input: "newest", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{relative: true, start: sarama.OffsetNewest, diff: 0}, end: offset{relative: false, start: 1<<63 - 1, diff: 0}, }, @@ -261,7 +261,7 @@ func TestParseOffsets(t *testing.T) { testName: "single-partition", input: "10", expected: map[int32]interval{ - 10: interval{ + 10: { start: offset{relative: true, start: sarama.OffsetOldest, diff: 0}, end: offset{relative: false, start: 1<<63 - 1, diff: 0}, }, @@ -271,7 +271,7 @@ func TestParseOffsets(t *testing.T) { testName: "single-range,all-partitions", input: "10:20", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{start: 10}, end: offset{start: 20}, }, @@ -281,7 +281,7 @@ func TestParseOffsets(t *testing.T) { testName: "single-range,all-partitions,open-end", input: "10:", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{start: 10}, end: offset{start: 1<<63 - 1}, }, @@ -291,7 +291,7 @@ func TestParseOffsets(t *testing.T) { testName: "all-newest", input: "all=newest:", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{relative: true, start: sarama.OffsetNewest, diff: 0}, end: offset{relative: false, start: 1<<63 - 1, diff: 0}, }, @@ -301,7 +301,7 @@ func TestParseOffsets(t *testing.T) { testName: "implicit-all-newest-with-offset", input: "newest-10:", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{relative: true, start: sarama.OffsetNewest, diff: -10}, end: offset{relative: false, start: 1<<63 - 1, diff: 0}, }, @@ -311,7 +311,7 @@ func TestParseOffsets(t *testing.T) { testName: "implicit-all-oldest-with-offset", input: "oldest+10:", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{relative: true, start: sarama.OffsetOldest, diff: 10}, end: offset{relative: false, start: 1<<63 - 1, diff: 0}, }, @@ -321,7 +321,7 @@ func TestParseOffsets(t *testing.T) { testName: "implicit-all-neg-offset-empty-colon", input: "-10:", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{relative: true, start: sarama.OffsetNewest, diff: -10}, end: offset{relative: false, start: 1<<63 - 1, diff: 0}, }, @@ -331,7 +331,7 @@ func TestParseOffsets(t *testing.T) { testName: "implicit-all-pos-offset-empty-colon", input: "+10:", expected: map[int32]interval{ - -1: interval{ + -1: { start: offset{relative: true, start: sarama.OffsetOldest, diff: 10}, end: offset{relative: false, start: 1<<63 - 1, diff: 0}, }, @@ -429,7 +429,7 @@ func TestFindPartitionsToConsume(t *testing.T) { consumer: tConsumer{ topics: []string{"a"}, topicsErr: nil, - partitions: map[string][]int32{"a": []int32{0, 10}}, + partitions: map[string][]int32{"a": {0, 10}}, partitionsErr: map[string]error{"a": nil}, consumePartition: map[tConsumePartition]tPartitionConsumer{}, consumePartitionErr: map[tConsumePartition]error{}, @@ -445,7 +445,7 @@ func TestFindPartitionsToConsume(t *testing.T) { consumer: tConsumer{ topics: []string{"a"}, topicsErr: nil, - partitions: map[string][]int32{"a": []int32{0, 10}}, + partitions: map[string][]int32{"a": {0, 10}}, partitionsErr: map[string]error{"a": nil}, consumePartition: map[tConsumePartition]tPartitionConsumer{}, consumePartitionErr: map[tConsumePartition]error{}, @@ -486,8 +486,8 @@ func TestConsume(t *testing.T) { calls := make(chan tConsumePartition) consumer := tConsumer{ consumePartition: map[tConsumePartition]tPartitionConsumer{ - tConsumePartition{"hans", 1, 1}: tPartitionConsumer{messages: messageChan}, - tConsumePartition{"hans", 2, 1}: tPartitionConsumer{messages: messageChan}, + {"hans", 1, 1}: {messages: messageChan}, + {"hans", 2, 1}: {messages: messageChan}, }, calls: calls, } @@ -496,7 +496,7 @@ func TestConsume(t *testing.T) { target.topic = "hans" target.brokers = []string{"localhost:9092"} target.offsets = map[int32]interval{ - -1: interval{start: offset{false, 1, 0}, end: offset{false, 5, 0}}, + -1: {start: offset{false, 1, 0}, end: offset{false, 5, 0}}, } go target.consume(partitions) @@ -506,8 +506,8 @@ func TestConsume(t *testing.T) { go func(c chan tConsumePartition, e chan struct{}) { actual := []tConsumePartition{} expected := []tConsumePartition{ - tConsumePartition{"hans", 1, 1}, - tConsumePartition{"hans", 2, 1}, + {"hans", 1, 1}, + {"hans", 2, 1}, } for { select { @@ -583,6 +583,10 @@ func (pc tPartitionConsumer) Errors() <-chan *sarama.ConsumerError { return pc.errors } +func (pc tPartitionConsumer) IsPaused() bool { return false } +func (pc tPartitionConsumer) Pause() {} +func (pc tPartitionConsumer) Resume() {} + type tConsumer struct { topics []string topicsErr error @@ -616,6 +620,12 @@ func (c tConsumer) HighWaterMarks() map[string]map[int32]int64 { return nil } +func (c tConsumer) Pause(topicPartitions map[string][]int32) {} +func (c tConsumer) PauseAll() {} + +func (c tConsumer) Resume(topicPartitions map[string][]int32) {} +func (c tConsumer) ResumeAll() {} + func TestConsumeParseArgs(t *testing.T) { topic := "test-topic" givenBroker := "hans:9092" diff --git a/go.mod b/go.mod index da1d155..1bc8e61 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,47 @@ -module github.com/fgeller/kt +module github.com/fgeller/kt/v14 + +go 1.21 require ( - github.com/Shopify/sarama v1.26.1 + github.com/IBM/sarama v1.41.1 github.com/davecgh/go-spew v1.1.1 github.com/stretchr/testify v1.5.1 golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 + github.com/markusmobius/go-dateparser v1.2.1 + github.com/stretchr/testify v1.8.4 + golang.org/x/crypto v0.13.0 ) -go 1.14 +require ( + github.com/eapache/go-resiliency v1.4.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/elliotchance/pie/v2 v2.8.0 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hablullah/go-hijri v1.0.2 // indirect + github.com/hablullah/go-juliandays v1.0.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jalaali/go-jalaali v0.0.0-20210801064154-80525e88d958 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.16.7 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/magefile/mage v1.15.0 // indirect + github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/rogpeppe/go-internal v1.11.0 // indirect + github.com/tetratelabs/wazero v1.5.0 // indirect + github.com/wasilibs/go-re2 v1.4.0 // indirect + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect + golang.org/x/net v0.15.0 // indirect + golang.org/x/sys v0.12.0 // indirect + golang.org/x/term v0.12.0 // indirect + golang.org/x/text v0.13.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index 90fd235..de521cc 100644 --- a/go.sum +++ b/go.sum @@ -1,87 +1,130 @@ -github.com/Shopify/sarama v1.19.0 h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s= -github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= -github.com/Shopify/sarama v1.26.1 h1:3jnfWKD7gVwbB1KSy/lE0szA9duPuSFLViK0o/d3DgA= -github.com/Shopify/sarama v1.26.1/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU= -github.com/Shopify/toxiproxy v2.1.3+incompatible h1:awiJqUYH4q4OmoBiRccJykjd7B+w0loJi2keSna4X/M= -github.com/Shopify/toxiproxy v2.1.3+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= -github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= -github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/IBM/sarama v1.41.1 h1:B4/TdHce/8Ipza+qrLIeNJ9D1AOxZVp/3uDv6H/dp2M= +github.com/IBM/sarama v1.41.1/go.mod h1:JFCPURVskaipJdKRFkiE/OZqQHw7jqliaJmRwXCmSSw= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU= -github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= -github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= -github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= -github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= -github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= +github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/elliotchance/pie/v2 v2.8.0 h1://QS43W8sEha8XV/fjngO5iMudN3XARJV5cpBayAcVY= +github.com/elliotchance/pie/v2 v2.8.0/go.mod h1:18t0dgGFH006g4eVdDtWfgFZPQEgl10IoEO8YWEq3Og= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o= -github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= -github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hablullah/go-hijri v1.0.2 h1:drT/MZpSZJQXo7jftf5fthArShcaMtsal0Zf/dnmp6k= +github.com/hablullah/go-hijri v1.0.2/go.mod h1:OS5qyYLDjORXzK4O1adFw9Q5WfhOcMdAKglDkcTxgWQ= +github.com/hablullah/go-juliandays v1.0.0 h1:A8YM7wIj16SzlKT0SRJc9CD29iiaUzpBLzh5hr0/5p0= +github.com/hablullah/go-juliandays v1.0.0/go.mod h1:0JOYq4oFOuDja+oospuc61YoX+uNEn7Z6uHYTbBzdGc= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= -github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= -github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= -github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= -github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= -github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg= -github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jalaali/go-jalaali v0.0.0-20210801064154-80525e88d958 h1:qxLoi6CAcXVzjfvu+KXIXJOAsQB62LXjsfbOaErsVzE= +github.com/jalaali/go-jalaali v0.0.0-20210801064154-80525e88d958/go.mod h1:Wqfu7mjUHj9WDzSSPI5KfBclTTEnLveRUFr/ujWnTgE= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= +github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/markusmobius/go-dateparser v1.2.1 h1:mYRRdu3TzpAeE6fSl2Gn3arfxEtoTRvFOKlumlVsUtg= +github.com/markusmobius/go-dateparser v1.2.1/go.mod h1:5xYsZ1h7iB3sE1BSu8bkjYpbFST7EU1/AFxcyO3mgYg= +github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165 h1:nkcn14uNmFEuGCb2mBZbBb24RdNRL08b/wb+xBOYpuk= -github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ= -github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= -github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= -golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4 h1:Vk3wNqEZwyGyei9yq5ekj7frek2u7HUfffJ1/opblzc= -golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/tetratelabs/wazero v1.5.0 h1:Yz3fZHivfDiZFUXnWMPUoiW7s8tC1sjdBtlJn08qYa0= +github.com/tetratelabs/wazero v1.5.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A= +github.com/wasilibs/go-re2 v1.4.0 h1:Jp6BM8G/zajgY1BCQUm3i7oGMdR1gA5EBv87wGd2ysc= +github.com/wasilibs/go-re2 v1.4.0/go.mod h1:hLzlKjEgON+17hWjikLx8hJBkikyjQH/lsqCy9t6tIY= +github.com/wasilibs/nottinygc v0.4.0 h1:h1TJMihMC4neN6Zq+WKpLxgd9xCFMw7O9ETLwY2exJQ= +github.com/wasilibs/nottinygc v0.4.0/go.mod h1:oDcIotskuYNMpqMF23l7Z8uzD4TC0WXHK8jetlB3HIo= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 h1:+ELyKg6m8UBf0nPFSqD0mi7zUfwPyXo23HNjMnXPz7w= -golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= -golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sys v0.0.0-20181005133103-4497e2df6f9e h1:EfdBzeKbFSvOjoIqSZcfS8wp0FBLokGBEs9lz1OtSg0= -golang.org/x/sys v0.0.0-20181005133103-4497e2df6f9e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= +golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= +golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.12.0 h1:/ZfYdc3zq+q02Rv9vGqTeSItdzZTSNDmfTi0mBAuidU= +golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= -gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= -gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= -gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= -gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= -gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlIrg= -gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= -gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= -gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/group.go b/group.go index 6b4fbda..321bf40 100644 --- a/group.go +++ b/group.go @@ -12,10 +12,13 @@ import ( "strings" "sync" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" + dps "github.com/markusmobius/go-dateparser" ) type groupCmd struct { + baseCmd + brokers []string auth authConfig group string @@ -24,7 +27,7 @@ type groupCmd struct { topic string partitions []int32 reset int64 - verbose bool + resetTime bool pretty bool version sarama.KafkaVersion offsets bool @@ -63,7 +66,7 @@ func (cmd *groupCmd) run(args []string) { } brokers := cmd.client.Brokers() - fmt.Fprintf(os.Stderr, "found %v brokers\n", len(brokers)) + cmd.infof("found %v brokers\n", len(brokers)) groups := []string{cmd.group} if cmd.group == "" { @@ -74,7 +77,7 @@ func (cmd *groupCmd) run(args []string) { } } } - fmt.Fprintf(os.Stderr, "found %v groups\n", len(groups)) + cmd.infof("found %v groups\n", len(groups)) topics := []string{cmd.topic} if cmd.topic == "" { @@ -85,7 +88,7 @@ func (cmd *groupCmd) run(args []string) { } } } - fmt.Fprintf(os.Stderr, "found %v topics\n", len(topics)) + cmd.infof("found %v topics\n", len(topics)) out := make(chan printContext) go print(out, cmd.pretty) @@ -96,9 +99,7 @@ func (cmd *groupCmd) run(args []string) { out <- ctx <-ctx.done - if cmd.verbose { - fmt.Fprintf(os.Stderr, "%v/%v\n", i+1, len(groups)) - } + cmd.infof("%v/%v\n", i+1, len(groups)) } return } @@ -108,7 +109,7 @@ func (cmd *groupCmd) run(args []string) { parts := cmd.partitions if len(parts) == 0 { parts = cmd.fetchPartitions(topic) - fmt.Fprintf(os.Stderr, "found partitions=%v for topic=%v\n", parts, topic) + cmd.infof("found partitions=%v for topic=%v\n", parts, topic) } topicPartitions[topic] = parts } @@ -158,15 +159,13 @@ awaitGroupOffsets: } } -func (cmd *groupCmd) resolveOffset(top string, part int32, off int64) int64 { - resolvedOff, err := cmd.client.GetOffset(top, part, off) +func (cmd *groupCmd) resolveOffset(top string, part int32, time int64) int64 { + resolvedOff, err := cmd.client.GetOffset(top, part, time) if err != nil { failf("failed to get offset to reset to for partition=%d err=%v", part, err) } - if cmd.verbose { - fmt.Fprintf(os.Stderr, "resolved offset %v for topic=%s partition=%d to %v\n", off, top, part, resolvedOff) - } + cmd.infof("resolved offset %v for topic=%s partition=%d to %v\n", time, top, part, resolvedOff) return resolvedOff } @@ -174,9 +173,7 @@ func (cmd *groupCmd) resolveOffset(top string, part int32, off int64) int64 { func (cmd *groupCmd) fetchGroupOffset(wg *sync.WaitGroup, grp, top string, part int32, results chan<- groupOffset) { defer wg.Done() - if cmd.verbose { - fmt.Fprintf(os.Stderr, "fetching offset information for group=%v topic=%v partition=%v\n", grp, top, part) - } + cmd.infof("fetching offset information for group=%v topic=%v partition=%v\n", grp, top, part) offsetManager, err := sarama.NewOffsetManagerFromClient(grp, cmd.client) if err != nil { @@ -190,7 +187,7 @@ func (cmd *groupCmd) fetchGroupOffset(wg *sync.WaitGroup, grp, top string, part } defer logClose("partition offset manager", pom) - specialOffset := cmd.reset == sarama.OffsetNewest || cmd.reset == sarama.OffsetOldest + specialOffset := cmd.resetTime || cmd.reset == sarama.OffsetNewest || cmd.reset == sarama.OffsetOldest groupOff, _ := pom.NextOffset() if cmd.reset >= 0 || specialOffset { @@ -207,12 +204,6 @@ func (cmd *groupCmd) fetchGroupOffset(wg *sync.WaitGroup, grp, top string, part groupOff = resolvedOff } - // we haven't reset it, and it wasn't set before - lag depends on client's config - if specialOffset { - results <- groupOffset{Partition: part} - return - } - partOff := cmd.resolveOffset(top, part, sarama.OffsetNewest) lag := partOff - groupOff results <- groupOffset{Partition: part, Offset: &groupOff, Lag: &lag} @@ -323,17 +314,20 @@ func (cmd *groupCmd) saramaConfig() *sarama.Config { cfg.Version = cmd.version if usr, err = user.Current(); err != nil { - fmt.Fprintf(os.Stderr, "Failed to read current user err=%v", err) + cmd.infof("Failed to read current user err=%v", err) } cfg.ClientID = "kt-group-" + sanitizeUsername(usr.Username) + cmd.infof("sarama client configuration %#v\n", cfg) - setupAuth(cmd.auth, cfg) + if err = setupAuth(cmd.auth, cfg); err != nil { + failf("failed to setup auth err=%v", err) + } return cfg } func (cmd *groupCmd) failStartup(msg string) { - fmt.Fprintln(os.Stderr, msg) + warnf(msg) failf("use \"kt group -help\" for more information") } @@ -353,7 +347,10 @@ func (cmd *groupCmd) parseArgs(as []string) { cmd.verbose = args.verbose cmd.pretty = args.pretty cmd.offsets = args.offsets - cmd.version = kafkaVersion(args.version) + cmd.version, err = chooseKafkaVersion(args.version, os.Getenv(ENV_KAFKA_VERSION)) + if err != nil { + failf("failed to read kafka version err=%v", err) + } readAuthFile(args.auth, os.Getenv(ENV_AUTH), &cmd.auth) @@ -387,6 +384,8 @@ func (cmd *groupCmd) parseArgs(as []string) { failf("group and topic are required to reset offsets.") } + cmd.resetTime = false + switch args.reset { case "newest": cmd.reset = sarama.OffsetNewest @@ -398,10 +397,18 @@ func (cmd *groupCmd) parseArgs(as []string) { default: cmd.reset, err = strconv.ParseInt(args.reset, 10, 64) if err != nil { - if cmd.verbose { - fmt.Fprintf(os.Stderr, "failed to parse set %#v err=%v", args.reset, err) + var dt, derr = dps.Parse(nil, args.reset) + if derr == nil { + err = nil + cmd.reset = dt.Time.UnixMilli() + cmd.resetTime = true + } else { + err = derr } - cmd.failStartup(fmt.Sprintf(`set value %#v not valid. either newest, oldest or specific offset expected.`, args.reset)) + } + if err != nil { + warnf("failed to parse set %#v err=%v", args.reset, err) + cmd.failStartup(fmt.Sprintf(`set value %#v not valid. either "newest", "oldest", a time, or a specific offset expected. See https://github.com/markusmobius/go-dateparser for supported time formats. `, args.reset)) } } @@ -430,6 +437,7 @@ type groupArgs struct { filterGroups string filterTopics string reset string + resetTime bool verbose bool pretty bool version string @@ -450,7 +458,7 @@ func (cmd *groupCmd) parseFlags(as []string) groupArgs { flags.BoolVar(&args.pretty, "pretty", true, "Control output pretty printing.") flags.StringVar(&args.version, "version", "", "Kafka protocol version") flags.StringVar(&args.partitions, "partitions", allPartitionsHuman, "comma separated list of partitions to limit offsets to, or all") - flags.BoolVar(&args.offsets, "offsets", true, "Controls if offsets should be fetched (defauls to true)") + flags.BoolVar(&args.offsets, "offsets", true, "Controls if offsets should be fetched (defaults to true)") flags.Usage = func() { fmt.Fprintln(os.Stderr, "Usage of group:") @@ -474,7 +482,7 @@ The values supplied on the command line win over environment variable values. The group command can be used to list groups, their offsets and lag and to reset a group's offset. -When an explicit offset hasn't been set yet, kt prints out the respective sarama constants, cf. https://godoc.org/github.com/Shopify/sarama#pkg-constants +When an explicit offset hasn't been set yet, kt prints out the respective sarama constants, cf. https://godoc.org/github.com/IBM/sarama#pkg-constants To simply list all groups: diff --git a/main.go b/main.go index 2a0aebc..f7622d6 100644 --- a/main.go +++ b/main.go @@ -5,18 +5,29 @@ import ( "os" ) -// TODO have these all the time +const AppVersion = "v14.0.0-pre" + var buildVersion, buildTime string -type command interface { - run(args []string) -} +var versionMessage = fmt.Sprintf(`kt version %s`, AppVersion) func init() { - if len(buildTime) > 0 && len(buildVersion) > 0 { - usageMessage = fmt.Sprintf(`%v -Build %v from %v.`, usageMessage, buildVersion, buildTime) + if buildVersion == "" && buildTime == "" { + return + } + + versionMessage += " (" + if buildVersion != "" { + versionMessage += buildVersion + } + + if buildTime != "" { + if buildVersion != "" { + versionMessage += " @ " + } + versionMessage += buildTime } + versionMessage += ")" } var usageMessage = fmt.Sprintf(`kt is a tool for Kafka. @@ -33,7 +44,9 @@ The commands are: group consumer group information and modification. admin basic cluster administration. -Use "kt [command] -help" for for information about the command. +Use "kt [command] -help" for more information about the command. + +Use "kt -version" for details on what version you are running. Authentication: @@ -41,33 +54,34 @@ Authentication with Kafka can be configured via a JSON file. You can set the file name via an "-auth" flag to each command or set it via the environment variable %s. -More at https://github.com/fgeller/kt`, ENV_AUTH) +You can find more details at https://github.com/fgeller/kt + +%s`, ENV_AUTH, versionMessage) -func parseArgs() command { +func main() { if len(os.Args) < 2 { failf(usageMessage) } + var cmd command switch os.Args[1] { case "consume": - return &consumeCmd{} + cmd = &consumeCmd{} case "produce": - return &produceCmd{} + cmd = &produceCmd{} case "topic": - return &topicCmd{} + cmd = &topicCmd{} case "group": - return &groupCmd{} + cmd = &groupCmd{} case "admin": - return &adminCmd{} + cmd = &adminCmd{} case "-h", "-help", "--help": quitf(usageMessage) + case "-version", "--version": + quitf(versionMessage) default: failf(usageMessage) } - return nil -} -func main() { - cmd := parseArgs() cmd.run(os.Args[2:]) } diff --git a/produce.go b/produce.go index b0ae1cb..2e05ebb 100644 --- a/produce.go +++ b/produce.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "encoding/base64" "encoding/hex" "encoding/json" @@ -8,11 +9,12 @@ import ( "fmt" "log" "os" + "os/signal" "os/user" "strings" "time" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" ) type produceArgs struct { @@ -59,9 +61,9 @@ func (cmd *produceCmd) read(as []string) produceArgs { flags.IntVar(&args.bufferSize, "buffersize", 16777216, "Buffer size for scanning stdin, defaults to 16777216=16*1024*1024.") flags.Usage = func() { - fmt.Fprintln(os.Stderr, "Usage of produce:") + warnf("Usage of produce:") flags.PrintDefaults() - fmt.Fprintln(os.Stderr, produceDocString) + warnf(produceDocString + "\n") } err := flags.Parse(as) @@ -75,7 +77,7 @@ func (cmd *produceCmd) read(as []string) produceArgs { } func (cmd *produceCmd) failStartup(msg string) { - fmt.Fprintln(os.Stderr, msg) + warnf(msg) failf("use \"kt produce -help\" for more information") } @@ -120,7 +122,6 @@ func (cmd *produceCmd) parseArgs(as []string) { return } cmd.decodeKey = args.decodeKey - cmd.batch = args.batch cmd.timeout = args.timeout cmd.verbose = args.verbose @@ -128,7 +129,13 @@ func (cmd *produceCmd) parseArgs(as []string) { cmd.literal = args.literal cmd.partition = int32(args.partition) cmd.partitioner = args.partitioner - cmd.version = kafkaVersion(args.version) + + var err error + cmd.version, err = chooseKafkaVersion(args.version, os.Getenv(ENV_KAFKA_VERSION)) + if err != nil { + failf("failed to read kafka version err=%v", err) + } + cmd.compression = kafkaCompression(args.compression) cmd.bufferSize = args.bufferSize } @@ -161,12 +168,10 @@ func (cmd *produceCmd) findLeaders() { cfg.Producer.RequiredAcks = sarama.WaitForAll cfg.Version = cmd.version if usr, err = user.Current(); err != nil { - fmt.Fprintf(os.Stderr, "Failed to read current user err=%v", err) + cmd.infof("Failed to read current user err=%v", err) } cfg.ClientID = "kt-produce-" + sanitizeUsername(usr.Username) - if cmd.verbose { - fmt.Fprintf(os.Stderr, "sarama client configuration %#v\n", cfg) - } + cmd.infof("sarama client configuration %#v\n", cfg) if err = setupAuth(cmd.auth, cfg); err != nil { failf("failed to setup auth err=%v", err) @@ -176,16 +181,16 @@ loop: for _, addr := range cmd.brokers { broker := sarama.NewBroker(addr) if err = broker.Open(cfg); err != nil { - fmt.Fprintf(os.Stderr, "Failed to open broker connection to %v. err=%s\n", addr, err) + cmd.infof("Failed to open broker connection to %v. err=%s\n", addr, err) continue loop } if connected, err := broker.Connected(); !connected || err != nil { - fmt.Fprintf(os.Stderr, "Failed to open broker connection to %v. err=%s\n", addr, err) + cmd.infof("Failed to open broker connection to %v. err=%s\n", addr, err) continue loop } if res, err = broker.GetMetadata(&req); err != nil { - fmt.Fprintf(os.Stderr, "Failed to get metadata from %#v. err=%v\n", addr, err) + cmd.infof("Failed to get metadata from %#v. err=%v\n", addr, err) continue loop } @@ -197,7 +202,7 @@ loop: for _, tm := range res.Topics { if tm.Name == cmd.topic { if tm.Err != sarama.ErrNoError { - fmt.Fprintf(os.Stderr, "Failed to get metadata from %#v. err=%v\n", addr, tm.Err) + cmd.infof("Failed to get metadata from %#v. err=%v\n", addr, tm.Err) continue loop } @@ -226,12 +231,13 @@ loop: } type produceCmd struct { + baseCmd + topic string brokers []string auth authConfig batch int timeout time.Duration - verbose bool pretty bool literal bool partition int32 @@ -260,16 +266,38 @@ func (cmd *produceCmd) run(as []string) { out := make(chan printContext) q := make(chan struct{}) - go readStdinLines(cmd.bufferSize, stdin) + go cmd.readStdinLines(cmd.bufferSize, stdin) go print(out, cmd.pretty) - go listenForInterrupt(q) + go cmd.listenForInterrupt(q) go cmd.readInput(q, stdin, lines) go cmd.deserializeLines(lines, messages, int32(len(cmd.leaders))) go cmd.batchRecords(messages, batchedMessages) cmd.produce(batchedMessages, out) } +func (cmd *produceCmd) readStdinLines(max int, out chan string) { + scanner := bufio.NewScanner(os.Stdin) + scanner.Buffer(make([]byte, max), max) + + for scanner.Scan() { + out <- scanner.Text() + } + + if err := scanner.Err(); err != nil { + warnf("scanning input failed err=%v\n", err) + } + close(out) +} + +func (cmd *produceCmd) listenForInterrupt(q chan struct{}) { + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Kill, os.Interrupt) + sig := <-signals + warnf("received signal %s - triggering shutdown\n", sig) + close(q) +} + func (cmd *produceCmd) close() { for _, b := range cmd.leaders { var ( @@ -278,7 +306,7 @@ func (cmd *produceCmd) close() { ) if connected, err = b.Connected(); err != nil { - fmt.Fprintf(os.Stderr, "Failed to check if broker is connected. err=%s\n", err) + cmd.infof("Failed to check if broker is connected. err=%s\n", err) continue } @@ -287,7 +315,7 @@ func (cmd *produceCmd) close() { } if err = b.Close(); err != nil { - fmt.Fprintf(os.Stderr, "Failed to close broker %v connection. err=%s\n", b, err) + cmd.infof("Failed to close broker %v connection. err=%s\n", b, err) } } } @@ -308,9 +336,7 @@ func (cmd *produceCmd) deserializeLines(in chan string, out chan message, partit msg.Partition = &cmd.partition default: if err := json.Unmarshal([]byte(l), &msg); err != nil { - if cmd.verbose { - fmt.Fprintf(os.Stderr, "Failed to unmarshal input [%v], falling back to defaults. err=%v\n", l, err) - } + cmd.infof("Failed to unmarshal input [%v], falling back to defaults. err=%v\n", l, err) var v *string = &l if len(l) == 0 { v = nil @@ -441,7 +467,7 @@ func (cmd *produceCmd) produceBatch(leaders map[int32]*sarama.Broker, batch []me return fmt.Errorf("failed to send request to broker %#v. err=%s", broker, err) } - offsets, err := readPartitionOffsetResults(resp) + offsets, err := cmd.readPartitionOffsetResults(resp) if err != nil { return fmt.Errorf("failed to read producer response err=%s", err) @@ -458,12 +484,12 @@ func (cmd *produceCmd) produceBatch(leaders map[int32]*sarama.Broker, batch []me return nil } -func readPartitionOffsetResults(resp *sarama.ProduceResponse) (map[int32]partitionProduceResult, error) { +func (cmd *produceCmd) readPartitionOffsetResults(resp *sarama.ProduceResponse) (map[int32]partitionProduceResult, error) { offsets := map[int32]partitionProduceResult{} for _, blocks := range resp.Blocks { for partition, block := range blocks { if block.Err != sarama.ErrNoError { - fmt.Fprintf(os.Stderr, "Failed to send message. err=%s\n", block.Err.Error()) + warnf("Failed to send message. err=%s\n", block.Err.Error()) return offsets, block.Err } @@ -485,7 +511,7 @@ func (cmd *produceCmd) produce(in chan []message, out chan printContext) { return } if err := cmd.produceBatch(cmd.leaders, b, out); err != nil { - fmt.Fprintln(os.Stderr, err.Error()) // TODO: failf + warnf(err.Error()) // TODO: failf return } } diff --git a/system_test.go b/system_test.go index 25f9bab..9672c6a 100644 --- a/system_test.go +++ b/system_test.go @@ -13,7 +13,7 @@ import ( "testing" "time" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" "github.com/stretchr/testify/require" ) @@ -32,6 +32,7 @@ func (c *cmd) run(name string, args ...string) (int, string, string) { cmd.Env = os.Environ() cmd.Env = append(cmd.Env, fmt.Sprintf("%s=localhost:9092", ENV_BROKERS)) cmd.Env = append(cmd.Env, fmt.Sprintf("%s=test-secrets/auth.json", ENV_AUTH)) + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=v3.0.0", ENV_KAFKA_VERSION)) if len(c.in) > 0 { cmd.Stdin = strings.NewReader(c.in) @@ -151,9 +152,10 @@ func TestSystem(t *testing.T) { status, stdOut, stdErr = newCmd(). run("./kt", "group", + "-verbose", "-topic", topicName) - fmt.Printf(">> system test kt group -topic %v stdout:\n%s\n", topicName, stdOut) - fmt.Printf(">> system test kt group -topic %v stderr:\n%s\n", topicName, stdErr) + fmt.Printf(">> system test kt group -verbose -topic %v stdout:\n%s\n", topicName, stdOut) + fmt.Printf(">> system test kt group -verbose -topic %v stderr:\n%s\n", topicName, stdErr) require.Zero(t, status) require.Contains(t, stdErr, fmt.Sprintf("found partitions=[0] for topic=%v", topicName)) require.Contains(t, stdOut, fmt.Sprintf(`{"name":"hans","topic":"%v","offsets":[{"partition":0,"offset":1,"lag":0}]}`, topicName)) @@ -219,12 +221,13 @@ func TestSystem(t *testing.T) { status, stdOut, stdErr = newCmd(). run("./kt", "group", + "-verbose", "-topic", topicName, "-partitions", "0", "-group", "hans", "-reset", "0") - fmt.Printf(">> system test kt group -topic %v -partitions 0 -group hans -reset 0 stdout:\n%s\n", topicName, stdOut) - fmt.Printf(">> system test kt group -topic %v -partitions 0 -group hans -reset 0 stderr:\n%s\n", topicName, stdErr) + fmt.Printf(">> system test kt group -verbose -topic %v -partitions 0 -group hans -reset 0 stdout:\n%s\n", topicName, stdOut) + fmt.Printf(">> system test kt group -verbose -topic %v -partitions 0 -group hans -reset 0 stderr:\n%s\n", topicName, stdErr) require.Zero(t, status) lines = strings.Split(stdOut, "\n") @@ -248,9 +251,10 @@ func TestSystem(t *testing.T) { status, stdOut, stdErr = newCmd(). run("./kt", "group", + "-verbose", "-topic", topicName) - fmt.Printf(">> system test kt group -topic %v stdout:\n%s\n", topicName, stdOut) - fmt.Printf(">> system test kt group -topic %v stderr:\n%s\n", topicName, stdErr) + fmt.Printf(">> system test kt group -verbose -topic %v stdout:\n%s\n", topicName, stdOut) + fmt.Printf(">> system test kt group -verbose -topic %v stderr:\n%s\n", topicName, stdErr) require.Zero(t, status) require.Contains(t, stdErr, fmt.Sprintf("found partitions=[0] for topic=%v", topicName)) require.Contains(t, stdOut, fmt.Sprintf(`{"name":"hans","topic":"%v","offsets":[{"partition":0,"offset":0,"lag":2}]}`, topicName)) diff --git a/test-dependencies.yml b/test-dependencies.yml index e27c3c2..76a7cb6 100644 --- a/test-dependencies.yml +++ b/test-dependencies.yml @@ -2,7 +2,7 @@ version: '2' services: zookeeper-1: - image: confluentinc/cp-zookeeper:5.4.1 + image: confluentinc/cp-zookeeper:latest ports: - "2181:2181" environment: @@ -10,7 +10,7 @@ services: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 kafka-1: - image: confluentinc/cp-kafka:5.4.1 + image: confluentinc/cp-kafka:latest depends_on: - zookeeper-1 ports: @@ -34,3 +34,4 @@ services: volumes: - /var/run/docker.sock:/var/run/docker.sock - ./test-secrets/:/etc/kafka/secrets + diff --git a/test-secrets/broker1-ca1-signed.crt b/test-secrets/broker1-ca1-signed.crt index e45656b..fd1fe92 100644 --- a/test-secrets/broker1-ca1-signed.crt +++ b/test-secrets/broker1-ca1-signed.crt @@ -1,18 +1,23 @@ -----BEGIN CERTIFICATE----- -MIIC3DCCAcQCCQCNa2hkDfzbrDANBgkqhkiG9w0BAQUFADAwMRIwEAYDVQQDDAls -b2NhbGhvc3QxDTALBgNVBAsMBFRFU1QxCzAJBgNVBAoMAktUMB4XDTIwMDMxNTE2 -NDUzOVoXDTQ3MDczMTE2NDUzOVowMDELMAkGA1UEChMCS1QxDTALBgNVBAsTBFRF -U1QxEjAQBgNVBAMTCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC -AQoCggEBAIdPkDan7cbUIqhNrATm1ecAn77fzRV+bM4RurA5KVb4IJQEuV6A//5f -/dqXpXyhi8EXW+39vq7kMExfr18adWKKIlotZLAokuI2fGWAx18oNXJGIR5R9YDw -vIAcFVrhijWuJaMMsN0wJ9wWOYuCgNPCGOVCmEqiakL8YfKxdedNyAAjfozuJ2VG -9oiZsLI14IulZUf29e+Qsca0Y/oJfaQEoRxd3k+0wzdcm2BuH2uDbv5d1O7O4OBd -qcdOHeLFV71OgBYbPlAMUkepghbgvdp3sx99RmUa9pw7xdbKI8EK81fc6JC9Mjs0 -vnszBOkhB5GNdyYAZ6QuQZSAbRXpI00CAwEAATANBgkqhkiG9w0BAQUFAAOCAQEA -Xz/bmZIrPCwRCZG66TJnluI2ddJq2J0ywscBwlFlJqYFKRKJOM/5tvWoXYWxHsJt -A6IHGhQEKax1I35RWzvbnYblrmwE5l/kKIEbU7y6GFDRiiUr4DQ/QxYJrwTUYORi -vScHdhdMf9reqBAbIDhPOsIFXF94XgJb+kBDdcy3KETDyiOUd7zeZp9YBZWbKbZb -YZaPSava8NrP8fVKV19N55qP4XSMQK/oqWKj6wHQrslNbsF3vFx7cSrMrE+jANMX -OGvxSsRRUDmy8EG20gKaOtgg93rtKApWii0Pb0+MEQ1tuzOu6+qGNcay0kllXE/B -ZMnt0CsKhZG5MVFJz+IdfQ== +MIIDyDCCArCgAwIBAgIULxF6KJKCcA9Uqd8v73k/6ZoJgcgwDQYJKoZIhvcNAQEL +BQAwMDESMBAGA1UEAwwJbG9jYWxob3N0MQ0wCwYDVQQLDARURVNUMQswCQYDVQQK +DAJLVDAgFw0yMzA5MDkwOTI0MzNaGA8yMDUxMDEyNDA5MjQzM1owMDELMAkGA1UE +ChMCS1QxDTALBgNVBAsTBFRFU1QxEjAQBgNVBAMTCWxvY2FsaG9zdDCCAaIwDQYJ +KoZIhvcNAQEBBQADggGPADCCAYoCggGBAJCo4CkX3xYZ0eHO1lhaIU5OFmnmw/7L +gn4w+Zz5HFarzrdmwA/jEJJG8QZc4NrKD9l8KoWqI+ilmGyQYBwwTxe3+Z/YDmpa +1v9eClCy7ITEO67rCpifFlQ1X6V3i6xY5/m0zJCO6xdRGwJCXKcq0QW5l3ulZkuq +jqneqV0DmQFDNwl8+Gq6oqI86XU9dZQaOCeVXgq2HRlpErefqAzIfcowdNUTRyt0 +6irgPwkGU5YnvIfGSjZZIgekybQHuVs0kKzkQQ7uo8pQQoYLCOG4MveAkl6f9BhJ +pVnxoa42KExIMbxL7Lx4rLTTy7BrWBh6WeTZTmncOVc3/ZfBUqCAdMvwmN7wNiJu +tOEsfCUTX8vbMKUHIAuLyjM4TYi4K5/hYdw5Mu6W8GDnz2OuuRcqtmTn2d+m7ha/ +Xys+Ulv10nuqxljjlncAZ4uZhAZ5nGcU8Ly2e/2BHND7Sj5BQsCdLDqBELcil0ef +uLiT1PsZRvdb+HDYXuRHCmR50F9GkHI49wIDAQABo1gwVjAUBgNVHREEDTALggls +b2NhbGhvc3QwHQYDVR0OBBYEFBzpPQpZk8jnAi78U33bOdtLDOARMB8GA1UdIwQY +MBaAFP74c8JMNP2DEvyjWf4rkS1ez16nMA0GCSqGSIb3DQEBCwUAA4IBAQBF+N2d +ng85nfowXjf5QpCFfAVSFn8wyNrH6OLth7Qyjim0e+X1G7sbvy4nb3glv/lrqHqw +p30SqXZnLUq6PANeOOELwfeqwEtEC1eVezzx525wbAZycKoo216w+ff5rAFn2kaM +MuOCVgQWbOhBwEELMIZdBjl+NhZMcGH5sLwwTms7vatoJTZMhSYq6VS9NTb9PiGF +WC9H8KmC3cxXVy/c3n8H4//fbJGFIznmjggJZcnbjHIUfnIRLN1HpcEKwLKlfldq +e3oT9llbazrobGGn9xb4tQhIi03Hb//jpLMdaZgyYQuSkzEd6KNi7T5Ua7Mvna5y +LRWgZhcu5S/l7VLJ -----END CERTIFICATE----- diff --git a/test-secrets/broker1.csr b/test-secrets/broker1.csr index fa6d1d7..87cd90c 100644 --- a/test-secrets/broker1.csr +++ b/test-secrets/broker1.csr @@ -1,17 +1,22 @@ -----BEGIN NEW CERTIFICATE REQUEST----- -MIICpTCCAY0CAQAwMDELMAkGA1UEChMCS1QxDTALBgNVBAsTBFRFU1QxEjAQBgNV -BAMTCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAIdP -kDan7cbUIqhNrATm1ecAn77fzRV+bM4RurA5KVb4IJQEuV6A//5f/dqXpXyhi8EX -W+39vq7kMExfr18adWKKIlotZLAokuI2fGWAx18oNXJGIR5R9YDwvIAcFVrhijWu -JaMMsN0wJ9wWOYuCgNPCGOVCmEqiakL8YfKxdedNyAAjfozuJ2VG9oiZsLI14Iul -ZUf29e+Qsca0Y/oJfaQEoRxd3k+0wzdcm2BuH2uDbv5d1O7O4OBdqcdOHeLFV71O -gBYbPlAMUkepghbgvdp3sx99RmUa9pw7xdbKI8EK81fc6JC9Mjs0vnszBOkhB5GN -dyYAZ6QuQZSAbRXpI00CAwEAAaAwMC4GCSqGSIb3DQEJDjEhMB8wHQYDVR0OBBYE -FLBJ0Dx9eIFyda6ZQNaZm8UF/wfCMA0GCSqGSIb3DQEBCwUAA4IBAQBwIw/PRn2q -mkqEmau91wlPb/X37O+eQopW6QZ1p4GwMXYtmHaEv1fS5OjhRfP/9le7D9NviY7S -o6irFZAOGIZ7SwbRMHFR+9oKGKEn5yVRImggUtq5GjOscQ/WDdw3M4tPe52EdLrw -xsm5u406fd+yCoBcuLBJcJJDBAwuNQzNna2Kz8P9y8kcpCCVsmRrFpwnhkN/STN7 -QqM1LBOUtlfK+mqnWdqcxGxrnoaFyTDtHsnfMRQqw7MpsB5jPJUB+6cqrhOps6Wn -M38C9vCz9n7qpoa3Aj9m7E6LMFgFXsXRNKXg0ykzD85evCcDg/hDnpwOJwatfsv2 -qTltTGrKOMUJ +MIIDpTCCAg0CAQAwMDELMAkGA1UEChMCS1QxDTALBgNVBAsTBFRFU1QxEjAQBgNV +BAMTCWxvY2FsaG9zdDCCAaIwDQYJKoZIhvcNAQEBBQADggGPADCCAYoCggGBAJCo +4CkX3xYZ0eHO1lhaIU5OFmnmw/7Lgn4w+Zz5HFarzrdmwA/jEJJG8QZc4NrKD9l8 +KoWqI+ilmGyQYBwwTxe3+Z/YDmpa1v9eClCy7ITEO67rCpifFlQ1X6V3i6xY5/m0 +zJCO6xdRGwJCXKcq0QW5l3ulZkuqjqneqV0DmQFDNwl8+Gq6oqI86XU9dZQaOCeV +Xgq2HRlpErefqAzIfcowdNUTRyt06irgPwkGU5YnvIfGSjZZIgekybQHuVs0kKzk +QQ7uo8pQQoYLCOG4MveAkl6f9BhJpVnxoa42KExIMbxL7Lx4rLTTy7BrWBh6WeTZ +TmncOVc3/ZfBUqCAdMvwmN7wNiJutOEsfCUTX8vbMKUHIAuLyjM4TYi4K5/hYdw5 +Mu6W8GDnz2OuuRcqtmTn2d+m7ha/Xys+Ulv10nuqxljjlncAZ4uZhAZ5nGcU8Ly2 +e/2BHND7Sj5BQsCdLDqBELcil0efuLiT1PsZRvdb+HDYXuRHCmR50F9GkHI49wID +AQABoDAwLgYJKoZIhvcNAQkOMSEwHzAdBgNVHQ4EFgQUHOk9ClmTyOcCLvxTfds5 +20sM4BEwDQYJKoZIhvcNAQEMBQADggGBACarwzOjCatZECC11vzm/2oVP3x9S5xQ +7i9u6id0ps/hdgj3mne1RjTjFbae35TqIp/ZvcfTq0Bba/8S3Z4RDpdqBymPhCWV +87R2hnLQAkayeIWJonbRczS/PpbpCwOJ5I+JPO1FC8Z92hZr6WY5yi6cD5Qhwhxd +GtprEGOAVPLm+1GdeeyAwcGRisVDSSxHLvTdluq1bt3h7zhYfrCcXgW+toqiKNGB +Yvuj+g8pD/ApNXvqO1DHwbnzc75y5fmZihtgNLc8Ivr2QUzSLP1eVIUJ5HYT3cV4 +j5kUBVaaVkMDipY/JUYCz1Xoy/ZI74cMUWvdNFgFNddNsFYYQGdiQUEoy3Ob6Ol6 +0aL9mIk2r7jEedkKdjLOh0hIf/+3tw+a7/1FiDZ9/QShvUF57FIh3ioaIxOfEsIA +cHdcH3xGbqDv3NDmYJ+hGuIyrbOlQldljfWgwGWmqItu3E4f61CWAW30Nc+YGv45 +Ku/uMtSx8aOmYsKeAyv9YxJu8z02br3nUw== -----END NEW CERTIFICATE REQUEST----- diff --git a/test-secrets/create-certs.sh b/test-secrets/create-certs.sh index 0119946..5bdb917 100755 --- a/test-secrets/create-certs.sh +++ b/test-secrets/create-certs.sh @@ -5,28 +5,30 @@ set -o nounset \ -o verbose \ -o xtrace +rm -f kt-test.* +rm -f kafka.*.jks +rm -f snakeoil-ca-* + # Generate CA key -openssl req -new -x509 -keyout snakeoil-ca-1.key -out snakeoil-ca-1.crt -days 365 -subj '/CN=localhost/OU=TEST/O=KT' -passin pass:ktktkt -passout pass:ktktkt +openssl req -new -x509 -keyout snakeoil-ca-1.key -out snakeoil-ca-1.crt -days 365 -subj '/CN=localhost/OU=TEST/O=KT' -addext 'subjectAltName = DNS:localhost' -passin pass:ktktkt -passout pass:ktktkt for i in broker1 do echo $i + echo ">> 0 <<" keytool -genkey -noprompt \ -alias $i \ -dname "CN=localhost, OU=TEST, O=KT" \ + -ext "SAN=DNS:localhost" \ -keystore kafka.$i.keystore.jks \ -keyalg RSA \ -storepass ktktkt \ -keypass ktktkt - # Create CSR, sign the key and import back into keystore keytool -keystore kafka.$i.keystore.jks -alias $i -certreq -file $i.csr -storepass ktktkt -keypass ktktkt - - openssl x509 -req -CA snakeoil-ca-1.crt -CAkey snakeoil-ca-1.key -in $i.csr -out $i-ca1-signed.crt -days 9999 -CAcreateserial -passin pass:ktktkt - + openssl x509 -req -extfile <(printf "subjectAltName=DNS:localhost") -CA snakeoil-ca-1.crt -CAkey snakeoil-ca-1.key -in $i.csr -out $i-ca1-signed.crt -days 9999 -CAcreateserial -passin pass:ktktkt keytool -keystore kafka.$i.keystore.jks -alias CARoot -import -file snakeoil-ca-1.crt -storepass ktktkt -keypass ktktkt keytool -keystore kafka.$i.keystore.jks -alias $i -import -file $i-ca1-signed.crt -storepass ktktkt -keypass ktktkt - # Create truststore and import the CA cert. keytool -keystore kafka.$i.truststore.jks -alias CARoot -import -file snakeoil-ca-1.crt -storepass ktktkt -keypass ktktkt @@ -37,5 +39,5 @@ done # generate public/private key pair for kt openssl genrsa -out kt-test.key 2048 -openssl req -new -key kt-test.key -out kt-test.csr -subj '/CN=localhost/OU=TEST/O=KT' -openssl x509 -req -days 9999 -in kt-test.csr -CA snakeoil-ca-1.crt -CAkey snakeoil-ca-1.key -CAcreateserial -out kt-test.crt +openssl req -new -key kt-test.key -out kt-test.csr -subj '/CN=localhost/OU=TEST/O=KT' -addext 'subjectAltName = DNS:localhost' +openssl x509 -req -extfile <(printf "subjectAltName=DNS:localhost") -days 9999 -in kt-test.csr -CA snakeoil-ca-1.crt -CAkey snakeoil-ca-1.key -CAcreateserial -out kt-test.crt diff --git a/test-secrets/kafka.broker1.keystore.jks b/test-secrets/kafka.broker1.keystore.jks index be44180..48b69d1 100644 Binary files a/test-secrets/kafka.broker1.keystore.jks and b/test-secrets/kafka.broker1.keystore.jks differ diff --git a/test-secrets/kafka.broker1.truststore.jks b/test-secrets/kafka.broker1.truststore.jks index d66789d..410f78b 100644 Binary files a/test-secrets/kafka.broker1.truststore.jks and b/test-secrets/kafka.broker1.truststore.jks differ diff --git a/test-secrets/kt-test.crt b/test-secrets/kt-test.crt index e4a8e02..21c75ba 100644 --- a/test-secrets/kt-test.crt +++ b/test-secrets/kt-test.crt @@ -1,18 +1,20 @@ -----BEGIN CERTIFICATE----- -MIIC3DCCAcQCCQCNa2hkDfzbrTANBgkqhkiG9w0BAQUFADAwMRIwEAYDVQQDDAls -b2NhbGhvc3QxDTALBgNVBAsMBFRFU1QxCzAJBgNVBAoMAktUMB4XDTIwMDMxNTE2 -NDU0NVoXDTQ3MDczMTE2NDU0NVowMDESMBAGA1UEAwwJbG9jYWxob3N0MQ0wCwYD -VQQLDARURVNUMQswCQYDVQQKDAJLVDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC -AQoCggEBAKyCy6Fb+44nS3hywRL4lIUe8vGklgiDevtKFlSwn/0y2Bf4P6ubFRmi -f9HjUfuN+iVuSKBqibEbCNI8MkhSUnSS80t5AgXfINxJUW+N+wxMFON++CE/y4TB -D4sc1JmHhnHp99f3Mc7v3rbjnG9pHOIfF9MRxuxo7oYg46pV4M59wE+eN3XFvt+1 -IXyaxLVNCFueyKA7IE5ECWECJ76JpbGjnvNNJTpcuIkoKk58cGbwvnKDtaVnZVMf -ImDyVpth1Pao1iMeNnOKGpSzvEELD3dCOGzdi9vEo12Bj+7C6y4gV7YdCGod18LN -QYUjAkvz5gTmwb9dLPlmbsAj9wrKQNUCAwEAATANBgkqhkiG9w0BAQUFAAOCAQEA -My95xuxadAIm+oY8OwDpO39g7TyvyHhFC9afY5/oHhZGKXkexJQMOlH+Y6P2+12D -QcP2i6XTFVCXCQqWtULPz4jv2NvFda1Jv/DnVZs62hqeI3TZeY0kdDCKl1nud/sK -9hsaenQhSrD3n9XwhkBWJaGE0bvFniyeuRfVD+b3PoTFbs/Bf8p2kKIsBaBY6Nli -bNE0fM2H0awGNeh/gqruVoMK/hrfOdxpgpUNKfW3OJefBSvB9VmqjhM7IkWmZxsX -LmZ5uthOFSzQBJMdjovAjn/QXB0x45IpmJMzqAGyapz6+JYgNrIPk2fQYcDGXJ3a -woB5/2gqRuNRqM+CI/TMuQ== +MIIDSDCCAjCgAwIBAgIULxF6KJKCcA9Uqd8v73k/6ZoJgckwDQYJKoZIhvcNAQEL +BQAwMDESMBAGA1UEAwwJbG9jYWxob3N0MQ0wCwYDVQQLDARURVNUMQswCQYDVQQK +DAJLVDAgFw0yMzA5MDkwOTI0MzhaGA8yMDUxMDEyNDA5MjQzOFowMDESMBAGA1UE +AwwJbG9jYWxob3N0MQ0wCwYDVQQLDARURVNUMQswCQYDVQQKDAJLVDCCASIwDQYJ +KoZIhvcNAQEBBQADggEPADCCAQoCggEBAMWUZ/gXqWddzVsm7t5k5kscMpELT2/s +CS8j7pYvujpyGh0Ehv/kbq9ICo87bj2LbpZ7bfOR+Mzq+8kfN97pYvT2m3A85nWx +ANLExvLuVr5psFUG61I12TvfOI+YxtX2zmy3KgJyiP/ijQC9580V3d8iZfCQtw06 +Q31SQcRycnQYjpRisMKZAtbO3j4IfENVpuEUo7Fv+vv8mQJ2wEVtgCoG95P5UvFb +UsVil7dr0d8qsdiF5oIacR0FlTErVE/PaIwm1X/jg39MQ6TKfi9pqQ+nMaLBGeHB +QqOVIOMvN9TjgbJXiIaUY+uRCJyGYy5Dc1ZTiVS1vr2DLDO9LPajelUCAwEAAaNY +MFYwFAYDVR0RBA0wC4IJbG9jYWxob3N0MB0GA1UdDgQWBBROxHij48253g4hpWAe +1kW5HqhUczAfBgNVHSMEGDAWgBT++HPCTDT9gxL8o1n+K5EtXs9epzANBgkqhkiG +9w0BAQsFAAOCAQEAK4zIgCsN9T7pcEhZr+sEuXgjZjfs6xLmxriE3jgWEGuR1wd4 +L5NV1usCbDI4wl/9RhMZ0+CnZvnqKgP+CiI6T93aaUMyVEQa2BCsz3gr81KGH/Ha +x5oqMTLUyQgphotOK9Sr3Z5WCN3t8kqx1uJsgetQfmsX3FZ8ADgdABelPZBDgSEC ++UX8pGKmHWF4eUmkHv58TJZX2vcD8eccsffXbdUZzxxCsnmqveIzNYHpwr1n/s8V +asPGatsOYGD9emkhRstod48WPXWI5ld9fB1BpL1WIerXBHchk5GQZuZEVWZzXuC4 +GyXiFNgL2YNURgKKaNb6hJEEBKpWfX8doYzylA== -----END CERTIFICATE----- diff --git a/test-secrets/kt-test.csr b/test-secrets/kt-test.csr index 8431fcd..3270100 100644 --- a/test-secrets/kt-test.csr +++ b/test-secrets/kt-test.csr @@ -1,16 +1,16 @@ -----BEGIN CERTIFICATE REQUEST----- -MIICdTCCAV0CAQAwMDESMBAGA1UEAwwJbG9jYWxob3N0MQ0wCwYDVQQLDARURVNU -MQswCQYDVQQKDAJLVDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKyC -y6Fb+44nS3hywRL4lIUe8vGklgiDevtKFlSwn/0y2Bf4P6ubFRmif9HjUfuN+iVu -SKBqibEbCNI8MkhSUnSS80t5AgXfINxJUW+N+wxMFON++CE/y4TBD4sc1JmHhnHp -99f3Mc7v3rbjnG9pHOIfF9MRxuxo7oYg46pV4M59wE+eN3XFvt+1IXyaxLVNCFue -yKA7IE5ECWECJ76JpbGjnvNNJTpcuIkoKk58cGbwvnKDtaVnZVMfImDyVpth1Pao -1iMeNnOKGpSzvEELD3dCOGzdi9vEo12Bj+7C6y4gV7YdCGod18LNQYUjAkvz5gTm -wb9dLPlmbsAj9wrKQNUCAwEAAaAAMA0GCSqGSIb3DQEBCwUAA4IBAQBcSHtoHbtp -bnti9lgsLwlJFnyAw6L873pS26tyzXd6yAXjx55CeqgtPOw5LzGTSnkGM6ktJnGr -l6VbeoXXDFyUlJIydrMo1x5UM/egEdy0ci59hi5yVIpuGewKnkVfroPfT/8dapYI -jegkQLMRCTAL/lvQ/WNd1To/yaUNdunviDZ5A1DOTGumFwKGqdxacJsuNL8mhFPJ -JNEYhYDn9HvPJLc1RWMUozJstF7poXhzRCRHTtRkxjR8AEXDsV/2ObZKdJocY9Io -WoXzLn+B9kvwlFDMVIQuOp+d609jDKEz0sDE9OpD/2NPWBG6OqD4urqDe/bMWAlv -LDCnw6/IxX1i +MIICnDCCAYQCAQAwMDESMBAGA1UEAwwJbG9jYWxob3N0MQ0wCwYDVQQLDARURVNU +MQswCQYDVQQKDAJLVDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMWU +Z/gXqWddzVsm7t5k5kscMpELT2/sCS8j7pYvujpyGh0Ehv/kbq9ICo87bj2LbpZ7 +bfOR+Mzq+8kfN97pYvT2m3A85nWxANLExvLuVr5psFUG61I12TvfOI+YxtX2zmy3 +KgJyiP/ijQC9580V3d8iZfCQtw06Q31SQcRycnQYjpRisMKZAtbO3j4IfENVpuEU +o7Fv+vv8mQJ2wEVtgCoG95P5UvFbUsVil7dr0d8qsdiF5oIacR0FlTErVE/PaIwm +1X/jg39MQ6TKfi9pqQ+nMaLBGeHBQqOVIOMvN9TjgbJXiIaUY+uRCJyGYy5Dc1ZT +iVS1vr2DLDO9LPajelUCAwEAAaAnMCUGCSqGSIb3DQEJDjEYMBYwFAYDVR0RBA0w +C4IJbG9jYWxob3N0MA0GCSqGSIb3DQEBCwUAA4IBAQBuLVaJnFEfv4JVqeoNTOe1 +YNe/hHyVtCkv/f2xt9sPvGnUmV1j15j1G1O3MUS+vf01wfRaHabFqvmd1KFMQHrN +fMaWGTYcdX0Sus8OJpNRVpqu7marAkLVHs2w2GGmicTd4sZa0Q5Ik7r8J3o6+drS +4TYK+33TtK/ujp67ryYEDypl/eh7sIUGLNlRjCHvX14k4nGyL+GYiOZ2rnoO0K75 +oWYutCVbtCrtC08bLFhSPmCkWCH3lPIP+sZIYncQyQ2eyyBvEqdI7ypySRDujkw2 +cWOHMc30WJ0NJry46o7Nd9TofK3LAdNGxwDNk3BQYW9Z9kvXOI6Ea5frYVCGIEv9 -----END CERTIFICATE REQUEST----- diff --git a/test-secrets/kt-test.key b/test-secrets/kt-test.key index 76ebdda..831ecea 100644 --- a/test-secrets/kt-test.key +++ b/test-secrets/kt-test.key @@ -1,27 +1,28 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEowIBAAKCAQEArILLoVv7jidLeHLBEviUhR7y8aSWCIN6+0oWVLCf/TLYF/g/ -q5sVGaJ/0eNR+436JW5IoGqJsRsI0jwySFJSdJLzS3kCBd8g3ElRb437DEwU4374 -IT/LhMEPixzUmYeGcen31/cxzu/etuOcb2kc4h8X0xHG7GjuhiDjqlXgzn3AT543 -dcW+37UhfJrEtU0IW57IoDsgTkQJYQInvomlsaOe800lOly4iSgqTnxwZvC+coO1 -pWdlUx8iYPJWm2HU9qjWIx42c4oalLO8QQsPd0I4bN2L28SjXYGP7sLrLiBXth0I -ah3Xws1BhSMCS/PmBObBv10s+WZuwCP3CspA1QIDAQABAoIBAC/5G4me28C3Wrx/ -JQQSjrLKXYMnYvWxIpUZEEay63rtFbzss0AUm3SKhQMoNRdAb0zTc2zw06Olwfk8 -9jq4ceerKe9WEmLxnuDDntrn3+WWjsSI/ZW5eO5NBfaIqPKdBp1NFDhBrN9AQ24W -s90jnP2g6e4vZmM99v7jzIwE38RsLReMp/6XH8c4Put/tNt39M8LgSWJlD3JmqSJ -tGtfCli+0pGvggILoeqGqpxELG39vcRX4JVJgNqwE4NUpVZMAxibcEjXPdw1Hwuo -QxcPjMBb5M/+fpNide+MePcHVZPR0yEZU85CIURscq2mbUz2eWOXen3WcYK8DOxp -TJfpPKECgYEA1OFr3j3W/RQ+JDwgs7JW8cqJ8y8NTL7KSuOMxh5eSG7ccHfn/4UU -US6YaNKmuhmW4oI0nqWbycO0o3f6buuAPO0sh1TjPZTNDwaclsjpNflC9U+Q1KbH -kT91eKlVJtsKXyO2Kb0hVnlrD0pb8Z2ak7EHFy/MBmgGNI6EBLDSpx0CgYEAz3QS -W07XTtinfYeEE6LFw4l6TdI7KOcWLOw5tjo9kpztUFPIUiTYpjI5t9zbpOHLInc3 -xgEEJLV7vgB3Vz8KJ7KHhME61fxnvIDwPZ/800VS6+ocAgj+dPakAxt0Be8qmzDe -nmd01c/B+pNP5Of/3h5tl6KzDencoRCgSKMRexkCgYBBLRcafvJwn5769xHIEqT9 -+FXOB6zZv1wuG4cBLVEOeiqxoHR+Td9Pj6WICl6Tgbwpc1Xq8KdG2c1XcydQiHfF -r5O/h2TfeMxjelG89Jw6Myul8YtVIA7f0H8Fx0pT0CJXyJZAJzCqyhfWIZ0O14L5 -Du0t0ytzFqw3sR6IVR7ZnQKBgQCB2xcufpB5s1ZMVkqPfAEs395l3iXX+abA4fFS -Juyp3H8UHA21lYjjp2bHt4DpmcjsvxFzaOluDQfzLBmHzFcP+EmqCMZKch+BOHip -Kn84xBkewUFc6MJW9pg9ta6PlGDoGdQP9ZSFzfGPZyzIjYx1Pji9hUNtfyjyzRR3 -w1NK6QKBgCNvGX1o6s+7BFfCAzDNFrVjp7bTRYYb0P3XboFtaTcleD/RYvMgU+TR -DZ5FJrbR3MeuHqrL9me2ciQi9wF0w1fsney6k7pa4SUWX6SahegLufE4JduDwCsv -ZFsal3Jg9Cw+0WRekr7tupew77oUIZFfcL/Qp6qDm24BioZpfjwG ------END RSA PRIVATE KEY----- +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDFlGf4F6lnXc1b +Ju7eZOZLHDKRC09v7AkvI+6WL7o6chodBIb/5G6vSAqPO249i26We23zkfjM6vvJ +Hzfe6WL09ptwPOZ1sQDSxMby7la+abBVButSNdk73ziPmMbV9s5styoCcoj/4o0A +vefNFd3fImXwkLcNOkN9UkHEcnJ0GI6UYrDCmQLWzt4+CHxDVabhFKOxb/r7/JkC +dsBFbYAqBveT+VLxW1LFYpe3a9HfKrHYheaCGnEdBZUxK1RPz2iMJtV/44N/TEOk +yn4vaakPpzGiwRnhwUKjlSDjLzfU44GyV4iGlGPrkQichmMuQ3NWU4lUtb69gywz +vSz2o3pVAgMBAAECggEAA/YEVpSiMO5lg72bqGLlSnoHKEVqTdx4kP90OGX3RitL +0nJh3z/vtctFMOyGrGV32FkxLj9N1iKGMop8ptcV6DRdyI1kAQ6+AnSZ2MrGJkB9 +VpPurvdFHykhKfZCC8bMNK9czppdY5ZTfmTzzq+NSjyI40IsyqEDQcAtDYbOLhnJ +c5I6QYrRovviqR+MNiFdkhIdfUatcDVHYWiyPBlSr/Q5NPFw2z/JQGiVHlAnZCFs +coJPKDNgDA5Xgo/1oMI7j63PpUU0cxdl0MpNtsf4PzmFmjRhLlSSOvjZNSqkIetJ +VAqWknsuM4m79/ZR9D6heOzlHdZPuPPTZXqe6mbyAQKBgQDv2me0QbZ7fytmrjoQ +aa4rGbgS4/GMEiPK17IVo0NlIdMLAuaEVZnBBiPuFNDLY25bh5T826Z7VT0YWbO6 +hLZjzltve8Bj1TRHmOuYPJs2DYMuWl/qs6bFNU4ZYnYvrEJwRlxR4s39ZPLaUKz/ +M0XBul1YSUwQYAwBN4GXC0ekVQKBgQDS4Xd6RFldqCCKYP9VKiuEZZgXDmYpB40O +35HoiAs+GVbtVHE7iIqeCHldjVYD/22HU9oGHKnwKjk0Kq13LTPbDmYbis9eG9vv +pdtNuD2/0iEUniRqLsu7xHszutZ+IaFqOqnsqoYSQYDqMocnrIcIqs0b3/XGNvle +1KHpMJJ+AQKBgQC/ned9nYXwsY710Dj5BcSsEDb7eHlvospPcfSGztC3ycb1pBKP +JGfaSlKIEL8hzcTLgMypGb2uCFHv0zH/z6h6odBwhxgTabADhwqoq7p/+1mSvYal +VXLlWAmVxBCRU3gchEma7awNRQmzGgKrOsX+2Clcr+oBpwnQKwzvo4ZPpQKBgEri +6FPR+85Hiy643VN4AzYkAcuGQH8ngE8c06q4zQ1DYFwtVXyns1oFdlvjef03csTQ +4Sa0fSCdTe0Zjro4oiaTSW1UcOlhiUvkISnLWJpK25Z/xjyy24SVdI7dNZhbBwA7 +TuaNc8j92/LO+LH9EYLhbK8ObC/0i7/CULEHn64BAoGALoZhyoww72GZhDnK7iOy +4BA6hffMrg0TSTr9w5WKLiZ5cCreTt4FwycyyCJ77k3J9W8yVo6t1GQ/5185befI +1H6NuJFVzQwweEPxNw66rNBbg+0CO01lZx2BGw5FB6dbaDrNrWP2lgSeXiihtJDk +y/zSwljPZCDAsp8Tscsw9OE= +-----END PRIVATE KEY----- diff --git a/test-secrets/snakeoil-ca-1.crt b/test-secrets/snakeoil-ca-1.crt index bc1f46c..6398d65 100644 --- a/test-secrets/snakeoil-ca-1.crt +++ b/test-secrets/snakeoil-ca-1.crt @@ -1,18 +1,20 @@ -----BEGIN CERTIFICATE----- -MIIC3DCCAcQCCQDIiaispcygITANBgkqhkiG9w0BAQsFADAwMRIwEAYDVQQDDAls -b2NhbGhvc3QxDTALBgNVBAsMBFRFU1QxCzAJBgNVBAoMAktUMB4XDTIwMDMxNTE2 -NDUzN1oXDTIxMDMxNTE2NDUzN1owMDESMBAGA1UEAwwJbG9jYWxob3N0MQ0wCwYD -VQQLDARURVNUMQswCQYDVQQKDAJLVDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC -AQoCggEBAN7iSLcIEBAsPVXzh3wo7ptstwmLVbPeyqRaK4G1wtLKoX6blQz0iE8M -+YCY5T7OGQ4DlB8I982al0LF4tTjcN5b7YWbYbuqYnjkqRpQFN9eUAuzVqGQNqBJ -V3jLwINIOr5acLqvJz23ge/pjsnNU15queniYpn+k5csemSyVAREU4xlsX837QnC -zp0idheOynSOZ4dFEmKnKJxL9Fv4A8apO1kd6QWov3mOoP1MElHQik5c2cGeBhO7 -RFRBo2HjiRXyqGOT29RiQ5vIPOVs43ifuNZULWFGW1MRo8nypzjXc4ZoSGEE7xKz -OhoQOcoVdniG1p3wgpWCjjEbIiH1hhUCAwEAATANBgkqhkiG9w0BAQsFAAOCAQEA -bDQxsFYBy7CtVvFPCuGHXHKHQwiJkGrn6Fxiwn4jFrrPln5CS0LrpBscUEgvYWD6 -7R5CGxISIVKoG0Yq32ZKMCZmfpCD0mkFG4zDPKtk6Zf+MSKiV35CMUeBzlq3jfVU -oCuSwsV131b07Ep3zxzwIpXUDEdwdSBwZJ3TDah6NIyO0xH5tMBMQ2rU0IjW8GVV -eiJBLG8YmLCPzooOnylfI9ZRePd1rAkZSJJPbizaztTGc0C7t569QzeM2xZrCXO/ -GuQ1CRvUnEfQ8MWN/OTtPOjzNk2kTAJATS/x5hDYpCcKsG1IsYAg7E/bTDVEMhZB -3ynZQmsTLJaEx+M/K5LfqA== +MIIDVzCCAj+gAwIBAgIUKlT9/FXESwWKyqBtIbSNdaqz5uEwDQYJKoZIhvcNAQEL +BQAwMDESMBAGA1UEAwwJbG9jYWxob3N0MQ0wCwYDVQQLDARURVNUMQswCQYDVQQK +DAJLVDAeFw0yMzA5MDkwOTI0MzFaFw0yNDA5MDgwOTI0MzFaMDAxEjAQBgNVBAMM +CWxvY2FsaG9zdDENMAsGA1UECwwEVEVTVDELMAkGA1UECgwCS1QwggEiMA0GCSqG +SIb3DQEBAQUAA4IBDwAwggEKAoIBAQCe/TN9DjOojmoMCxeeHjrLjmZLKw35AVPY +AUT9nk2fUKLFgIY6QgXQBwcMh8hxAGiINkKyHHW+dlNeTY072b/e6bATqSJlJmgW +WvrYHvV/2pOe4yuIbQmKM6/Cc701wtF+WqYAM5puOq6jpBrnbI/PiewVGAL/DH7q +/aISej4rrUJ69NKx/d66Gp6ONcUqNlXKDOXDvV2hAXcJMKGVOmDj71rVNFv6QIPp +w9gGUaOzS4MCafGmrPNxQ+36iHaH6OluMxwOLX/OHQrD0KbgrW1aaBCbUrz7E0iI +ogcUsOKqT079Ta3c01lcHa55/MU/5dsTzAYn95yxwOlTxH9//wCJAgMBAAGjaTBn +MB0GA1UdDgQWBBT++HPCTDT9gxL8o1n+K5EtXs9epzAfBgNVHSMEGDAWgBT++HPC +TDT9gxL8o1n+K5EtXs9epzAPBgNVHRMBAf8EBTADAQH/MBQGA1UdEQQNMAuCCWxv +Y2FsaG9zdDANBgkqhkiG9w0BAQsFAAOCAQEAGJrLlUNdnCZcXkEmasJXkAwdmTKA +9RJDMH2ElLlWP+IFzCDtoqyI9WjiyZsuo3mDUshu+YZhZ2362oW3gfdCvocTqKuh +LsUXnyvKy30VZPxhsA18tzbcVElbUToPY0uNtnLGQ0yfEeYch53yvbmTq3zJTwP0 +SLYWKbwM0Hpt7MpvwwDVZfzIZ2NNS1pHWOPKtXaB+6c0kK3VLFYrCihw5ID1zzhJ +CxI5DXbIRhdsliXp4ZTHpG+I/cX+3pA2AukQByKtqUgigR2bJIH4edhpQGvxe0B4 +89hLL8iNroa+9lT97O9ECDBKoMLj9YcN8YCsvjjc7wt2C9GoxAzutFsSTA== -----END CERTIFICATE----- diff --git a/test-secrets/snakeoil-ca-1.key b/test-secrets/snakeoil-ca-1.key index a6340c6..5573774 100644 --- a/test-secrets/snakeoil-ca-1.key +++ b/test-secrets/snakeoil-ca-1.key @@ -1,30 +1,30 @@ -----BEGIN ENCRYPTED PRIVATE KEY----- -MIIFHzBJBgkqhkiG9w0BBQ0wPDAbBgkqhkiG9w0BBQwwDgQIEb15MMBrel0CAggA -MB0GCWCGSAFlAwQBKgQQatNgtaHFXd4Kvf6cwWWDzwSCBNDHDk0GMZ2GH1S5pifX -sA+yR4aCkNAIxpwVCo1jeSd4OAwD+j1+Gi2kcbE6W4TOv/b+pDnUErW64t/9lTl1 -mTLmjLezO+uJfhDFuq+5arVQaLKgE/gFbaWYlRLPXfYjtk8fjkPl7sBa4UPKRi4j -6/ZaW6AVXK971m3BL56BwzCBmLlhpa0wVJc+tBbezrSiVvpYSLTDR1Vsxl6p2kKy -q9pz43AfhWeNSErPYaqPvSBHoBIQHZs9boDE9asZkQE2WjgCFaDtPi+DSqQPt5Bi -xxRO3IO+HUiZX+tF6yz0F/IjBdlFwRKzyjk6iB+QpmiXnFECasAZo34pZyC661+N -AP4Lux9MbwdqnoWFuzcJTf4oS1Ns8iJu0joZg1iC04ncRPdcOKfXZpOEA1nVlUIM -FpEaxMt/aQINR1Wz6CZHbwmRjKaiD0d/SIGtfs+68jeSnwlNjEf2Pon+nQe0ZmDA -GxRJTgdTn4+FIudUeLYqnU9J+Dslr5+ogn+C6nApMduY/s6FuumZAEO3bSK3GbGb -YxQ1NZlEqjw/ZJkMdFNIE4IVRF/N6xcPTnE9S7CafqBEbO+SDJGZa7OESb26/3F5 -vlQ8zkurDblOequT3rZbx+jKhk7EXmAtmdJBV6Bom6GGj1FwPWyiw5xSRA1prQKM -XqMenH0cq+CLQgNBzOLPbjenPyBX0GxjmENFayf7ns6gV+unIVX1wyUQh2OEF3Ub -PTaC5XWTB83NePNy9T33Z46f4asZW7jfvZX4KOqUmM56YgsYVRZwIAGEGMvxmfoN -H2QHjBrj1jGQrZFIxT9UH3i9bjdJYPociaIgJD7m1ucbTsSqhPb+7W2oWIqG4eTE -/oQCzfS4tp51FFWdsGdz7M6zAGOlJL7TIPIPzMj/p8EZvWkjMFWvvu98zBcRXWwf -WpzJnUZXiSVA2f5kqYb8H0WS101sdjX0i2y1jindK9Fsr9QGihOGY4eXjg86KE9y -r109aP0hHufmZw/arZELmu4skniz7G8v6uh9FbrZbM2ykMIGJsdIqYAUeH9eAT5d -oIzY/UjL8hrUsSuTZumxqFjPQw3AzDF8ZfPhssZXEY3zm3Q+00umxIQydYjbCYtb -7NnJyK9lCkjOiRpVoupvyQQYaWF5KPplKfC8Rb/69tln9LGnZOhJKpUbdqb0V/Of -aPeknXj3M4zHxIc2YewpMlZbVbzuOCE5+wAqFk5XOr6QLyne7QS1xsLmW8q+d3dr -riQNh/95uNs2Sd8OfcXXDK72wFYFxNIxOi+MAUf5nMfrd2roPUDQdpC8Id1OXV2A -8gckHG+BmTgUcOXcUvFMfGMNc4bHsEqiBcl4rJYojlAdBhsb7gtJCZ23mBbRtng/ -9DvRJ4+pd03eUjOjtPccpFA9/uFGJXLCmDwmnSwPDVSbTqtyEnFhVCi8gcp7zCII -B4BjooJlJ6zoYbBiK8UbXM9Wgtrdvr2Ki+UkXooI19DN8xX2vOXwjTNW6w8+4ZC8 -31CsDqEe0X02a91b2isSn3iG2GSp7gLeXu6P4gXaXSa3FGkGpd9/yp/EVhiSK6qh -NC0w7rRyGcg9QtEK67qw6jDkSRBFoxX7JBFcDHBAHZv9QZ6PA/S4dhcoEnTP4tPZ -e7Thu+1dI70w03hqHdAy/Ptngw== +MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQISTm/5y6Y+fcCAggA +MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECHHoLuMkCMfJBIIEyM5sggx1BvrV +RppMyoQpWQ9ZTRHnmZTESwyIYm0Pn7ycpxSv/x9eKYwclRP0nKW/8xsBwImYoO1q +V3UBTCBYU3OfBn6GbFzO5ajXWwJa48NiAkjZ3UxycsCjJAWFVOHxAwWiUEbmO/52 +RoBlcAW2TuNRsMwBoZDMkky3YQr5VlSJaFKBXAWItr1XAPWECNVzXwIJVus3qFic +nYYhCYpunDIfWSlqjYjU4vQ946qvOY1yRU/dqTYYA+g3jHqDiW4sQhZocELqiLkR +aLsr+BUWaazB56PHvCNhmy21xUP2Ts2MiIbctXtpOtMFeYTNXqdisZhO0iJaRr0u +g0e1ilnzWypMigmmZwvZnG0lKN2qNS5Vl4lNUmJ3TFXVYmgweiryherqdzA0J9jS +hDBzwfGCWbAprt/HHliknssUnELTM6Ij0i0lyf6q/z1Ol/y0pwSmHUjx2CHHF8z1 +LSNdf8HTY8ugyGoMCFxDIenFDNzfT9ZRrE3MW6H4zcctxHnpy0r0O3FZsIpJEhqC +bpkAyLYn0KQyP97HpcJ4kterI3iOVisQE26nI7dB4SWZs69Pu6TCLqQQfSQqF/hL +e1ZH2VsJ7v/lQmBYMY4PlcygkrjL3DQMSAUzyJI50do9Wd61J6OtQ+ib8E8yXWBF +ORGH1KXoQT2U4vQ8M8Y6Pco0b2pUV4JHQz0exygUi9QrLGpw4nh6jnJcDsWhy9Dn +C8fn3PWtkzsrndq+yO3VNKOxvJKj4ESb6C7UtxK5vArqinLhM1CREO12kyS0593q +I5VE9fWavI7NIBnzRhBVz5Wc66KllKTssDQgDH9M/HINL4ggc4vzHGwc+tKMLwL0 +RQqf5Q4YuCgS817vK5fhMvfoz+GiokG7UNwEUuIxG8ujB2ZGIdNtEA8Z6s8Rix5o +EZvCR55waWJ4z9k8qEL5CL7OoOMiJ6hxOb1C3wNXHPH6goegd9Ll7dS83oA/zWVP +DyxQjkuwQJsMibzb3dFEcZbwZC9P2dDYjVcn+jwpVSkObOmtkAU59PFtlHYwz36v +vHS1UJ/5Z2dNtFSNR2AhZ8meFyDOQynmUKxDXtsTX3onYX6nmKvnzfyN/B4sClWe +QXxKbGN3QwEaahSXiGEDeJr9ToBl8xv1MAKXygM0F6UT/BVm/7Nc88E9GFHpzzPC +spHzQS0VohvCTGx6gjv0boIwX74CRd0HW9h5WIOf4VJJeXjDMJ1ODY8a5hpxocgm +m2lNnO2Wn+enh6DSGnSEcSGiH1qPcZSeokuKdMDkFaPr6NsCx6WMOt207LyWyC2S +VfazWiGgq9qEaSs2S40PECCykKJtRMm+RsxPZOcpjIwZjsSmT1UszxkqNkpXeaWR +aXFePr2o/7BjrFb5f+5yMzD+1GWPcDjflMh/v9uCq8zyU0UvdM3UF3my5a/h9ItJ +JwnqsgroOHRlecT0a6KuDBTfoZCm8bLy8elvatpcwEkYccpmflbCy3W1TkEBeh7D +wavUT6FJcfOejYV4iPj22R7nFBKVHbSbhOjua0iEVs21ZjBlrm6dmBvXwjz6ufNZ +6eH9nXyO0dGS9i65T3mx1AlorfR9v73m/elkvMEs5rIdOPvLIOhvzMyN15PnAgEj +NqEt1wSboT/b2LB+dyELLA== -----END ENCRYPTED PRIVATE KEY----- diff --git a/test-secrets/snakeoil-ca-1.srl b/test-secrets/snakeoil-ca-1.srl index 1265185..533afef 100644 --- a/test-secrets/snakeoil-ca-1.srl +++ b/test-secrets/snakeoil-ca-1.srl @@ -1 +1 @@ -8D6B68640DFCDBAD +2F117A289282700F54A9DF2FEF793FE99A0981C9 diff --git a/topic.go b/topic.go index 0e61422..bae6544 100644 --- a/topic.go +++ b/topic.go @@ -10,7 +10,7 @@ import ( "strings" "sync" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" ) type topicArgs struct { @@ -27,6 +27,8 @@ type topicArgs struct { } type topicCmd struct { + baseCmd + brokers []string auth authConfig filter *regexp.Regexp @@ -34,7 +36,6 @@ type topicCmd struct { leaders bool replicas bool config bool - verbose bool pretty bool version sarama.KafkaVersion @@ -124,7 +125,11 @@ func (cmd *topicCmd) parseArgs(as []string) { cmd.config = args.config cmd.pretty = args.pretty cmd.verbose = args.verbose - cmd.version = kafkaVersion(args.version) + + cmd.version, err = chooseKafkaVersion(args.version, os.Getenv(ENV_KAFKA_VERSION)) + if err != nil { + failf("failed to read kafka version err=%v", err) + } } func (cmd *topicCmd) connect() { @@ -137,14 +142,14 @@ func (cmd *topicCmd) connect() { cfg.Version = cmd.version if usr, err = user.Current(); err != nil { - fmt.Fprintf(os.Stderr, "Failed to read current user err=%v", err) + cmd.infof("Failed to read current user err=%v", err) } cfg.ClientID = "kt-topic-" + sanitizeUsername(usr.Username) - if cmd.verbose { - fmt.Fprintf(os.Stderr, "sarama client configuration %#v\n", cfg) - } + cmd.infof("sarama client configuration %#v\n", cfg) - setupAuth(cmd.auth, cfg) + if err = setupAuth(cmd.auth, cfg); err != nil { + failf("failed to setup auth err=%v", err) + } if cmd.client, err = sarama.NewClient(cmd.brokers, cfg); err != nil { failf("failed to create client err=%v", err) @@ -201,7 +206,7 @@ func (cmd *topicCmd) print(name string, out chan printContext) { ) if top, err = cmd.readTopic(name); err != nil { - fmt.Fprintf(os.Stderr, "failed to read info for topic %s. err=%v\n", name, err) + warnf("failed to read info for topic %s. err=%v\n", name, err) return }