From d77c5f175ac7a3c195aac90fe706ff0d4e8f1322 Mon Sep 17 00:00:00 2001 From: Elad Leev Date: Sun, 19 Feb 2023 17:58:37 +0000 Subject: [PATCH 1/2] Add CA cert support --- README.md | 29 +++++++++++++++-------------- config.go | 21 +++++++++++++++++++++ config_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ flag.go | 10 ++++++++-- 4 files changed, 88 insertions(+), 16 deletions(-) create mode 100644 config_test.go diff --git a/README.md b/README.md index 7bfd220..c45156e 100644 --- a/README.md +++ b/README.md @@ -36,20 +36,21 @@ For further offsets analysis, you can store the results into a JSON file: ``` ## Flags -| Name | Description | Require | Type | default | -| ------------- | --------------------------------------------------- | ------- | -------- | ------------------- | -| `--bootstrap` | The Kafka bootstrap servers. | `V` | `string` | "localhost:9092" | -| `--topic` | The topic name to consume | `V` | `string` | "" | -| `--version` | The Kafka client version to be used. | | `string` | "2.1.1" | -| `--group` | The consumer group name. | | `string` | schema-stats | -| `--user` | The Kafka username for authentication | | `string` | "" | -| `--password` | The Kafka password for authentication | | `string` | "" | -| `--tls` | Use TLS communication. | | `bool` | `false` | -| `--store` | Store results into a file. | | `bool` | `false` | -| `--path` | If `store` flag is set, the path to store the file. | | `string` | "/tmp/results.json" | -| `--oldest` | Consume from oldest offset. | | `bool` | `true` | -| `--limit` | Limit consumer to X messages, if different than 0. | | `int` | 0 | -| `--verbose` | Raise consumer log level. | | `bool` | `false` | +| Name | Description | Require | Type | default | +| ------------- | --------------------------------------------------------------------------- | ---------- | -------- | ------------------- | +| `--bootstrap` | The Kafka bootstrap servers. | `V` | `string` | "localhost:9092" | +| `--topic` | The topic name to consume. | `V` | `string` | "" | +| `--version` | The Kafka client version to be used. | | `string` | "2.1.1" | +| `--group` | The consumer group name. | | `string` | schema-stats | +| `--user` | The Kafka username for authentication. | | `string` | "" | +| `--password` | The Kafka password for authentication. | | `string` | "" | +| `--tls` | Use TLS communication. | | `bool` | `false` | +| `--cert` | When TLS communication is enabled, specify the path for the CA certificate. | when `tls` | `string` | "" | +| `--store` | Store results into a file. | | `bool` | `false` | +| `--path` | If `store` flag is set, the path to store the file. | | `string` | "/tmp/results.json" | +| `--oldest` | Consume from oldest offset. | | `bool` | `true` | +| `--limit` | Limit consumer to X messages, if different than 0. | | `int` | 0 | +| `--verbose` | Raise consumer log level. | | `bool` | `false` | ## Usage ```bash diff --git a/config.go b/config.go index d63f22d..21dceec 100644 --- a/config.go +++ b/config.go @@ -1,15 +1,36 @@ package main import ( + "crypto/tls" + "crypto/x509" + "os" + "path/filepath" + "github.com/Shopify/sarama" ) +func loadKey(caFile string) *tls.Config { + caCert, err := os.ReadFile(filepath.Clean(caFile)) + if err != nil { + panic(err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + tlsConfig := &tls.Config{ + MinVersion: tls.VersionTLS12, + RootCAs: caCertPool, + } + return tlsConfig +} + func setConfig(kafkaVersion sarama.KafkaVersion, cfg appConfig) *sarama.Config { config := sarama.NewConfig() config.Version = kafkaVersion if cfg.tls { config.Net.TLS.Enable = true + tlsCfg := loadKey(cfg.caCert) + config.Net.TLS.Config = tlsCfg } if cfg.user != "" && cfg.password != "" { diff --git a/config_test.go b/config_test.go new file mode 100644 index 0000000..90d2a94 --- /dev/null +++ b/config_test.go @@ -0,0 +1,44 @@ +package main + +import ( + "crypto/x509" + "os" + "testing" +) + +func TestLoadKey(t *testing.T) { + caCertFile, err := os.CreateTemp("", "cacert*.pem") + if err != nil { + t.Fatalf("failed to create temporary file: %s", err) + } + defer os.Remove(caCertFile.Name()) + + caCertData := []byte(` +-----BEGIN CERTIFICATE----- +MIIBwjCCAWugAwIBAgIUTvn+Zc80K0mGpMAoGCCqGSM49BAMCMFMxKTAnBgNV +BAMMIDFwcGxlIENlcnRpZmljYXRlIEF1dGhvcml0eTAeFw0yMjAyMTcxMDU3 +MTlaFw0yMjAzMTcxMDU3MTlaMFMxKTAnBgNVBAMMIDFwcGxlIENlcnRpZmlj +YXRlIEF1dGhvcml0eTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABLh6FJzh +Kj/ISL9Rbhg0N14O/r5WWqNw4euBJNzPbNybc+n4ebNkMttcV6U9az6POoyG +Ucky6hGz2jBBRGaUuV6jUDBOMB0GA1UdDgQWBBQ2Wpqw4q3iG4nJZc+uM7N/ +Y4qr4DAOBgNVHQ8BAf8EBAMCAQYwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsG +AQUFBwMCMCAGA1UdEQQZMBeCFGFwaWtleS5jb22CCnN1Ym1pdC5jb20wCgYI +KoZIzj0EAwIDSAAwRQIgZH1OqzW8NfBvZHXrNmlT0TtIJ0QQs+z7E2N1blSC +X/0CIQDKuZwuAQzS1aA90xSgbbVi/TuV7Yj4l4uV7lRGkW8HvA== +-----END CERTIFICATE----- +`) + if _, err := caCertFile.Write(caCertData); err != nil { + t.Fatalf("failed to write to temporary file: %s", err) + } + + tlsConfig := loadKey(caCertFile.Name()) + + if tlsConfig.RootCAs == nil { + t.Error("RootCAs is nil") + } + + if !tlsConfig.RootCAs.Equal(x509.NewCertPool()) { + t.Error("something is wrong with the RootCAs") + } + +} diff --git a/flag.go b/flag.go index e351c9c..54ce5ec 100644 --- a/flag.go +++ b/flag.go @@ -12,6 +12,7 @@ type appConfig struct { bootstrapServers, version, group, topic string user, password string tls bool + caCert string path string limit int oldest bool @@ -29,14 +30,15 @@ func parseFlags() appConfig { flag.StringVar(&cfg.topic, "topic", "", "The Kafka topic to consume from") flag.StringVar(&cfg.user, "user", "", "The Kafka username") flag.StringVar(&cfg.password, "password", "", "The Kafka password") + flag.StringVar(&cfg.caCert, "cert", "", "The path for the CA certificate") flag.BoolVar(&cfg.oldest, "oldest", true, "Consume from oldest offset") flag.BoolVar(&cfg.verbose, "verbose", false, "Switch to verbose logging") - flag.IntVar(&cfg.limit, "limit", 0, "Limit consumer to N messages") flag.BoolVar(&cfg.tls, "tls", false, "Enable TLS connection") + flag.IntVar(&cfg.limit, "limit", 0, "Limit consumer to N messages") // Tool configuration - flag.BoolVar(&cfg.store, "store", false, "Store results to file for analysis") flag.StringVar(&cfg.path, "path", "/tmp/results.json", "Default file to store the results") + flag.BoolVar(&cfg.store, "store", false, "Store results to file for analysis") flag.Parse() @@ -48,5 +50,9 @@ func parseFlags() appConfig { sarama.Logger = log.New(os.Stdout, "[sr-stats DEBUG] ", log.LstdFlags) } + if cfg.tls && cfg.caCert == "" { + log.Fatal("TLS communication was set, but no CA Certificate specified. Please set the --cert flag and try again") + } + return cfg } From a68d5800785d981fd5b09f5220d2cb5f1bcba88a Mon Sep 17 00:00:00 2001 From: Elad Leev Date: Sun, 19 Feb 2023 18:04:25 +0000 Subject: [PATCH 2/2] bump go version --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index bcc2b38..b4dd972 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -24,7 +24,7 @@ jobs: needs: gosec strategy: matrix: - go-version: [1.18.x, 1.19.x] + go-version: [1.19.x, 1.20.x] platform: [ubuntu-latest] runs-on: ${{ matrix.platform }} steps: