diff --git a/.gitignore b/.gitignore index 66fd13c..a6a61ef 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,6 @@ # Dependency directories (remove the comment below to include it) # vendor/ + +# Don't push pie.html +pie.html diff --git a/README.md b/README.md index 9be462b..67985ed 100644 --- a/README.md +++ b/README.md @@ -7,10 +7,11 @@ Using this tool, you can consume from a topic, while calculating the percentage Table of Contents ----------------- -- [schema-registry-statistics](#schema-registry-statistics) +- [Schema-registry-statistics](#schema-registry-statistics) - [Table of Contents](#table-of-contents) - [Flags](#flags) - [Usage](#usage) + - [Generate Pie Chart](#generate-pie-chart) - [How does it work?](#how-does-it-work) - [Local testing](#local-testing) - [License](#license) @@ -45,14 +46,15 @@ For further offsets analysis, you can store the results into a JSON file: | `--version` | The Kafka client version to be used. | | `string` | "2.1.1" | | `--group` | The consumer group name. | | `string` | schema-stats | | `--user` | The Kafka username for authentication. | | `string` | "" | -| `--password` | The Kafka password for authentication. | | `string` | "" | +| `--password` | The Kafka authentication password. | | `string` | "" | | `--tls` | Use TLS communication. | | `bool` | `false` | | `--cert` | When TLS communication is enabled, specify the path for the CA certificate. | when `tls` | `string` | "" | | `--store` | Store results into a file. | | `bool` | `false` | +| `--chart` | Generate pie chart from results. | | `bool` | `false` | | `--path` | If `store` flag is set, the path to store the file. | | `string` | "/tmp/results.json" | | `--oldest` | Consume from oldest offset. | | `bool` | `true` | | `--limit` | Limit consumer to X messages, if different than 0. | | `int` | 0 | -| `--verbose` | Raise consumer log level. | | `bool` | `false` | +| `--verbose` | Raise the consumer log level. | | `bool` | `false` | ## Usage ```bash @@ -60,13 +62,17 @@ For further offsets analysis, you can store the results into a JSON file: ``` Consume from `payments-topic` of `kafka1` and store the results. The consumer will run until `SIGINT` (`CMD + C`) will be used. +### Generate Pie Chart +By using the `--chart` flag, you can generate an HTML page with a pie chart visualization: +![Pie Chart Example](static/chart.png) + ## How does it work? -According the Kafka [wire format](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format), has only a couple of components: +According to the Kafka [wire format](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format), has only a couple of components: | Bytes | Area | Description | | ----- | ---------- | ------------------------------------------------------------------ | | 0 | Magic Byte | Confluent serialization format version number; currently always 0. | -| 1-4 | Schema ID | 4-byte schema ID as returned by Schema Registry. | -| 5.. | Data | Serialized data for the specified schema format (Avro, Protobuf). | +| 1-4 | Schema ID | 4-byte schema ID as returned by the Schema Registry. | +| 5.. | Data | Serialized data in the specified schema format (Avro, Protobuf). | The tool leverage this format, and reads the binary format of the each message in order to extract the schema ID and store it. diff --git a/flag.go b/flag.go index 54ce5ec..b3b2928 100644 --- a/flag.go +++ b/flag.go @@ -8,18 +8,6 @@ import ( "github.com/Shopify/sarama" ) -type appConfig struct { - bootstrapServers, version, group, topic string - user, password string - tls bool - caCert string - path string - limit int - oldest bool - verbose bool - store bool -} - func parseFlags() appConfig { cfg := appConfig{} @@ -31,14 +19,15 @@ func parseFlags() appConfig { flag.StringVar(&cfg.user, "user", "", "The Kafka username") flag.StringVar(&cfg.password, "password", "", "The Kafka password") flag.StringVar(&cfg.caCert, "cert", "", "The path for the CA certificate") - flag.BoolVar(&cfg.oldest, "oldest", true, "Consume from oldest offset") + flag.BoolVar(&cfg.oldest, "oldest", true, "Consume from the oldest offset available") flag.BoolVar(&cfg.verbose, "verbose", false, "Switch to verbose logging") flag.BoolVar(&cfg.tls, "tls", false, "Enable TLS connection") flag.IntVar(&cfg.limit, "limit", 0, "Limit consumer to N messages") // Tool configuration flag.StringVar(&cfg.path, "path", "/tmp/results.json", "Default file to store the results") - flag.BoolVar(&cfg.store, "store", false, "Store results to file for analysis") + flag.BoolVar(&cfg.store, "store", false, "Store results on file for analysis") + flag.BoolVar(&cfg.chart, "chart", false, "Generate a pie chart from the results") flag.Parse() diff --git a/go.mod b/go.mod index 2d6c024..7130095 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,12 @@ go 1.19 require ( github.com/Shopify/sarama v1.37.2 github.com/fatih/color v1.13.0 + gotest.tools v2.2.0+incompatible +) + +require ( + github.com/google/go-cmp v0.5.9 // indirect + github.com/pkg/errors v0.9.1 // indirect ) require ( @@ -12,6 +18,7 @@ require ( github.com/eapache/go-resiliency v1.3.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect + github.com/go-echarts/go-echarts/v2 v2.2.6 github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect diff --git a/go.sum b/go.sum index 4fea154..c6d29f3 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,7 @@ github.com/Shopify/sarama v1.37.2 h1:LoBbU0yJPte0cE5TZCGdlzZRmMgMtZU/XgnUKZg9Cv4= github.com/Shopify/sarama v1.37.2/go.mod h1:Nxye/E+YPru//Bpaorfhc3JsSGYwCaDDj+R4bK52U5o= github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= +github.com/cinar/indicator v1.2.24/go.mod h1:5eX8f1PG9g3RKSoHsoQxKd8bIN97Cf/gbgxXjihROpI= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -13,8 +14,12 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/go-echarts/go-echarts/v2 v2.2.6 h1:Gg4SXDxFwi/KzRvBuH6ed89b6bqP4F7ysANDdWiziBY= +github.com/go-echarts/go-echarts/v2 v2.2.6/go.mod h1:IN5P8jIRZKENmAJf2lHXBzv8U9YwdVnY9urdzGkEDA0= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= @@ -38,6 +43,9 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/mattn/go-colorable v0.1.9 h1:sqDoxXbdeALODt0DAeJCVp38ps9ZogZEAXjus69YV3U= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= @@ -45,6 +53,8 @@ github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9 github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= @@ -52,6 +62,7 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqn github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -82,7 +93,10 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= +gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= diff --git a/main.go b/main.go index 7ea146b..fb75c23 100644 --- a/main.go +++ b/main.go @@ -12,15 +12,9 @@ import ( "github.com/Shopify/sarama" "github.com/eladleev/schema-registry-statistics/utils" + "github.com/fatih/color" ) -type Consumer struct { - ready chan bool - stats utils.ResultStats - config appConfig - consumerLock sync.RWMutex -} - func main() { cfg := parseFlags() log.SetPrefix("[sr-stats] ") @@ -91,18 +85,23 @@ func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { consumedMessages := consumer.stats.StatMap["TOTAL"] log.Printf("Total messages consumed: %v\n", consumedMessages) - for k, v := range consumer.stats.StatMap { - if k == "TOTAL" { - continue - } else if k == "ERROR" { - defer log.Printf("Unable to decode schema in %v messages. They might be empty, or do not contains any schema.", v) - } else { - utils.CalcPercentile(k, v, consumedMessages) - } + utils.BuildPercentileMap(consumer.stats.StatMap) + + // Print results + for k, v := range utils.PercentileMap { + c := color.New(color.FgGreen) + c.Printf("Schema ID %v => %v%%\n", k, v) } + + // Dump stats to file if consumer.config.store { utils.DumpStats(consumer.stats, consumer.config.path) } + + // Build Charts + if consumer.config.chart { + utils.GenChart() + } return nil } diff --git a/static/chart.png b/static/chart.png new file mode 100644 index 0000000..96b1434 Binary files /dev/null and b/static/chart.png differ diff --git a/types.go b/types.go new file mode 100644 index 0000000..a5048ea --- /dev/null +++ b/types.go @@ -0,0 +1,27 @@ +package main + +import ( + "sync" + + "github.com/eladleev/schema-registry-statistics/utils" +) + +type appConfig struct { + bootstrapServers, version, group, topic string + user, password string + tls bool + caCert string + path string + limit int + oldest bool + verbose bool + store bool + chart bool +} + +type Consumer struct { + ready chan bool + stats utils.ResultStats + config appConfig + consumerLock sync.RWMutex +} diff --git a/types_test.go b/types_test.go new file mode 100644 index 0000000..8e8a5ec --- /dev/null +++ b/types_test.go @@ -0,0 +1,97 @@ +package main + +import ( + "sync" + "testing" + "time" + + "github.com/eladleev/schema-registry-statistics/utils" + "gotest.tools/assert" +) + +func TestConsumerInitialization(t *testing.T) { + config := appConfig{ + bootstrapServers: "localhost:9092", + version: "2.6.0", + group: "my-group", + topic: "my-topic", + user: "my-user", + password: "my-password", + tls: true, + caCert: "ca.pem", + path: "/tmp", + limit: 1000, + oldest: true, + verbose: true, + store: true, + chart: true, + } + + consumer := Consumer{ + ready: make(chan bool), + stats: utils.ResultStats{}, + config: config, + consumerLock: sync.RWMutex{}, + } + + assert.Equal(t, config.bootstrapServers, consumer.config.bootstrapServers) + assert.Equal(t, config.version, consumer.config.version) + assert.Equal(t, config.group, consumer.config.group) + assert.Equal(t, config.topic, consumer.config.topic) + assert.Equal(t, config.user, consumer.config.user) + assert.Equal(t, config.password, consumer.config.password) + assert.Equal(t, config.tls, consumer.config.tls) + assert.Equal(t, config.caCert, consumer.config.caCert) + assert.Equal(t, config.path, consumer.config.path) + assert.Equal(t, config.limit, consumer.config.limit) + assert.Equal(t, config.oldest, consumer.config.oldest) + assert.Equal(t, config.verbose, consumer.config.verbose) + assert.Equal(t, config.store, consumer.config.store) + assert.Equal(t, config.chart, consumer.config.chart) +} + +func TestConsumerLocking(t *testing.T) { + config := appConfig{ + bootstrapServers: "localhost:9092", + version: "2.6.0", + group: "my-group", + topic: "my-topic", + user: "my-user", + password: "my-password", + tls: true, + caCert: "ca.pem", + path: "/tmp", + limit: 1000, + oldest: true, + verbose: true, + store: true, + chart: true, + } + + consumer := Consumer{ + ready: make(chan bool), + stats: utils.ResultStats{}, + config: config, + consumerLock: sync.RWMutex{}, + } + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + consumer.consumerLock.Lock() + defer consumer.consumerLock.Unlock() + + time.Sleep(time.Millisecond * 100) + wg.Done() + }() + + go func() { + consumer.consumerLock.Lock() + defer consumer.consumerLock.Unlock() + + wg.Done() + }() + + wg.Wait() +} diff --git a/utils/chart.go b/utils/chart.go new file mode 100644 index 0000000..db011f4 --- /dev/null +++ b/utils/chart.go @@ -0,0 +1,53 @@ +package utils + +import ( + "fmt" + "io" + "os" + "time" + + "github.com/go-echarts/go-echarts/v2/charts" + "github.com/go-echarts/go-echarts/v2/components" + "github.com/go-echarts/go-echarts/v2/opts" +) + +func generatePieItems(stat map[string]float64) []opts.PieData { + items := make([]opts.PieData, 0) + for k, v := range PercentileMap { + items = append(items, opts.PieData{ + Name: fmt.Sprintf("Schema ID %s", k), + Value: v, + }) + } + return items +} + +func createPieChart() *charts.Pie { + pie := charts.NewPie() + pie.SetGlobalOptions( + charts.WithTitleOpts(opts.Title{ + Title: "Schemas Statistics", + Subtitle: fmt.Sprintf("Snapshot: %s", time.Now().Format(time.RFC822)), + }), + ) + pie.AddSeries("pie", generatePieItems(PercentileMap)). + SetSeriesOptions(charts.WithLabelOpts( + opts.Label{ + Show: true, + Formatter: "{b}: {c}%", + }), + ) + return pie +} + +func GenChart() { + page := components.NewPage() + page.AddCharts( + createPieChart(), + ) + f, err := os.Create("pie.html") + if err != nil { + panic(err) + } + page.Render(io.MultiWriter(f)) +} diff --git a/utils/stats.go b/utils/stats.go index 71a8fc1..a993a8d 100644 --- a/utils/stats.go +++ b/utils/stats.go @@ -7,19 +7,29 @@ import ( "math" "os" "sync" - - "github.com/fatih/color" ) +var PercentileMap = map[string]float64{} + type ResultStats struct { StatMap map[string]int ResultStore map[uint32][]int } -func CalcPercentile(k string, v, consumedMessages int) { - idPerc := math.Round((float64(v) / float64(consumedMessages) * 100)) - c := color.New(color.FgGreen) - c.Printf("Schema ID %v => %v%%\n", k, idPerc) +func calc(schema, total int) float64 { + return math.Round((float64(schema) / float64(total) * 100)) +} + +func BuildPercentileMap(s map[string]int) { + for k, v := range s { + if k == "TOTAL" { + continue + } else if k == "ERROR" { + defer log.Printf("Unable to decode schema in %v messages. They might be empty, or do not contains any schema.", v) + } else { + PercentileMap[k] = calc(v, s["TOTAL"]) + } + } } // AppendResult will map the results to a storeable map