Skip to content

Commit

Permalink
Merge pull request #9 from EladLeev/fix-race-bug
Browse files Browse the repository at this point in the history
Make stats map thread safe
  • Loading branch information
EladLeev committed Mar 5, 2023
2 parents 6ead707 + 778e3a3 commit f432b14
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 17 deletions.
13 changes: 12 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,16 @@ dep:
tidy:
go mod tidy

vet:
safe:
go vet
go test -race .
go build -race .

test_race:
go run -race . --bootstrap "localhost:9092" \
--topic "payments-topic" \
--group "TEST_GROUP" \
--tls --cert "ca.pem" \
--user "USERNAME" \
--password "PASSWORD" \
--oldest --verbose
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ require (
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
)
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 h1:ZrnxWX62AgTKOSagEqxvb3ffipvEDX2pl7E1TdqLqIc=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
21 changes: 12 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
)

type Consumer struct {
ready chan bool
stats utils.ResultStats
config appConfig
ready chan bool
stats utils.ResultStats
config appConfig
consumerLock sync.RWMutex
}

func main() {
Expand All @@ -38,7 +39,8 @@ func main() {
StatMap: map[string]int{"TOTAL": 0},
ResultStore: map[uint32][]int{},
},
config: cfg,
config: cfg,
consumerLock: sync.RWMutex{},
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -107,14 +109,15 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
select {
case message := <-claim.Messages():
schemaId := binary.BigEndian.Uint32(message.Value[1:5])
utils.CalcStat(consumer.stats, schemaId)
if consumer.config.store { // build result for analysis
utils.AppendResult(consumer.stats, message.Offset, schemaId)
utils.CalcStat(consumer.stats, schemaId, &consumer.consumerLock)
if consumer.config.store { // lock map, and build result for analysis
utils.AppendResult(consumer.stats, message.Offset, schemaId, &consumer.consumerLock)
}
if consumer.stats.StatMap["TOTAL"]%100 == 0 { // I'm still living :)
consumer.consumerLock.RLock()
if consumer.stats.StatMap["TOTAL"]%100 == 0 { // I'm still alive :)
log.Printf("acked 100 messages\n")
}

consumer.consumerLock.RUnlock()
// ack
session.MarkMessage(message, "")

Expand Down
2 changes: 1 addition & 1 deletion scripts/avro_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@
avroProducer.produce(topic='payments-topic',
value={"id": "transact_%s" % i,
"amount": random.uniform(10, 500),
"name": "customer_%s" % i,})
"name": "customer_%s" % i, })
avroProducer.flush()
35 changes: 35 additions & 0 deletions scripts/sasl_producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import random
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

schema = """
{
"namespace": "io.confluent.examples.clients.basicavro",
"type": "record",
"name": "Payment",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "name", "type": "string"}
]
}
"""

value_schema = avro.loads(schema)

producer_conf = {'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'https://USERNAME:PASSWORD@localhost:8081',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'SCRAM-SHA-256',
'sasl.username': 'USERNAME',
'sasl.password': 'PASSWORD',
'ssl.ca.location': 'ca.pem'}

avroProducer = AvroProducer(producer_conf, default_value_schema=value_schema)

for i in range(1, 20):
avroProducer.produce(topic='payments-topic',
value={"id": "transact_%s" % i,
"amount": random.uniform(10, 500),
"name": "customer_%s" % i, })
avroProducer.flush()
13 changes: 9 additions & 4 deletions utils/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"math"
"os"
"sync"

"github.com/fatih/color"
)
Expand All @@ -21,13 +22,17 @@ func CalcPercentile(k string, v, consumedMessages int) {
c.Printf("Schema ID %v => %v%%\n", k, idPerc)
}

// appendResult will map the results to a storeable map
func AppendResult(stat ResultStats, offset int64, schemaId uint32) {
// AppendResult will map the results to a storeable map
func AppendResult(stat ResultStats, offset int64, schemaId uint32, lock *sync.RWMutex) {
lock.Lock()
defer lock.Unlock()
stat.ResultStore[schemaId] = append(stat.ResultStore[schemaId], int(offset))
}

// calcStat keep on track the stats
func CalcStat(stat ResultStats, schemaId uint32) {
// CalcStat keep on track the stats
func CalcStat(stat ResultStats, schemaId uint32, lock *sync.RWMutex) {
lock.Lock()
defer lock.Unlock()
stat.StatMap[fmt.Sprint(schemaId)] += 1
stat.StatMap["TOTAL"] += 1
}
Expand Down
4 changes: 3 additions & 1 deletion utils/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io/ioutil"
"os"
"reflect"
"sync"
"testing"
)

Expand Down Expand Up @@ -56,6 +57,7 @@ func TestDumpStats(t *testing.T) {
}

func TestAppendResult(t *testing.T) {
l := sync.RWMutex{}
testCases := []struct {
name string
stat ResultStats
Expand Down Expand Up @@ -111,7 +113,7 @@ func TestAppendResult(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
AppendResult(tc.stat, tc.offset, tc.schemaId)
AppendResult(tc.stat, tc.offset, tc.schemaId, &l)
if !reflect.DeepEqual(tc.stat, tc.wantResult) {
t.Errorf("got %+v, want %+v", tc.stat, tc.wantResult)
}
Expand Down

0 comments on commit f432b14

Please sign in to comment.