Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TT-5585 feat(elasticsearch): support v8 #795

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ require (
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0
github.com/cenkalti/backoff/v4 v4.0.2
github.com/elastic/go-elasticsearch/v8 v8.12.0
github.com/fatih/structs v1.1.0
github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0
github.com/gofrs/uuid v4.0.0+incompatible
github.com/golang/protobuf v1.5.3
github.com/google/go-cmp v0.5.9
github.com/google/go-cmp v0.6.0
github.com/gorilla/mux v1.8.0
github.com/influxdata/influxdb v1.8.10
github.com/influxdata/influxdb-client-go/v2 v2.6.0
Expand Down Expand Up @@ -73,6 +74,9 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deepmap/oapi-codegen v1.8.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/elastic/elastic-transport-go/v8 v8.4.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/go-sql-driver/mysql v1.5.0 // indirect
github.com/golang/snappy v0.0.3 // indirect
Expand Down Expand Up @@ -119,10 +123,13 @@ require (
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.mongodb.org/mongo-driver v1.13.1 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
Expand Down
25 changes: 21 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/elastic/elastic-transport-go/v8 v8.4.0 h1:EKYiH8CHd33BmMna2Bos1rDNMM89+hdgcymI+KzJCGE=
github.com/elastic/elastic-transport-go/v8 v8.4.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/go-elasticsearch/v8 v8.12.0 h1:krkiCf4peJa7bZwGegy01b5xWWaYpik78wvisTeRO1U=
github.com/elastic/go-elasticsearch/v8 v8.12.0/go.mod h1:wSzJYrrKPZQ8qPuqAqc6KMR4HrBfHnZORvyL+FMFqq0=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand All @@ -162,6 +166,11 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
Expand Down Expand Up @@ -219,8 +228,8 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
Expand Down Expand Up @@ -543,6 +552,14 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down Expand Up @@ -681,8 +698,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down
143 changes: 139 additions & 4 deletions pumps/elasticsearch.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package pumps

import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net/http"
"regexp"
"strings"
"time"

elasticv8 "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/mitchellh/mapstructure"
elasticv7 "github.com/olivere/elastic/v7"
elasticv3 "gopkg.in/olivere/elastic.v3"
Expand Down Expand Up @@ -126,6 +130,13 @@ type Elasticsearch7Operator struct {
log *logrus.Entry
}

type Elasticsearch8Operator struct {
conf *ElasticsearchConf
esClient *elasticv8.Client
bulkIndexer esutil.BulkIndexer
log *logrus.Entry
}

type ApiKeyTransport struct {
APIKey string
APIKeyID string
Expand All @@ -147,11 +158,13 @@ func (e *ElasticsearchPump) getOperator() (ElasticsearchOperator, error) {

urls := strings.Split(conf.ElasticsearchURL, ",")

var httpTransport http.RoundTripper = nil
httpClient := http.DefaultClient
if conf.AuthAPIKey != "" && conf.AuthAPIKeyID != "" {
conf.Username = ""
conf.Password = ""
httpClient = &http.Client{Transport: &ApiKeyTransport{APIKey: conf.AuthAPIKey, APIKeyID: conf.AuthAPIKeyID}}
httpTransport = &ApiKeyTransport{APIKey: conf.AuthAPIKey, APIKeyID: conf.AuthAPIKeyID}
httpClient = &http.Client{Transport: httpTransport}
}

if conf.UseSSL {
Expand All @@ -160,7 +173,8 @@ func (e *ElasticsearchPump) getOperator() (ElasticsearchOperator, error) {
e.log.WithError(err).Error("Failed to get TLS config")
return nil, err
}
httpClient = &http.Client{Transport: &http.Transport{TLSClientConfig: tlsConf}}
httpTransport = &http.Transport{TLSClientConfig: tlsConf}
httpClient = &http.Client{Transport: httpTransport}
}

switch conf.Version {
Expand Down Expand Up @@ -310,6 +324,36 @@ func (e *ElasticsearchPump) getOperator() (ElasticsearchOperator, error) {
}

op.bulkProcessor, err = p.Do(context.Background())
op.log = e.log
return op, err
case "8":
op := &Elasticsearch8Operator{
conf: &conf,
}

cfg := elasticv8.Config{
Addresses: urls,
}
if conf.Username != "" || conf.Password != "" {
cfg.Username = conf.Username
cfg.Password = conf.Password
}
if httpTransport != nil {
cfg.Transport = httpTransport
}

op.esClient, err = elasticv8.NewClient(cfg)

if err != nil {
return op, err
}

op.bulkIndexer, err = setupElasticsearch8BulkIndexer(op)

if err != nil {
return op, err
}

op.log = e.log
return op, err
default:
Expand All @@ -320,6 +364,28 @@ func (e *ElasticsearchPump) getOperator() (ElasticsearchOperator, error) {
return nil, err
}

func setupElasticsearch8BulkIndexer(op *Elasticsearch8Operator) (esutil.BulkIndexer, error) {
// Setup a bulk indexer
bulkCfg := esutil.BulkIndexerConfig{
Index: getIndexName(op.conf),
Client: op.esClient,
}

if op.conf.BulkConfig.Workers != 0 {
bulkCfg.NumWorkers = op.conf.BulkConfig.Workers
}

if op.conf.BulkConfig.FlushInterval != 0 {
bulkCfg.FlushInterval = time.Duration(op.conf.BulkConfig.FlushInterval) * time.Second
}

// op.conf.BulkConfig.BulkActions not supported

// op.conf.BulkConfig.BulkSize not supported

return esutil.NewBulkIndexer(bulkCfg)
}

func (e *ElasticsearchPump) New() Pump {
newPump := ElasticsearchPump{}
return &newPump
Expand Down Expand Up @@ -360,9 +426,9 @@ func (e *ElasticsearchPump) Init(config interface{}) error {
case "":
e.esConf.Version = "3"
log.Info("Version not specified, defaulting to 3. If you are importing to Elasticsearch 5, please specify \"version\" = \"5\"")
case "3", "5", "6", "7":
case "3", "5", "6", "7", "8":
default:
err := errors.New("Only 3, 5, 6, 7 are valid values for this field")
err := errors.New("Only 3, 5, 6, 7, 8 are valid values for this field")
e.log.Fatal("Invalid version: ", err)
}

Expand Down Expand Up @@ -643,6 +709,75 @@ func (e Elasticsearch7Operator) flushRecords() error {
return e.bulkProcessor.Flush()
}

func (e *Elasticsearch8Operator) processData(ctx context.Context, data []interface{}, esConf *ElasticsearchConf) error {
for dataIndex := range data {
if ctxErr := ctx.Err(); ctxErr != nil {
continue
}

d, ok := data[dataIndex].(analytics.AnalyticsRecord)
if !ok {
e.log.Error("Error while writing ", data[dataIndex], ": data not of type analytics.AnalyticsRecord")
continue
}

mapping, id := getMapping(d, esConf.ExtendedStatistics, esConf.GenerateID, esConf.DecodeBase64)
bs, err := json.Marshal(mapping)
if err != nil {
e.log.Error("Error while writing ", data[dataIndex], ": failed to marshal into JSON: ", err)
continue
}
body := bytes.NewReader(bs)

if !esConf.DisableBulk {
err = e.bulkIndexer.Add(
ctx,
esutil.BulkIndexerItem{
Action: "index",
Body: body,
Index: getIndexName(esConf),
DocumentID: id,

OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, resp esutil.BulkIndexerResponseItem) {
e.log.Info("Purged 1 record...")
},
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, resp esutil.BulkIndexerResponseItem, err error) {
e.log.Error("Error while writing ", data[dataIndex], err)
},
},
)
if err != nil {
e.log.Error("Error while adding ", data[dataIndex], " to BulkIndexer: ", err)
}
} else {
e.esClient.Index(
getIndexName(esConf),
body,
e.esClient.Index.WithDocumentID(id),
e.esClient.Index.WithContext(ctx),
)
if err != nil {
e.log.Error("Error while writing ", data[dataIndex], err)
}
}
}
if esConf.DisableBulk {
e.log.Info("Purged ", len(data), " records...")
}

return nil
}

func (e *Elasticsearch8Operator) flushRecords() error {
err := e.bulkIndexer.Close(context.Background())
if err != nil {
return err
}
e.log.Info("Purged ", e.bulkIndexer.Stats().NumFlushed, " records in this bulk...")
e.bulkIndexer, err = setupElasticsearch8BulkIndexer(e)
return err
}

// printPurgedBulkRecords print the purged records = bulk size when bulk is enabled
func printPurgedBulkRecords(bulkSize int, err error, logger *logrus.Entry) {
if err != nil {
Expand Down