-
Notifications
You must be signed in to change notification settings - Fork 335
/
es.go
89 lines (77 loc) · 2.02 KB
/
es.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package es
import (
"context"
"encoding/json"
"fmt"
"github.com/cortezaproject/corteza/extra/server-discovery/pkg/options"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/elastic/go-elasticsearch/v7/esutil"
"go.uber.org/zap"
)
type (
es struct {
log *zap.Logger
opt options.EsOpt
//esc *elasticsearch.Client
//esb esutil.BulkIndexer
}
Service interface {
EsClient() *elasticsearch.Client
EsBulk() esutil.BulkIndexer
Watch(ctx context.Context)
}
//apiClientService interface {
// HttpClient() *http.Client
// Mappings() (*http.Request, error)
// Resources(string, url.Values) (*http.Request, error)
// Request(string) (*http.Request, error)
// Authenticate() error
//}
)
func ES(log *zap.Logger, opt options.EsOpt) (out *es, err error) {
out = &es{log: log, opt: opt}
return
}
func (es *es) Client() (*elasticsearch.Client, error) {
config := elasticsearch.Config{
Addresses: es.opt.Addresses,
EnableRetryOnTimeout: es.opt.EnableRetryOnTimeout,
MaxRetries: es.opt.MaxRetries,
}
if len(es.opt.Username) > 0 {
config.Username = es.opt.Username
config.Password = es.opt.Password
}
return elasticsearch.NewClient(config)
}
func (es *es) BulkIndexer() (esutil.BulkIndexer, error) {
client, err := es.Client()
if err != nil {
return nil, err
}
return esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: client,
FlushBytes: 5e+5,
})
}
func ValidElasticResponse(res *esapi.Response, err error) error {
if err != nil {
return fmt.Errorf("failed to get response from search backend: %w", err)
}
if res.IsError() {
defer res.Body.Close()
var rsp struct {
Error struct {
Type string
Reason string
}
}
if err := json.NewDecoder(res.Body).Decode(&rsp); err != nil {
return fmt.Errorf("could not parse response body: %w", err)
} else {
return fmt.Errorf("search backend responded with an error: %s (type: %s, status: %s)", rsp.Error.Reason, rsp.Error.Type, res.Status())
}
}
return nil
}