Skip to content

Commit

Permalink
Merge pull request #1404 from skidder/sk-fix-shutdown-for-consumer-gr…
Browse files Browse the repository at this point in the history
…oup-example

Fix shutdown and race-condition in consumer-group example
  • Loading branch information
bai committed Jun 21, 2019
2 parents 0dbba7e + 5ad743b commit 4154a59
Showing 1 changed file with 27 additions and 14 deletions.
41 changes: 27 additions & 14 deletions examples/consumergroup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"strings"
"sync"
"syscall"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -53,7 +54,7 @@ func main() {

version, err := sarama.ParseKafkaVersion(version)
if err != nil {
panic(err)
log.Panicf("Error parsing Kafka version: %v", err)
}

/**
Expand All @@ -70,21 +71,29 @@ func main() {
/**
* Setup a new Sarama consumer group
*/
consumer := Consumer{}
consumer := Consumer{
ready: make(chan bool, 0),
}

ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
if err != nil {
panic(err)
log.Panicf("Error creating consumer group client: %v", err)
}

wg := &sync.WaitGroup{}
go func() {
wg.Add(1)
defer wg.Done()
for {
consumer.ready = make(chan bool, 0)
err := client.Consume(ctx, strings.Split(topics, ","), &consumer)
if err != nil {
panic(err)
if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
log.Panicf("Error from consumer: %v", err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool, 0)
}
}()

Expand All @@ -93,12 +102,16 @@ func main() {

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

<-sigterm // Await a sigterm signal before safely closing the consumer

err = client.Close()
if err != nil {
panic(err)
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
case <-sigterm:
log.Println("terminating: via signal")
}
cancel()
wg.Wait()
if err = client.Close(); err != nil {
log.Panicf("Error closing client: %v", err)
}
}

Expand Down

0 comments on commit 4154a59

Please sign in to comment.