From 947ba42e961bf3a79e2b1e6a3b959af8abcbc904 Mon Sep 17 00:00:00 2001 From: Elad Leev Date: Thu, 6 Apr 2023 18:01:51 +0100 Subject: [PATCH 1/3] handle empty message --- main.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/main.go b/main.go index 687377f..d51cd91 100644 --- a/main.go +++ b/main.go @@ -108,6 +108,10 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai for { select { case message := <-claim.Messages(): + if len(message.Value) < 5 { + log.Printf("error encoding message offset: %v\n", message.Offset) + return nil + } schemaId := binary.BigEndian.Uint32(message.Value[1:5]) utils.CalcStat(consumer.stats, schemaId, &consumer.consumerLock) if consumer.config.store { // lock map, and build result for analysis From 29259d8e38ffcb7c80e01d23f5cee8a25d271cc4 Mon Sep 17 00:00:00 2001 From: Elad Leev Date: Thu, 6 Apr 2023 19:27:48 +0100 Subject: [PATCH 2/3] print info about errors --- main.go | 6 +++++- scripts/avro_producer.py | 8 ++++++-- utils/stats.go | 7 ++++++- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index d51cd91..7ea146b 100644 --- a/main.go +++ b/main.go @@ -94,6 +94,8 @@ func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { for k, v := range consumer.stats.StatMap { if k == "TOTAL" { continue + } else if k == "ERROR" { + defer log.Printf("Unable to decode schema in %v messages. They might be empty, or do not contains any schema.", v) } else { utils.CalcPercentile(k, v, consumedMessages) } @@ -110,7 +112,9 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai case message := <-claim.Messages(): if len(message.Value) < 5 { log.Printf("error encoding message offset: %v\n", message.Offset) - return nil + // append error value, using 4294967295 as a dummy value (end of uint32) + utils.CalcStat(consumer.stats, 4294967295, &consumer.consumerLock) + break } schemaId := binary.BigEndian.Uint32(message.Value[1:5]) utils.CalcStat(consumer.stats, schemaId, &consumer.consumerLock) diff --git a/scripts/avro_producer.py b/scripts/avro_producer.py index e156959..b4465c3 100644 --- a/scripts/avro_producer.py +++ b/scripts/avro_producer.py @@ -22,7 +22,9 @@ "fields": [ {"name": "id", "type": "string"}, {"name": "amount", "type": "double"}, - {"name": "name", "type": "string"} + {"name": "name", "type": "string"}, + {"name": "asd", "type": "string", "default": ""}, + {"name": "asd2", "type": "string", "default": ""} ] } """ @@ -38,5 +40,7 @@ avroProducer.produce(topic='payments-topic', value={"id": "transact_%s" % i, "amount": random.uniform(10, 500), - "name": "customer_%s" % i, }) + "name": "customer_%s" % i, + "asd": "asdasdasdasd", + "asd2": "asdasdasdasd",}) avroProducer.flush() diff --git a/utils/stats.go b/utils/stats.go index 94d87df..71a8fc1 100644 --- a/utils/stats.go +++ b/utils/stats.go @@ -18,7 +18,7 @@ type ResultStats struct { func CalcPercentile(k string, v, consumedMessages int) { idPerc := math.Round((float64(v) / float64(consumedMessages) * 100)) - c := color.New(color.FgGreen, color.BgWhite) + c := color.New(color.FgGreen) c.Printf("Schema ID %v => %v%%\n", k, idPerc) } @@ -33,6 +33,11 @@ func AppendResult(stat ResultStats, offset int64, schemaId uint32, lock *sync.RW func CalcStat(stat ResultStats, schemaId uint32, lock *sync.RWMutex) { lock.Lock() defer lock.Unlock() + // 4294967295 represents error + if schemaId == 4294967295 { + stat.StatMap["ERROR"] += 1 + return + } stat.StatMap[fmt.Sprint(schemaId)] += 1 stat.StatMap["TOTAL"] += 1 } From 0df01ef9aef8f542271f3f251511f4cc021c27d8 Mon Sep 17 00:00:00 2001 From: Elad Leev Date: Thu, 6 Apr 2023 19:34:48 +0100 Subject: [PATCH 3/3] revert avro_producer changes --- scripts/avro_producer.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/scripts/avro_producer.py b/scripts/avro_producer.py index b4465c3..e156959 100644 --- a/scripts/avro_producer.py +++ b/scripts/avro_producer.py @@ -22,9 +22,7 @@ "fields": [ {"name": "id", "type": "string"}, {"name": "amount", "type": "double"}, - {"name": "name", "type": "string"}, - {"name": "asd", "type": "string", "default": ""}, - {"name": "asd2", "type": "string", "default": ""} + {"name": "name", "type": "string"} ] } """ @@ -40,7 +38,5 @@ avroProducer.produce(topic='payments-topic', value={"id": "transact_%s" % i, "amount": random.uniform(10, 500), - "name": "customer_%s" % i, - "asd": "asdasdasdasd", - "asd2": "asdasdasdasd",}) + "name": "customer_%s" % i, }) avroProducer.flush()