Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: generate pie chart from results #14

Merged
merged 2 commits into from
Apr 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@

# Dependency directories (remove the comment below to include it)
# vendor/

# Don't push pie.html
pie.html
18 changes: 12 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ Using this tool, you can consume from a topic, while calculating the percentage
Table of Contents
-----------------

- [schema-registry-statistics](#schema-registry-statistics)
- [Schema-registry-statistics](#schema-registry-statistics)
- [Table of Contents](#table-of-contents)
- [Flags](#flags)
- [Usage](#usage)
- [Generate Pie Chart](#generate-pie-chart)
- [How does it work?](#how-does-it-work)
- [Local testing](#local-testing)
- [License](#license)
Expand Down Expand Up @@ -45,28 +46,33 @@ For further offsets analysis, you can store the results into a JSON file:
| `--version` | The Kafka client version to be used. | | `string` | "2.1.1" |
| `--group` | The consumer group name. | | `string` | schema-stats |
| `--user` | The Kafka username for authentication. | | `string` | "" |
| `--password` | The Kafka password for authentication. | | `string` | "" |
| `--password` | The Kafka authentication password. | | `string` | "" |
| `--tls` | Use TLS communication. | | `bool` | `false` |
| `--cert` | When TLS communication is enabled, specify the path for the CA certificate. | when `tls` | `string` | "" |
| `--store` | Store results into a file. | | `bool` | `false` |
| `--chart` | Generate pie chart from results. | | `bool` | `false` |
| `--path` | If `store` flag is set, the path to store the file. | | `string` | "/tmp/results.json" |
| `--oldest` | Consume from oldest offset. | | `bool` | `true` |
| `--limit` | Limit consumer to X messages, if different than 0. | | `int` | 0 |
| `--verbose` | Raise consumer log level. | | `bool` | `false` |
| `--verbose` | Raise the consumer log level. | | `bool` | `false` |

## Usage
```bash
./schema-registry-statistics --bootstrap kafka1:9092 --group stat-consumer --topic payments-topic --store --path ~/results.json
```
Consume from `payments-topic` of `kafka1` and store the results. The consumer will run until `SIGINT` (`CMD + C`) will be used.

### Generate Pie Chart
By using the `--chart` flag, you can generate an HTML page with a pie chart visualization:
![Pie Chart Example](static/chart.png)

## How does it work?
According the Kafka [wire format](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format), has only a couple of components:
According to the Kafka [wire format](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format), has only a couple of components:
| Bytes | Area | Description |
| ----- | ---------- | ------------------------------------------------------------------ |
| 0 | Magic Byte | Confluent serialization format version number; currently always 0. |
| 1-4 | Schema ID | 4-byte schema ID as returned by Schema Registry. |
| 5.. | Data | Serialized data for the specified schema format (Avro, Protobuf). |
| 1-4 | Schema ID | 4-byte schema ID as returned by the Schema Registry. |
| 5.. | Data | Serialized data in the specified schema format (Avro, Protobuf). |

The tool leverage this format, and reads the binary format of the each message in order to extract the schema ID and store it.

Expand Down
17 changes: 3 additions & 14 deletions flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,6 @@ import (
"github.com/Shopify/sarama"
)

type appConfig struct {
bootstrapServers, version, group, topic string
user, password string
tls bool
caCert string
path string
limit int
oldest bool
verbose bool
store bool
}

func parseFlags() appConfig {
cfg := appConfig{}

Expand All @@ -31,14 +19,15 @@ func parseFlags() appConfig {
flag.StringVar(&cfg.user, "user", "", "The Kafka username")
flag.StringVar(&cfg.password, "password", "", "The Kafka password")
flag.StringVar(&cfg.caCert, "cert", "", "The path for the CA certificate")
flag.BoolVar(&cfg.oldest, "oldest", true, "Consume from oldest offset")
flag.BoolVar(&cfg.oldest, "oldest", true, "Consume from the oldest offset available")
flag.BoolVar(&cfg.verbose, "verbose", false, "Switch to verbose logging")
flag.BoolVar(&cfg.tls, "tls", false, "Enable TLS connection")
flag.IntVar(&cfg.limit, "limit", 0, "Limit consumer to N messages")

// Tool configuration
flag.StringVar(&cfg.path, "path", "/tmp/results.json", "Default file to store the results")
flag.BoolVar(&cfg.store, "store", false, "Store results to file for analysis")
flag.BoolVar(&cfg.store, "store", false, "Store results on file for analysis")
flag.BoolVar(&cfg.chart, "chart", false, "Generate a pie chart from the results")

flag.Parse()

Expand Down
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ go 1.19
require (
github.com/Shopify/sarama v1.37.2
github.com/fatih/color v1.13.0
gotest.tools v2.2.0+incompatible
)

require (
github.com/google/go-cmp v0.5.9 // indirect
github.com/pkg/errors v0.9.1 // indirect
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/go-echarts/go-echarts/v2 v2.2.6
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
github.com/Shopify/sarama v1.37.2 h1:LoBbU0yJPte0cE5TZCGdlzZRmMgMtZU/XgnUKZg9Cv4=
github.com/Shopify/sarama v1.37.2/go.mod h1:Nxye/E+YPru//Bpaorfhc3JsSGYwCaDDj+R4bK52U5o=
github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc=
github.com/cinar/indicator v1.2.24/go.mod h1:5eX8f1PG9g3RKSoHsoQxKd8bIN97Cf/gbgxXjihROpI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -13,8 +14,12 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/go-echarts/go-echarts/v2 v2.2.6 h1:Gg4SXDxFwi/KzRvBuH6ed89b6bqP4F7ysANDdWiziBY=
github.com/go-echarts/go-echarts/v2 v2.2.6/go.mod h1:IN5P8jIRZKENmAJf2lHXBzv8U9YwdVnY9urdzGkEDA0=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
Expand All @@ -38,20 +43,26 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mattn/go-colorable v0.1.9 h1:sqDoxXbdeALODt0DAeJCVp38ps9ZogZEAXjus69YV3U=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
Expand Down Expand Up @@ -82,7 +93,10 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
29 changes: 14 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,9 @@ import (

"github.com/Shopify/sarama"
"github.com/eladleev/schema-registry-statistics/utils"
"github.com/fatih/color"
)

type Consumer struct {
ready chan bool
stats utils.ResultStats
config appConfig
consumerLock sync.RWMutex
}

func main() {
cfg := parseFlags()
log.SetPrefix("[sr-stats] ")
Expand Down Expand Up @@ -91,18 +85,23 @@ func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
consumedMessages := consumer.stats.StatMap["TOTAL"]
log.Printf("Total messages consumed: %v\n", consumedMessages)
for k, v := range consumer.stats.StatMap {
if k == "TOTAL" {
continue
} else if k == "ERROR" {
defer log.Printf("Unable to decode schema in %v messages. They might be empty, or do not contains any schema.", v)
} else {
utils.CalcPercentile(k, v, consumedMessages)
}
utils.BuildPercentileMap(consumer.stats.StatMap)

// Print results
for k, v := range utils.PercentileMap {
c := color.New(color.FgGreen)
c.Printf("Schema ID %v => %v%%\n", k, v)
}

// Dump stats to file
if consumer.config.store {
utils.DumpStats(consumer.stats, consumer.config.path)
}

// Build Charts
if consumer.config.chart {
utils.GenChart()
}
return nil
}

Expand Down
Binary file added static/chart.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
27 changes: 27 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"sync"

"github.com/eladleev/schema-registry-statistics/utils"
)

type appConfig struct {
bootstrapServers, version, group, topic string
user, password string
tls bool
caCert string
path string
limit int
oldest bool
verbose bool
store bool
chart bool
}

type Consumer struct {
ready chan bool
stats utils.ResultStats
config appConfig
consumerLock sync.RWMutex
}
97 changes: 97 additions & 0 deletions types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package main

import (
"sync"
"testing"
"time"

"github.com/eladleev/schema-registry-statistics/utils"
"gotest.tools/assert"
)

func TestConsumerInitialization(t *testing.T) {
config := appConfig{
bootstrapServers: "localhost:9092",
version: "2.6.0",
group: "my-group",
topic: "my-topic",
user: "my-user",
password: "my-password",
tls: true,
caCert: "ca.pem",
path: "/tmp",
limit: 1000,
oldest: true,
verbose: true,
store: true,
chart: true,
}

consumer := Consumer{
ready: make(chan bool),
stats: utils.ResultStats{},
config: config,
consumerLock: sync.RWMutex{},
}

assert.Equal(t, config.bootstrapServers, consumer.config.bootstrapServers)
assert.Equal(t, config.version, consumer.config.version)
assert.Equal(t, config.group, consumer.config.group)
assert.Equal(t, config.topic, consumer.config.topic)
assert.Equal(t, config.user, consumer.config.user)
assert.Equal(t, config.password, consumer.config.password)
assert.Equal(t, config.tls, consumer.config.tls)
assert.Equal(t, config.caCert, consumer.config.caCert)
assert.Equal(t, config.path, consumer.config.path)
assert.Equal(t, config.limit, consumer.config.limit)
assert.Equal(t, config.oldest, consumer.config.oldest)
assert.Equal(t, config.verbose, consumer.config.verbose)
assert.Equal(t, config.store, consumer.config.store)
assert.Equal(t, config.chart, consumer.config.chart)
}

func TestConsumerLocking(t *testing.T) {
config := appConfig{
bootstrapServers: "localhost:9092",
version: "2.6.0",
group: "my-group",
topic: "my-topic",
user: "my-user",
password: "my-password",
tls: true,
caCert: "ca.pem",
path: "/tmp",
limit: 1000,
oldest: true,
verbose: true,
store: true,
chart: true,
}

consumer := Consumer{
ready: make(chan bool),
stats: utils.ResultStats{},
config: config,
consumerLock: sync.RWMutex{},
}

var wg sync.WaitGroup
wg.Add(2)

go func() {
consumer.consumerLock.Lock()
defer consumer.consumerLock.Unlock()

time.Sleep(time.Millisecond * 100)
wg.Done()
}()

go func() {
consumer.consumerLock.Lock()
defer consumer.consumerLock.Unlock()

wg.Done()
}()

wg.Wait()
}
Loading