diff --git a/main.go b/main.go index 687377f..7ea146b 100644 --- a/main.go +++ b/main.go @@ -94,6 +94,8 @@ func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { for k, v := range consumer.stats.StatMap { if k == "TOTAL" { continue + } else if k == "ERROR" { + defer log.Printf("Unable to decode schema in %v messages. They might be empty, or do not contains any schema.", v) } else { utils.CalcPercentile(k, v, consumedMessages) } @@ -108,6 +110,12 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai for { select { case message := <-claim.Messages(): + if len(message.Value) < 5 { + log.Printf("error encoding message offset: %v\n", message.Offset) + // append error value, using 4294967295 as a dummy value (end of uint32) + utils.CalcStat(consumer.stats, 4294967295, &consumer.consumerLock) + break + } schemaId := binary.BigEndian.Uint32(message.Value[1:5]) utils.CalcStat(consumer.stats, schemaId, &consumer.consumerLock) if consumer.config.store { // lock map, and build result for analysis diff --git a/utils/stats.go b/utils/stats.go index 94d87df..71a8fc1 100644 --- a/utils/stats.go +++ b/utils/stats.go @@ -18,7 +18,7 @@ type ResultStats struct { func CalcPercentile(k string, v, consumedMessages int) { idPerc := math.Round((float64(v) / float64(consumedMessages) * 100)) - c := color.New(color.FgGreen, color.BgWhite) + c := color.New(color.FgGreen) c.Printf("Schema ID %v => %v%%\n", k, idPerc) } @@ -33,6 +33,11 @@ func AppendResult(stat ResultStats, offset int64, schemaId uint32, lock *sync.RW func CalcStat(stat ResultStats, schemaId uint32, lock *sync.RWMutex) { lock.Lock() defer lock.Unlock() + // 4294967295 represents error + if schemaId == 4294967295 { + stat.StatMap["ERROR"] += 1 + return + } stat.StatMap[fmt.Sprint(schemaId)] += 1 stat.StatMap["TOTAL"] += 1 }