Skip to content

Commit

Permalink
Upgrade Kafka library
Browse files Browse the repository at this point in the history
  • Loading branch information
atanasdinov committed Nov 9, 2021
1 parent 7b53afb commit d8e4bb1
Show file tree
Hide file tree
Showing 12 changed files with 271 additions and 138 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ jobs:
command: |
GOPRIVATE="github.com/Financial-Times"
git config --global url."https://$GITHUB_USERNAME:$GITHUB_TOKEN@github.com/".insteadOf "https://github.com/"
curl -sL https://deb.nodesource.com/setup_11.x | bash -
DEBIAN_FRONTEND=noninteractive apt-get install -y nodejs=11.\*
curl -sL https://deb.nodesource.com/setup_14.x | bash -
DEBIAN_FRONTEND=noninteractive apt-get install -y nodejs=14.\*
npm install -g --unsafe-perm --loglevel warn --user 0 --no-progress dredd@11.2.19
rm -rf /var/lib/apt/lists/*
GO111MODULE=off go get -u github.com/myitcv/gobin
Expand Down
6 changes: 1 addition & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,13 @@ Options:
--aws-secret-access-key AWS SECRET ACCES KEY (env $AWS_SECRET_ACCESS_KEY)
--elasticsearch-sapi-endpoint AES endpoint (env $ELASTICSEARCH_SAPI_ENDPOINT) (default "http://localhost:9200")
--index-name The name of the elaticsearch index (env $ELASTICSEARCH_SAPI_INDEX) (default "ft")
--kafka-proxy-address Addresses used by the queue consumer to connect to the queue (env $KAFKA_PROXY_ADDR) (default "http://localhost:8080")
--kafka-address Addresses used by the consumer to connect to Kafka (env $KAFKA_ADDR) (default "http://localhost:29092")
--kafka-consumer-group Group used to read the messages from the queue (env $KAFKA_CONSUMER_GROUP) (default "default-consumer-group")
--kafka-topic The topic to read the messages from (env $KAFKA_TOPIC) (default "CombinedPostPublicationEvents")
--kafka-header The header identifying the queue to read the messages from (env $KAFKA_HEADER) (default "kafka")
--kafka-concurrent-processing Whether the consumer uses concurrent processing for the messages (env $KAFKA_CONCURRENT_PROCESSING)
--public-concordances-endpoint Endpoint to concord ids with (env $PUBLIC_CONCORDANCES_ENDPOINT) (default "http://public-concordances-api:8080")
--base-api-url Base API URL (env $BASE_API_URL) (default "https://api.ft.com/")
```

Whether the consumer uses concurrent processing for the messages ($KAFKA_CONCURRENT_PROCESSING)

## Build and deployment

* Built by Docker Hub on merge to master: [coco/content-rw-elasticsearch](https://hub.docker.com/r/coco/content-rw-elasticsearch/)
Expand Down
46 changes: 17 additions & 29 deletions cmd/content-rw-elasticsearch/main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package main

import (
"github.com/Financial-Times/kafka-client-go/v2"
"net/http"
"os"
"time"

cli "github.com/jawher/mow.cli"

"github.com/Financial-Times/go-logger/v2"
"github.com/Financial-Times/message-queue-gonsumer/consumer"
"github.com/Financial-Times/upp-go-sdk/pkg/api"
"github.com/Financial-Times/upp-go-sdk/pkg/internalcontent"

Expand Down Expand Up @@ -70,15 +70,15 @@ func main() {
Desc: "The name of the elaticsearch index",
EnvVar: "ELASTICSEARCH_SAPI_INDEX",
})
kafkaProxyAddress := app.String(cli.StringOpt{
kafkaAddress := app.String(cli.StringOpt{
Name: "kafka-proxy-address",
Value: "http://localhost:8080",
Desc: "Addresses used by the queue consumer to connect to the queue",
EnvVar: "KAFKA_PROXY_ADDR",
Value: "http://localhost:29092",
Desc: "Addresses used by the queue consumer to connect to Kafka",
EnvVar: "KAFKA_ADDR",
})
kafkaConsumerGroup := app.String(cli.StringOpt{
Name: "kafka-consumer-group",
Value: "default-consumer-group",
Value: "content-rw-elasticsearch",
Desc: "Group used to read the messages from the queue",
EnvVar: "KAFKA_CONSUMER_GROUP",
})
Expand All @@ -88,18 +88,6 @@ func main() {
Desc: "The topic to read the messages from",
EnvVar: "KAFKA_TOPIC",
})
kafkaHeader := app.String(cli.StringOpt{
Name: "kafka-header",
Value: "kafka",
Desc: "The header identifying the queue to read the messages from",
EnvVar: "KAFKA_HEADER",
})
kafkaConcurrentProcessing := app.Bool(cli.BoolOpt{
Name: "kafka-concurrent-processing",
Value: false,
Desc: "Whether the consumer uses concurrent processing for the messages",
EnvVar: "KAFKA_CONCURRENT_PROCESSING",
})
publicConcordancesEndpoint := app.String(cli.StringOpt{
Name: "public-concordances-endpoint",
Value: "http://public-concordances-api:8080",
Expand Down Expand Up @@ -134,14 +122,6 @@ func main() {
EnvVar: "API_BASIC_PASS",
})

queueConfig := consumer.QueueConfig{
Addrs: []string{*kafkaProxyAddress},
Group: *kafkaConsumerGroup,
Topic: *kafkaTopic,
Queue: *kafkaHeader,
ConcurrentProcessing: *kafkaConcurrentProcessing,
}

log := logger.NewUPPLogger(*appSystemCode, *logLevel)
log.Info("[Startup] Application is starting")

Expand Down Expand Up @@ -176,19 +156,27 @@ func main() {
internalContentClient,
)

consumerConfig := kafka.ConsumerConfig{
BrokersConnectionString: *kafkaAddress,
ConsumerGroup: *kafkaConsumerGroup,
Topics: []string{*kafkaTopic},
Options: kafka.DefaultConsumerOptions(),
}
messageConsumer := kafka.NewConsumer(consumerConfig, log, time.Minute)

handler := message.NewMessageHandler(
esService,
mapperHandler,
httpClient,
queueConfig,
messageConsumer,
es.NewClient,
log,
)

handler.Start(*baseAPIUrl, accessConfig)

healthService := health.NewHealthService(&queueConfig, esService, httpClient, concordanceAPIService, *appSystemCode, log)
//
healthService := health.NewHealthService(messageConsumer, esService, httpClient, concordanceAPIService, *appSystemCode, log)

serveMux := http.NewServeMux()
serveMux = healthService.AttachHTTPEndpoints(serveMux, *appName, config.AppDescription)
pkghttp.StartServer(log, serveMux, *port)
Expand Down
54 changes: 45 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,24 +1,60 @@
module github.com/Financial-Times/content-rw-elasticsearch/v2

go 1.14
go 1.17

require (
github.com/Financial-Times/go-fthealth v0.0.0-20180807113633-3d8eb430d5b5
github.com/Financial-Times/go-logger/v2 v2.0.1
github.com/Financial-Times/message-queue-gonsumer v0.0.0-20180518165041-cd41937c7566
github.com/Financial-Times/kafka-client-go/v2 v2.0.0
github.com/Financial-Times/service-status-go v0.0.0-20160323111542-3f5199736a3d
github.com/Financial-Times/transactionid-utils-go v0.2.0
github.com/Financial-Times/upp-go-sdk v0.0.7
github.com/fortytw2/leaktest v1.3.0 // indirect
github.com/gorilla/context v1.1.1 // indirect
github.com/gorilla/mux v1.6.2
github.com/hashicorp/go-version v1.0.0 // indirect
github.com/gorilla/mux v1.8.0
github.com/jawher/mow.cli v1.0.4
github.com/pborman/uuid v0.0.0-20170612153648-e790cca94e6c
github.com/smartystreets/go-aws-auth v0.0.0-20170504205021-8ef1316913ee
github.com/smartystreets/gunit v1.1.3 // indirect
github.com/spf13/viper v1.6.2
github.com/stretchr/testify v1.4.0
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e // indirect
github.com/stretchr/testify v1.7.0
gopkg.in/olivere/elastic.v2 v2.0.61
)

require (
github.com/Shopify/sarama v1.30.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.4.7 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/go-version v1.0.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/mitchellh/mapstructure v1.1.2 // indirect
github.com/pelletier/go-toml v1.2.0 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/smartystreets/assertions v1.0.1 // indirect
github.com/smartystreets/gunit v1.1.3 // indirect
github.com/spf13/afero v1.1.2 // indirect
github.com/spf13/cast v1.3.0 // indirect
github.com/spf13/jwalterweatherman v1.0.0 // indirect
github.com/spf13/pflag v1.0.3 // indirect
github.com/stretchr/objx v0.1.1 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 // indirect
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf // indirect
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/ini.v1 v1.51.0 // indirect
gopkg.in/yaml.v2 v2.2.4 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
Loading

0 comments on commit d8e4bb1

Please sign in to comment.