diff --git a/examples/consumergroup/main.go b/examples/consumergroup/main.go index 59b20f7b8..54db82b42 100644 --- a/examples/consumergroup/main.go +++ b/examples/consumergroup/main.go @@ -7,6 +7,7 @@ import ( "os" "os/signal" "strings" + "sync" "syscall" "github.com/Shopify/sarama" @@ -53,7 +54,7 @@ func main() { version, err := sarama.ParseKafkaVersion(version) if err != nil { - panic(err) + log.Panicf("Error parsing Kafka version: %v", err) } /** @@ -70,21 +71,29 @@ func main() { /** * Setup a new Sarama consumer group */ - consumer := Consumer{} + consumer := Consumer{ + ready: make(chan bool, 0), + } - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config) if err != nil { - panic(err) + log.Panicf("Error creating consumer group client: %v", err) } + wg := &sync.WaitGroup{} go func() { + wg.Add(1) + defer wg.Done() for { - consumer.ready = make(chan bool, 0) - err := client.Consume(ctx, strings.Split(topics, ","), &consumer) - if err != nil { - panic(err) + if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil { + log.Panicf("Error from consumer: %v", err) } + // check if context was cancelled, signaling that the consumer should stop + if ctx.Err() != nil { + return + } + consumer.ready = make(chan bool, 0) } }() @@ -93,12 +102,16 @@ func main() { sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) - - <-sigterm // Await a sigterm signal before safely closing the consumer - - err = client.Close() - if err != nil { - panic(err) + select { + case <-ctx.Done(): + log.Println("terminating: context cancelled") + case <-sigterm: + log.Println("terminating: via signal") + } + cancel() + wg.Wait() + if err = client.Close(); err != nil { + log.Panicf("Error closing client: %v", err) } }