Skip to content

Commit

Permalink
Fix shut down of Kafka consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
MiroslavGatsanoga committed Oct 22, 2019
1 parent acf33b6 commit 4a4f8fb
Showing 1 changed file with 21 additions and 13 deletions.
34 changes: 21 additions & 13 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/Financial-Times/neo-utils-go/neoutils"
status "github.com/Financial-Times/service-status-go/httphandlers"
"github.com/gorilla/mux"
"github.com/jawher/mow.cli"
cli "github.com/jawher/mow.cli"
"github.com/rcrowley/go-metrics"

_ "github.com/joho/godotenv/autoload"
Expand Down Expand Up @@ -133,6 +133,7 @@ func main() {
httpHandler.producer = p
}

var qh queueHandler
if *shouldConsumeMessages {
var consumer kafka.Consumer
consumer, err = setupMessageConsumer(*zookeeperAddress, *consumerGroup, *consumerTopic)
Expand All @@ -141,28 +142,35 @@ func main() {
}
healtcheckHandler.consumer = consumer

qh := queueHandler{annotationsService: annotationsService, consumer: consumer, producer: p}
qh = queueHandler{annotationsService: annotationsService, consumer: consumer, producer: p}
qh.originMap = originMap
qh.lifecycleMap = lifecycleMap
qh.messageType = messageType
qh.log = log
qh.Ingest()

go func() {
waitForSignal()
log.Infof("Shutting down Kafka consumer")
qh.consumer.Shutdown()
}()
}

http.Handle("/", router(&httpHandler, &healtcheckHandler, log))
err = startServer(*port)
if err != nil {
log.WithError(err).Fatal("http server error occurred")

go func() {
err = startServer(*port)
if err != nil {
log.WithError(err).Fatal("http server error occurred")
}
}()

waitForSignal()
log.Infof("Shutting down Kafka consumer")
if *shouldConsumeMessages {
qh.consumer.Shutdown()
}
}

err := app.Run(os.Args)
if err != nil {
fmt.Printf("app could not start: %s", err)
return
}
app.Run(os.Args)
}

func setupAnnotationsService(neoURL string, bathSize int) (annotations.Service, error) {
Expand Down Expand Up @@ -269,7 +277,7 @@ func startServer(port int) error {
}

func waitForSignal() {
ch := make(chan os.Signal)
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch
}

0 comments on commit 4a4f8fb

Please sign in to comment.