Skip to content

Commit

Permalink
Merge pull request #4 from EladLeev/ca-certs-support
Browse files Browse the repository at this point in the history
Add CA cert support, bump go version
  • Loading branch information
EladLeev committed Feb 19, 2023
2 parents a89dd7b + a68d580 commit 7ae108b
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
needs: gosec
strategy:
matrix:
go-version: [1.18.x, 1.19.x]
go-version: [1.19.x, 1.20.x]
platform: [ubuntu-latest]
runs-on: ${{ matrix.platform }}
steps:
Expand Down
29 changes: 15 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,21 @@ For further offsets analysis, you can store the results into a JSON file:
```

## Flags
| Name | Description | Require | Type | default |
| ------------- | --------------------------------------------------- | ------- | -------- | ------------------- |
| `--bootstrap` | The Kafka bootstrap servers. | `V` | `string` | "localhost:9092" |
| `--topic` | The topic name to consume | `V` | `string` | "" |
| `--version` | The Kafka client version to be used. | | `string` | "2.1.1" |
| `--group` | The consumer group name. | | `string` | schema-stats |
| `--user` | The Kafka username for authentication | | `string` | "" |
| `--password` | The Kafka password for authentication | | `string` | "" |
| `--tls` | Use TLS communication. | | `bool` | `false` |
| `--store` | Store results into a file. | | `bool` | `false` |
| `--path` | If `store` flag is set, the path to store the file. | | `string` | "/tmp/results.json" |
| `--oldest` | Consume from oldest offset. | | `bool` | `true` |
| `--limit` | Limit consumer to X messages, if different than 0. | | `int` | 0 |
| `--verbose` | Raise consumer log level. | | `bool` | `false` |
| Name | Description | Require | Type | default |
| ------------- | --------------------------------------------------------------------------- | ---------- | -------- | ------------------- |
| `--bootstrap` | The Kafka bootstrap servers. | `V` | `string` | "localhost:9092" |
| `--topic` | The topic name to consume. | `V` | `string` | "" |
| `--version` | The Kafka client version to be used. | | `string` | "2.1.1" |
| `--group` | The consumer group name. | | `string` | schema-stats |
| `--user` | The Kafka username for authentication. | | `string` | "" |
| `--password` | The Kafka password for authentication. | | `string` | "" |
| `--tls` | Use TLS communication. | | `bool` | `false` |
| `--cert` | When TLS communication is enabled, specify the path for the CA certificate. | when `tls` | `string` | "" |
| `--store` | Store results into a file. | | `bool` | `false` |
| `--path` | If `store` flag is set, the path to store the file. | | `string` | "/tmp/results.json" |
| `--oldest` | Consume from oldest offset. | | `bool` | `true` |
| `--limit` | Limit consumer to X messages, if different than 0. | | `int` | 0 |
| `--verbose` | Raise consumer log level. | | `bool` | `false` |

## Usage
```bash
Expand Down
21 changes: 21 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
package main

import (
"crypto/tls"
"crypto/x509"
"os"
"path/filepath"

"github.com/Shopify/sarama"
)

func loadKey(caFile string) *tls.Config {
caCert, err := os.ReadFile(filepath.Clean(caFile))
if err != nil {
panic(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
MinVersion: tls.VersionTLS12,
RootCAs: caCertPool,
}
return tlsConfig
}

func setConfig(kafkaVersion sarama.KafkaVersion, cfg appConfig) *sarama.Config {
config := sarama.NewConfig()
config.Version = kafkaVersion

if cfg.tls {
config.Net.TLS.Enable = true
tlsCfg := loadKey(cfg.caCert)
config.Net.TLS.Config = tlsCfg
}

if cfg.user != "" && cfg.password != "" {
Expand Down
44 changes: 44 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package main

import (
"crypto/x509"
"os"
"testing"
)

func TestLoadKey(t *testing.T) {
caCertFile, err := os.CreateTemp("", "cacert*.pem")
if err != nil {
t.Fatalf("failed to create temporary file: %s", err)
}
defer os.Remove(caCertFile.Name())

caCertData := []byte(`
-----BEGIN CERTIFICATE-----
MIIBwjCCAWugAwIBAgIUTvn+Zc80K0mGpMAoGCCqGSM49BAMCMFMxKTAnBgNV
BAMMIDFwcGxlIENlcnRpZmljYXRlIEF1dGhvcml0eTAeFw0yMjAyMTcxMDU3
MTlaFw0yMjAzMTcxMDU3MTlaMFMxKTAnBgNVBAMMIDFwcGxlIENlcnRpZmlj
YXRlIEF1dGhvcml0eTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABLh6FJzh
Kj/ISL9Rbhg0N14O/r5WWqNw4euBJNzPbNybc+n4ebNkMttcV6U9az6POoyG
Ucky6hGz2jBBRGaUuV6jUDBOMB0GA1UdDgQWBBQ2Wpqw4q3iG4nJZc+uM7N/
Y4qr4DAOBgNVHQ8BAf8EBAMCAQYwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsG
AQUFBwMCMCAGA1UdEQQZMBeCFGFwaWtleS5jb22CCnN1Ym1pdC5jb20wCgYI
KoZIzj0EAwIDSAAwRQIgZH1OqzW8NfBvZHXrNmlT0TtIJ0QQs+z7E2N1blSC
X/0CIQDKuZwuAQzS1aA90xSgbbVi/TuV7Yj4l4uV7lRGkW8HvA==
-----END CERTIFICATE-----
`)
if _, err := caCertFile.Write(caCertData); err != nil {
t.Fatalf("failed to write to temporary file: %s", err)
}

tlsConfig := loadKey(caCertFile.Name())

if tlsConfig.RootCAs == nil {
t.Error("RootCAs is nil")
}

if !tlsConfig.RootCAs.Equal(x509.NewCertPool()) {
t.Error("something is wrong with the RootCAs")
}

}
10 changes: 8 additions & 2 deletions flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type appConfig struct {
bootstrapServers, version, group, topic string
user, password string
tls bool
caCert string
path string
limit int
oldest bool
Expand All @@ -29,14 +30,15 @@ func parseFlags() appConfig {
flag.StringVar(&cfg.topic, "topic", "", "The Kafka topic to consume from")
flag.StringVar(&cfg.user, "user", "", "The Kafka username")
flag.StringVar(&cfg.password, "password", "", "The Kafka password")
flag.StringVar(&cfg.caCert, "cert", "", "The path for the CA certificate")
flag.BoolVar(&cfg.oldest, "oldest", true, "Consume from oldest offset")
flag.BoolVar(&cfg.verbose, "verbose", false, "Switch to verbose logging")
flag.IntVar(&cfg.limit, "limit", 0, "Limit consumer to N messages")
flag.BoolVar(&cfg.tls, "tls", false, "Enable TLS connection")
flag.IntVar(&cfg.limit, "limit", 0, "Limit consumer to N messages")

// Tool configuration
flag.BoolVar(&cfg.store, "store", false, "Store results to file for analysis")
flag.StringVar(&cfg.path, "path", "/tmp/results.json", "Default file to store the results")
flag.BoolVar(&cfg.store, "store", false, "Store results to file for analysis")

flag.Parse()

Expand All @@ -48,5 +50,9 @@ func parseFlags() appConfig {
sarama.Logger = log.New(os.Stdout, "[sr-stats DEBUG] ", log.LstdFlags)
}

if cfg.tls && cfg.caCert == "" {
log.Fatal("TLS communication was set, but no CA Certificate specified. Please set the --cert flag and try again")
}

return cfg
}

0 comments on commit 7ae108b

Please sign in to comment.