From aea5ab960e7205cde519923577be94c0ee449efa Mon Sep 17 00:00:00 2001 From: Scott Kidder Date: Wed, 19 Jun 2019 11:04:52 -0700 Subject: [PATCH 1/2] Fix shutdown and race-condition in consumer-group example --- examples/consumergroup/main.go | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/examples/consumergroup/main.go b/examples/consumergroup/main.go index 59b20f7b8..dd7fb6073 100644 --- a/examples/consumergroup/main.go +++ b/examples/consumergroup/main.go @@ -7,6 +7,7 @@ import ( "os" "os/signal" "strings" + "sync" "syscall" "github.com/Shopify/sarama" @@ -70,21 +71,30 @@ func main() { /** * Setup a new Sarama consumer group */ - consumer := Consumer{} + consumer := Consumer{ + ready: make(chan bool, 0), + } - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config) if err != nil { panic(err) } + wg := &sync.WaitGroup{} go func() { + wg.Add(1) + defer wg.Done() for { - consumer.ready = make(chan bool, 0) err := client.Consume(ctx, strings.Split(topics, ","), &consumer) if err != nil { panic(err) } + // check if context was cancelled, signaling that the consumer should stop + if ctx.Err() != nil { + return + } + consumer.ready = make(chan bool, 0) } }() @@ -93,11 +103,15 @@ func main() { sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) - - <-sigterm // Await a sigterm signal before safely closing the consumer - - err = client.Close() - if err != nil { + select { + case <-ctx.Done(): + log.Println("terminating: context cancelled") + case <-sigterm: + log.Println("terminating: via signal") + } + cancel() + wg.Wait() + if err = client.Close(); err != nil { panic(err) } } From 5ad743b18448d30e997e9b0eed673b3acaf3f6d1 Mon Sep 17 00:00:00 2001 From: Scott Kidder Date: Wed, 19 Jun 2019 11:52:21 -0700 Subject: [PATCH 2/2] Use log.Panicf to provide more context to panic errors --- examples/consumergroup/main.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/examples/consumergroup/main.go b/examples/consumergroup/main.go index dd7fb6073..54db82b42 100644 --- a/examples/consumergroup/main.go +++ b/examples/consumergroup/main.go @@ -54,7 +54,7 @@ func main() { version, err := sarama.ParseKafkaVersion(version) if err != nil { - panic(err) + log.Panicf("Error parsing Kafka version: %v", err) } /** @@ -78,7 +78,7 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config) if err != nil { - panic(err) + log.Panicf("Error creating consumer group client: %v", err) } wg := &sync.WaitGroup{} @@ -86,9 +86,8 @@ func main() { wg.Add(1) defer wg.Done() for { - err := client.Consume(ctx, strings.Split(topics, ","), &consumer) - if err != nil { - panic(err) + if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil { + log.Panicf("Error from consumer: %v", err) } // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { @@ -112,7 +111,7 @@ func main() { cancel() wg.Wait() if err = client.Close(); err != nil { - panic(err) + log.Panicf("Error closing client: %v", err) } }