Skip to content

Commit

Permalink
Merge pull request #13 from EladLeev/fix-slice-bound
Browse files Browse the repository at this point in the history
Fix slice bound
  • Loading branch information
EladLeev committed Apr 6, 2023
2 parents f432b14 + 0df01ef commit e6a8d1a
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
8 changes: 8 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
for k, v := range consumer.stats.StatMap {
if k == "TOTAL" {
continue
} else if k == "ERROR" {
defer log.Printf("Unable to decode schema in %v messages. They might be empty, or do not contains any schema.", v)
} else {
utils.CalcPercentile(k, v, consumedMessages)
}
Expand All @@ -108,6 +110,12 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
for {
select {
case message := <-claim.Messages():
if len(message.Value) < 5 {
log.Printf("error encoding message offset: %v\n", message.Offset)
// append error value, using 4294967295 as a dummy value (end of uint32)
utils.CalcStat(consumer.stats, 4294967295, &consumer.consumerLock)
break
}
schemaId := binary.BigEndian.Uint32(message.Value[1:5])
utils.CalcStat(consumer.stats, schemaId, &consumer.consumerLock)
if consumer.config.store { // lock map, and build result for analysis
Expand Down
7 changes: 6 additions & 1 deletion utils/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ResultStats struct {

func CalcPercentile(k string, v, consumedMessages int) {
idPerc := math.Round((float64(v) / float64(consumedMessages) * 100))
c := color.New(color.FgGreen, color.BgWhite)
c := color.New(color.FgGreen)
c.Printf("Schema ID %v => %v%%\n", k, idPerc)
}

Expand All @@ -33,6 +33,11 @@ func AppendResult(stat ResultStats, offset int64, schemaId uint32, lock *sync.RW
func CalcStat(stat ResultStats, schemaId uint32, lock *sync.RWMutex) {
lock.Lock()
defer lock.Unlock()
// 4294967295 represents error
if schemaId == 4294967295 {
stat.StatMap["ERROR"] += 1
return
}
stat.StatMap[fmt.Sprint(schemaId)] += 1
stat.StatMap["TOTAL"] += 1
}
Expand Down

0 comments on commit e6a8d1a

Please sign in to comment.