Skip to content

Commit

Permalink
Add Kafka consumer monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
atanasdinov committed Apr 11, 2022
1 parent 2dd9b2e commit 96643e9
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 46 deletions.
33 changes: 20 additions & 13 deletions cmd/content-rw-elasticsearch/main.go
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/mapper"
"github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/message"
"github.com/Financial-Times/go-logger/v2"
"github.com/Financial-Times/kafka-client-go/v2"
"github.com/Financial-Times/kafka-client-go/v3"
cli "github.com/jawher/mow.cli"
)

Expand Down Expand Up @@ -51,7 +51,7 @@ func main() {
})
secretKey := app.String(cli.StringOpt{
Name: "aws-secret-access-key",
Desc: "AWS SECRET ACCES KEY",
Desc: "AWS SECRET ACCESS KEY",
EnvVar: "AWS_SECRET_ACCESS_KEY",
})
esEndpoint := app.String(cli.StringOpt{
Expand All @@ -63,7 +63,7 @@ func main() {
indexName := app.String(cli.StringOpt{
Name: "index-name",
Value: "ft",
Desc: "The name of the elaticsearch index",
Desc: "The name of the elasticsearch index",
EnvVar: "ELASTICSEARCH_SAPI_INDEX",
})
kafkaAddress := app.String(cli.StringOpt{
Expand All @@ -84,6 +84,16 @@ func main() {
Desc: "The topic to read the messages from",
EnvVar: "KAFKA_TOPIC",
})
kafkaTopicOffsetFetchInterval := app.Int(cli.IntOpt{
Name: "kafka-topic-offset-fetch-interval",
Desc: "Interval (in minutes) between each offset fetching request",
EnvVar: "KAFKA_TOPIC_OFFSET_FETCH_INTERVAL",
})
kafkaLagTolerance := app.Int(cli.IntOpt{
Name: "kafka-topic-lag-tolerance",
Desc: "Lag tolerance (in number of messages) used for monitoring the Kafka consumer",
EnvVar: "KAFKA_TOPIC_LAG_TOLERANCE",
})
publicConcordancesEndpoint := app.String(cli.StringOpt{
Name: "public-concordances-endpoint",
Value: "http://public-concordances-api:8080",
Expand Down Expand Up @@ -115,23 +125,20 @@ func main() {
}

esService := es.NewService(*indexName)

concordanceAPIService := concept.NewConcordanceAPIService(*publicConcordancesEndpoint, httpClient)

mapperHandler := mapper.NewMapperHandler(
concordanceAPIService,
*baseAPIUrl,
appConfig,
log,
)
mapperHandler := mapper.NewMapperHandler(concordanceAPIService, *baseAPIUrl, appConfig, log)

consumerConfig := kafka.ConsumerConfig{
BrokersConnectionString: *kafkaAddress,
ConsumerGroup: *kafkaConsumerGroup,
Topics: []string{*kafkaTopic},
ConnectionRetryInterval: time.Minute,
OffsetFetchInterval: time.Duration(*kafkaTopicOffsetFetchInterval) * time.Minute,
Options: kafka.DefaultConsumerOptions(),
}
messageConsumer := kafka.NewConsumer(consumerConfig, log, time.Minute)
topics := []*kafka.Topic{
kafka.NewTopic(*kafkaTopic, kafka.WithLagTolerance(int64(*kafkaLagTolerance))),
}
messageConsumer := kafka.NewConsumer(consumerConfig, topics, log)

handler := message.NewMessageHandler(
esService,
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Expand Up @@ -5,7 +5,7 @@ go 1.17
require (
github.com/Financial-Times/go-fthealth v0.0.0-20180807113633-3d8eb430d5b5
github.com/Financial-Times/go-logger/v2 v2.0.1
github.com/Financial-Times/kafka-client-go/v2 v2.0.0
github.com/Financial-Times/kafka-client-go/v3 v3.0.0
github.com/Financial-Times/service-status-go v0.0.0-20160323111542-3f5199736a3d
github.com/Financial-Times/transactionid-utils-go v0.2.0
github.com/gorilla/mux v1.8.0
Expand All @@ -18,7 +18,7 @@ require (
)

require (
github.com/Shopify/sarama v1.30.0 // indirect
github.com/Shopify/sarama v1.32.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
Expand All @@ -33,7 +33,7 @@ require (
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/klauspost/compress v1.14.4 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mitchellh/mapstructure v1.4.2 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
Expand All @@ -49,9 +49,9 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.1.1 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 // indirect
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf // indirect
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf // indirect
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/ini.v1 v1.63.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
43 changes: 25 additions & 18 deletions go.sum
Expand Up @@ -48,17 +48,17 @@ github.com/Financial-Times/go-fthealth v0.0.0-20180807113633-3d8eb430d5b5 h1:XH5
github.com/Financial-Times/go-fthealth v0.0.0-20180807113633-3d8eb430d5b5/go.mod h1:gpAzq6W5rCheYlY32JOIxS/VjVcYHbC2PkMzQngHT9c=
github.com/Financial-Times/go-logger/v2 v2.0.1 h1:iekEfSsUtlkg+YkXTZo+/fIN2VbZ2/3Hl9yolP3z5X8=
github.com/Financial-Times/go-logger/v2 v2.0.1/go.mod h1:Jpky5JYSX7xjGUClfA9hEMDmn40tUbfQQITjVIFGQiM=
github.com/Financial-Times/kafka-client-go/v2 v2.0.0 h1:ihagAp76LC8vffDBEw6HjOjy7kSalsarAP9aoeUwsmI=
github.com/Financial-Times/kafka-client-go/v2 v2.0.0/go.mod h1:vNPU+6xisgHZjinTFzxHJ5tHL7pycPgNXTWKD1IQ3qY=
github.com/Financial-Times/kafka-client-go/v3 v3.0.0 h1:ibYwJ5Q4gd6VMisKMzmnsIzpCX2sL/T7soR0BmURuVI=
github.com/Financial-Times/kafka-client-go/v3 v3.0.0/go.mod h1:XVyp/FqBmUsnzrxYmWp9Grbuh86O4vCO3xBGTV2gdms=
github.com/Financial-Times/service-status-go v0.0.0-20160323111542-3f5199736a3d h1:USNBTIof6vWGM49SYrxvC5Y8NqyDL3YuuYmID81ORZQ=
github.com/Financial-Times/service-status-go v0.0.0-20160323111542-3f5199736a3d/go.mod h1:7zULC9rrq6KxFkpB3Y5zNVaEwrf1g2m3dvXJBPDXyvM=
github.com/Financial-Times/transactionid-utils-go v0.2.0 h1:YcET5Hd1fUGWWpQSVszYUlAc15ca8tmjRetUuQKRqEQ=
github.com/Financial-Times/transactionid-utils-go v0.2.0/go.mod h1:tPAcAFs/dR6Q7hBDGNyUyixHRvg/n9NW/JTq8C58oZ0=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.30.0 h1:TOZL6r37xJBDEMLx4yjB77jxbZYXPaDow08TSK6vIL0=
github.com/Shopify/sarama v1.30.0/go.mod h1:zujlQQx1kzHsh4jfV1USnptCQrHAEZ2Hk8fTKCulPVs=
github.com/Shopify/toxiproxy/v2 v2.1.6-0.20210914104332-15ea381dcdae h1:ePgznFqEG1v3AjMklnK8H7BSc++FDSo7xfK9K7Af+0Y=
github.com/Shopify/toxiproxy/v2 v2.1.6-0.20210914104332-15ea381dcdae/go.mod h1:/cvHQkZ1fst0EmZnA5dFtiQdWCNCFYzb+uE2vqVgvx0=
github.com/Shopify/sarama v1.32.0 h1:P+RUjEaRU0GMMbYexGMDyrMkLhbbBVUVISDywi+IlFU=
github.com/Shopify/sarama v1.32.0/go.mod h1:+EmJJKZWVT/faR9RcOxJerP+LId4iWdQPBGLy1Y1Njs=
github.com/Shopify/toxiproxy/v2 v2.3.0 h1:62YkpiP4bzdhKMH+6uC5E95y608k3zDwdzuBMsnn3uQ=
github.com/Shopify/toxiproxy/v2 v2.3.0/go.mod h1:KvQTtB6RjCJY4zqNJn7C7JDFgsG5uoHYDirfUfpIm0c=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
Expand Down Expand Up @@ -101,8 +101,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/frankban/quicktest v1.14.2 h1:SPb1KFFmM+ybpEjPUhCCkZOM5xlovT5UbrMvWnXyBns=
github.com/frankban/quicktest v1.14.2/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
Expand Down Expand Up @@ -158,8 +158,9 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
Expand Down Expand Up @@ -237,13 +238,14 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down Expand Up @@ -291,6 +293,8 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5X
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sagikazarmark/crypt v0.1.0/go.mod h1:B/mN0msZuINBtQ1zZLEQcegFJJf9vnYIR88KRMEuODE=
Expand Down Expand Up @@ -331,7 +335,7 @@ github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
github.com/xdg-go/scram v1.1.0/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -363,8 +367,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 h1:kETrAMYZq6WVGPa8IIixL0CaEcIUNi+1WX7grUoi3y8=
golang.org/x/crypto v0.0.0-20210920023735-84f357641f63/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 h1:f+lwQ+GtmgoY+A2YaQxlSOnDjXcQ7ZRLWOHbC6HtRqE=
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -438,8 +442,9 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf h1:R150MpwJIv1MpS0N/pc+NhTM8ajzvlmxlY5OYsrevXQ=
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -521,9 +526,11 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
Expand Up @@ -4,4 +4,6 @@ service:
name: content-rw-elasticsearch-reindexer
env:
KAFKA_TOPIC: ForcedCombinedPostPublicationEvents
KAFKA_TOPIC_OFFSET_FETCH_INTERVAL: 10
KAFKA_TOPIC_LAG_TOLERANCE: 2000
PUBLIC_CONCORDANCES_ENDPOINT: http://public-concordances-api:8080
Expand Up @@ -4,4 +4,6 @@ service:
name: content-rw-elasticsearch
env:
KAFKA_TOPIC: CombinedPostPublicationEvents
KAFKA_TOPIC_OFFSET_FETCH_INTERVAL: 3
KAFKA_TOPIC_LAG_TOLERANCE: 250
PUBLIC_CONCORDANCES_ENDPOINT: http://public-concordances-api:8080
4 changes: 4 additions & 0 deletions helm/content-rw-elasticsearch/templates/deployment.yaml
Expand Up @@ -44,6 +44,10 @@ spec:
value: {{ .Values.service.name }}
- name: KAFKA_TOPIC
value: {{ .Values.env.KAFKA_TOPIC }}
- name: KAFKA_TOPIC_OFFSET_FETCH_INTERVAL
value: "{{ .Values.env.KAFKA_TOPIC_OFFSET_FETCH_INTERVAL }}"
- name: KAFKA_TOPIC_LAG_TOLERANCE
value: "{{ .Values.env.KAFKA_TOPIC_LAG_TOLERANCE }}"
- name: KAFKA_ADDR
valueFrom:
configMapKeyRef:
Expand Down
24 changes: 22 additions & 2 deletions pkg/health/healthcheck.go
Expand Up @@ -44,6 +44,7 @@ func NewHealthService(consumer message.Consumer, esHealthService es.HealthStatus
service.connectivityHealthyCheck(),
service.schemaHealthyCheck(),
service.kafkaConnectivityCheck(),
service.kafkaMonitoringCheck(),
service.checkConcordanceAPI(),
}
return service
Expand Down Expand Up @@ -138,19 +139,38 @@ func (s *Service) kafkaConnectivityCheck() fthealth.Check {
Name: "Check Kafka connectivity",
PanicGuide: panicGuide,
Severity: 1,
TechnicalSummary: "Messages couldn't be read from the queue. Check if Kafka is reachable.",
TechnicalSummary: "Establishing Kafka connection failed. Check if Kafka is reachable.",
Checker: s.kafkaConnectivityChecker,
}
}

func (s *Service) kafkaConnectivityChecker() (string, error) {
err := s.ConsumerInstance.ConnectivityCheck()
if err != nil {
return "Could not connect to Kafka", err
return "", err
}
return "Successfully connected to Kafka", nil
}

func (s *Service) kafkaMonitoringCheck() fthealth.Check {
return fthealth.Check{
ID: s.AppSystemCode,
BusinessImpact: "Consumer is lagging behind when reading messages. Indexing of content is delayed.",
Name: "Check Kafka consumer status",
PanicGuide: panicGuide,
Severity: 3,
TechnicalSummary: "Messages awaiting handling exceed the configured lag tolerance. Check if Kafka consumer is stuck.",
Checker: s.kafkaMonitoringChecker,
}
}

func (s *Service) kafkaMonitoringChecker() (string, error) {
if err := s.ConsumerInstance.MonitorCheck(); err != nil {
return "", err
}
return "Kafka consumer status is healthy", nil
}

func (s *Service) checkConcordanceAPI() fthealth.Check {
return fthealth.Check{
ID: s.AppSystemCode,
Expand Down
7 changes: 4 additions & 3 deletions pkg/message/message_handler.go
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/mapper"
"github.com/Financial-Times/content-rw-elasticsearch/v2/pkg/schema"
"github.com/Financial-Times/go-logger/v2"
"github.com/Financial-Times/kafka-client-go/v2"
"github.com/Financial-Times/kafka-client-go/v3"
transactionid "github.com/Financial-Times/transactionid-utils-go"
)

Expand All @@ -28,9 +28,10 @@ const (
type ESClient func(config es.AccessConfig, c *http.Client, log *logger.UPPLogger) (es.Client, error)

type Consumer interface {
StartListening(messageHandler func(message kafka.FTMessage))
Start(messageHandler func(message kafka.FTMessage))
Close() error
ConnectivityCheck() error
MonitorCheck() error
}

type Handler struct {
Expand Down Expand Up @@ -67,7 +68,7 @@ func (h *Handler) Start(baseAPIURL string, accessConfig es.AccessConfig) {
h.esService.SetClient(ec)
h.log.Info("Connected to Elasticsearch")
// this is a blocking method
h.consumer.StartListening(h.handleMessage)
h.consumer.Start(h.handleMessage)
return
}
}()
Expand Down

0 comments on commit 96643e9

Please sign in to comment.