Skip to content
This repository has been archived by the owner on May 21, 2021. It is now read-only.

Commit

Permalink
➕ Upgrading kafka-client-go, using new Consume func which will block …
Browse files Browse the repository at this point in the history
…while consuming messages:
  • Loading branch information
peteclark-ft committed Nov 22, 2018
1 parent db23258 commit 6a05d95
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 5 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

[[constraint]]
name = "github.com/Financial-Times/kafka-client-go"
version = "0.8.3"
version = "0.9.0-blocking-consume-rc1"

[[constraint]]
name = "github.com/Financial-Times/service-status-go"
Expand Down
4 changes: 4 additions & 0 deletions healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func (mc mockKafkaConnection) StartListening(messageHandler func(message kafka.F
return
}

func (mc mockKafkaConnection) Consume(messageHandler func(message kafka.FTMessage) error) {
return
}

func (mc mockKafkaConnection) SendMessage(message kafka.FTMessage) error {
return nil
}
Expand Down
5 changes: 4 additions & 1 deletion kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
)

func startKafkaConsumer(messageConsumer kafka.Consumer) {
messageConsumer.StartListening(handleMessage)
for {
messageConsumer.Consume(handleMessage)
logger.Warnf(nil, "Consumer stopped processing messages, restarting the consumer.")
}
}

func handleMessage(msg kafka.FTMessage) error {
Expand Down

0 comments on commit 6a05d95

Please sign in to comment.