Skip to content

Commit

Permalink
Merge pull request #14 from EladLeev/gen-chart
Browse files Browse the repository at this point in the history
feat: generate pie chart from results
  • Loading branch information
EladLeev committed Apr 16, 2023
2 parents e6a8d1a + e615eec commit b8b68c0
Show file tree
Hide file tree
Showing 11 changed files with 246 additions and 41 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@

# Dependency directories (remove the comment below to include it)
# vendor/

# Don't push pie.html
pie.html
18 changes: 12 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ Using this tool, you can consume from a topic, while calculating the percentage
Table of Contents
-----------------

- [schema-registry-statistics](#schema-registry-statistics)
- [Schema-registry-statistics](#schema-registry-statistics)
- [Table of Contents](#table-of-contents)
- [Flags](#flags)
- [Usage](#usage)
- [Generate Pie Chart](#generate-pie-chart)
- [How does it work?](#how-does-it-work)
- [Local testing](#local-testing)
- [License](#license)
Expand Down Expand Up @@ -45,28 +46,33 @@ For further offsets analysis, you can store the results into a JSON file:
| `--version` | The Kafka client version to be used. | | `string` | "2.1.1" |
| `--group` | The consumer group name. | | `string` | schema-stats |
| `--user` | The Kafka username for authentication. | | `string` | "" |
| `--password` | The Kafka password for authentication. | | `string` | "" |
| `--password` | The Kafka authentication password. | | `string` | "" |
| `--tls` | Use TLS communication. | | `bool` | `false` |
| `--cert` | When TLS communication is enabled, specify the path for the CA certificate. | when `tls` | `string` | "" |
| `--store` | Store results into a file. | | `bool` | `false` |
| `--chart` | Generate pie chart from results. | | `bool` | `false` |
| `--path` | If `store` flag is set, the path to store the file. | | `string` | "/tmp/results.json" |
| `--oldest` | Consume from oldest offset. | | `bool` | `true` |
| `--limit` | Limit consumer to X messages, if different than 0. | | `int` | 0 |
| `--verbose` | Raise consumer log level. | | `bool` | `false` |
| `--verbose` | Raise the consumer log level. | | `bool` | `false` |

## Usage
```bash
./schema-registry-statistics --bootstrap kafka1:9092 --group stat-consumer --topic payments-topic --store --path ~/results.json
```
Consume from `payments-topic` of `kafka1` and store the results. The consumer will run until `SIGINT` (`CMD + C`) will be used.

### Generate Pie Chart
By using the `--chart` flag, you can generate an HTML page with a pie chart visualization:
![Pie Chart Example](static/chart.png)

## How does it work?
According the Kafka [wire format](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format), has only a couple of components:
According to the Kafka [wire format](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format), has only a couple of components:
| Bytes | Area | Description |
| ----- | ---------- | ------------------------------------------------------------------ |
| 0 | Magic Byte | Confluent serialization format version number; currently always 0. |
| 1-4 | Schema ID | 4-byte schema ID as returned by Schema Registry. |
| 5.. | Data | Serialized data for the specified schema format (Avro, Protobuf). |
| 1-4 | Schema ID | 4-byte schema ID as returned by the Schema Registry. |
| 5.. | Data | Serialized data in the specified schema format (Avro, Protobuf). |

The tool leverage this format, and reads the binary format of the each message in order to extract the schema ID and store it.

Expand Down
17 changes: 3 additions & 14 deletions flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,6 @@ import (
"github.com/Shopify/sarama"
)

type appConfig struct {
bootstrapServers, version, group, topic string
user, password string
tls bool
caCert string
path string
limit int
oldest bool
verbose bool
store bool
}

func parseFlags() appConfig {
cfg := appConfig{}

Expand All @@ -31,14 +19,15 @@ func parseFlags() appConfig {
flag.StringVar(&cfg.user, "user", "", "The Kafka username")
flag.StringVar(&cfg.password, "password", "", "The Kafka password")
flag.StringVar(&cfg.caCert, "cert", "", "The path for the CA certificate")
flag.BoolVar(&cfg.oldest, "oldest", true, "Consume from oldest offset")
flag.BoolVar(&cfg.oldest, "oldest", true, "Consume from the oldest offset available")
flag.BoolVar(&cfg.verbose, "verbose", false, "Switch to verbose logging")
flag.BoolVar(&cfg.tls, "tls", false, "Enable TLS connection")
flag.IntVar(&cfg.limit, "limit", 0, "Limit consumer to N messages")

// Tool configuration
flag.StringVar(&cfg.path, "path", "/tmp/results.json", "Default file to store the results")
flag.BoolVar(&cfg.store, "store", false, "Store results to file for analysis")
flag.BoolVar(&cfg.store, "store", false, "Store results on file for analysis")
flag.BoolVar(&cfg.chart, "chart", false, "Generate a pie chart from the results")

flag.Parse()

Expand Down
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ go 1.19
require (
github.com/Shopify/sarama v1.37.2
github.com/fatih/color v1.13.0
gotest.tools v2.2.0+incompatible
)

require (
github.com/google/go-cmp v0.5.9 // indirect
github.com/pkg/errors v0.9.1 // indirect
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/go-echarts/go-echarts/v2 v2.2.6
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
github.com/Shopify/sarama v1.37.2 h1:LoBbU0yJPte0cE5TZCGdlzZRmMgMtZU/XgnUKZg9Cv4=
github.com/Shopify/sarama v1.37.2/go.mod h1:Nxye/E+YPru//Bpaorfhc3JsSGYwCaDDj+R4bK52U5o=
github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc=
github.com/cinar/indicator v1.2.24/go.mod h1:5eX8f1PG9g3RKSoHsoQxKd8bIN97Cf/gbgxXjihROpI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -13,8 +14,12 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/go-echarts/go-echarts/v2 v2.2.6 h1:Gg4SXDxFwi/KzRvBuH6ed89b6bqP4F7ysANDdWiziBY=
github.com/go-echarts/go-echarts/v2 v2.2.6/go.mod h1:IN5P8jIRZKENmAJf2lHXBzv8U9YwdVnY9urdzGkEDA0=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
Expand All @@ -38,20 +43,26 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mattn/go-colorable v0.1.9 h1:sqDoxXbdeALODt0DAeJCVp38ps9ZogZEAXjus69YV3U=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
Expand Down Expand Up @@ -82,7 +93,10 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
29 changes: 14 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,9 @@ import (

"github.com/Shopify/sarama"
"github.com/eladleev/schema-registry-statistics/utils"
"github.com/fatih/color"
)

type Consumer struct {
ready chan bool
stats utils.ResultStats
config appConfig
consumerLock sync.RWMutex
}

func main() {
cfg := parseFlags()
log.SetPrefix("[sr-stats] ")
Expand Down Expand Up @@ -91,18 +85,23 @@ func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
consumedMessages := consumer.stats.StatMap["TOTAL"]
log.Printf("Total messages consumed: %v\n", consumedMessages)
for k, v := range consumer.stats.StatMap {
if k == "TOTAL" {
continue
} else if k == "ERROR" {
defer log.Printf("Unable to decode schema in %v messages. They might be empty, or do not contains any schema.", v)
} else {
utils.CalcPercentile(k, v, consumedMessages)
}
utils.BuildPercentileMap(consumer.stats.StatMap)

// Print results
for k, v := range utils.PercentileMap {
c := color.New(color.FgGreen)
c.Printf("Schema ID %v => %v%%\n", k, v)
}

// Dump stats to file
if consumer.config.store {
utils.DumpStats(consumer.stats, consumer.config.path)
}

// Build Charts
if consumer.config.chart {
utils.GenChart()
}
return nil
}

Expand Down
Binary file added static/chart.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
27 changes: 27 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"sync"

"github.com/eladleev/schema-registry-statistics/utils"
)

type appConfig struct {
bootstrapServers, version, group, topic string
user, password string
tls bool
caCert string
path string
limit int
oldest bool
verbose bool
store bool
chart bool
}

type Consumer struct {
ready chan bool
stats utils.ResultStats
config appConfig
consumerLock sync.RWMutex
}
97 changes: 97 additions & 0 deletions types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package main

import (
"sync"
"testing"
"time"

"github.com/eladleev/schema-registry-statistics/utils"
"gotest.tools/assert"
)

func TestConsumerInitialization(t *testing.T) {
config := appConfig{
bootstrapServers: "localhost:9092",
version: "2.6.0",
group: "my-group",
topic: "my-topic",
user: "my-user",
password: "my-password",
tls: true,
caCert: "ca.pem",
path: "/tmp",
limit: 1000,
oldest: true,
verbose: true,
store: true,
chart: true,
}

consumer := Consumer{
ready: make(chan bool),
stats: utils.ResultStats{},
config: config,
consumerLock: sync.RWMutex{},
}

assert.Equal(t, config.bootstrapServers, consumer.config.bootstrapServers)
assert.Equal(t, config.version, consumer.config.version)
assert.Equal(t, config.group, consumer.config.group)
assert.Equal(t, config.topic, consumer.config.topic)
assert.Equal(t, config.user, consumer.config.user)
assert.Equal(t, config.password, consumer.config.password)
assert.Equal(t, config.tls, consumer.config.tls)
assert.Equal(t, config.caCert, consumer.config.caCert)
assert.Equal(t, config.path, consumer.config.path)
assert.Equal(t, config.limit, consumer.config.limit)
assert.Equal(t, config.oldest, consumer.config.oldest)
assert.Equal(t, config.verbose, consumer.config.verbose)
assert.Equal(t, config.store, consumer.config.store)
assert.Equal(t, config.chart, consumer.config.chart)
}

func TestConsumerLocking(t *testing.T) {
config := appConfig{
bootstrapServers: "localhost:9092",
version: "2.6.0",
group: "my-group",
topic: "my-topic",
user: "my-user",
password: "my-password",
tls: true,
caCert: "ca.pem",
path: "/tmp",
limit: 1000,
oldest: true,
verbose: true,
store: true,
chart: true,
}

consumer := Consumer{
ready: make(chan bool),
stats: utils.ResultStats{},
config: config,
consumerLock: sync.RWMutex{},
}

var wg sync.WaitGroup
wg.Add(2)

go func() {
consumer.consumerLock.Lock()
defer consumer.consumerLock.Unlock()

time.Sleep(time.Millisecond * 100)
wg.Done()
}()

go func() {
consumer.consumerLock.Lock()
defer consumer.consumerLock.Unlock()

wg.Done()
}()

wg.Wait()
}
Loading

0 comments on commit b8b68c0

Please sign in to comment.