/
es.go
154 lines (126 loc) · 3.03 KB
/
es.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package es
import (
"context"
"fmt"
"github.com/olivere/elastic"
log "github.com/sirupsen/logrus"
)
// Document is the contract a value must satisfy to be passed to this
// package's indexing functions. The value itself is used as the body of the
// bulk index action built for it.
type Document interface {
// Index returns a string associated with the document.
// NOTE(review): nothing in the visible code calls this method (the target
// index is passed to the indexing functions separately); presumably it names
// the document's index — confirm against callers.
Index() string
// Type returns the value set as the type of the document's bulk index action.
Type() string
// ID returns the value set as the id of the document's bulk index action.
ID() string
}
// defaultClient is the package-level Elasticsearch client. It is nil until
// Init assigns it, and it is the client used by every helper in this file.
var defaultClient *elastic.Client
// Init builds the package-wide default Elasticsearch client.
//
// hosts are the node URLs handed to elastic.SetURL. When enableAuth is true,
// username and password are configured as HTTP basic-auth credentials;
// otherwise both are ignored. Sniffing is always disabled.
//
// The default client is replaced only when construction succeeds, so a failed
// call leaves a previously initialized client untouched. The error from
// elastic.NewClient is returned unchanged.
func Init(hosts []string, enableAuth bool, username, password string) (err error) {
	// Basic auth (when requested) goes first, followed by the options that are
	// common to both authenticated and anonymous clients.
	opts := make([]elastic.ClientOptionFunc, 0, 3)
	if enableAuth {
		opts = append(opts, elastic.SetBasicAuth(username, password))
	}
	opts = append(opts, elastic.SetSniff(false), elastic.SetURL(hosts...))

	client, err := elastic.NewClient(opts...)
	if err != nil {
		return err
	}
	defaultClient = client
	return nil
}
// checkExists reports whether the named index exists, asking the default
// client.
//
// If the default client has not been set (Init was never called, or did not
// succeed) it returns an error rather than dereferencing a nil client. Any
// error from the exists request is returned together with false.
func checkExists(index string) (bool, error) {
	if defaultClient == nil {
		return false, fmt.Errorf("elasticsearch client is not initialized")
	}

	exists, err := defaultClient.IndexExists(index).Do(context.Background())
	if err != nil {
		return false, err
	}
	return exists, nil
}
// UpdateSettings applies setting to the named index through the default
// client.
//
// The settings map is sent as the JSON body of a put-settings request. When
// the index does not exist no request is made and both results are nil. An
// error from the existence check or from the request is returned with a nil
// response.
func UpdateSettings(index string, setting map[string]interface{}) (uIndex *elastic.IndicesPutSettingsResponse, err error) {
	found, err := checkExists(index)
	if err != nil {
		return nil, err
	}
	if !found {
		// A missing index is not reported as an error; there is simply
		// nothing to update.
		return nil, nil
	}

	request := defaultClient.IndexPutSettings(index).BodyJson(setting)
	uIndex, err = request.Do(context.Background())
	if err != nil {
		return nil, err
	}
	return uIndex, nil
}
// CheckAndCreate creates the named index with bodyString as the request body,
// unless the index already exists.
//
// When the index is already present no request is made and both results are
// nil. An error from the existence check or from the create request is
// returned with a nil result.
func CheckAndCreate(index string, bodyString string) (cIndex *elastic.IndicesCreateResult, err error) {
	found, err := checkExists(index)
	if err != nil {
		return nil, err
	}
	if found {
		// Already there: leave the existing index untouched.
		return nil, nil
	}

	request := defaultClient.CreateIndex(index).BodyString(bodyString)
	cIndex, err = request.Do(context.Background())
	if err != nil {
		return nil, err
	}
	return cIndex, nil
}
// CheckAndDelete deletes the named index if it exists.
//
// When the index is absent no request is made and both results are nil. An
// error from the existence check or from the delete request is returned with
// a nil response.
func CheckAndDelete(index string) (dIndex *elastic.IndicesDeleteResponse, err error) {
	found, err := checkExists(index)
	if err != nil {
		return nil, err
	}
	if !found {
		// Nothing to delete.
		return nil, nil
	}

	request := defaultClient.DeleteIndex(index)
	dIndex, err = request.Do(context.Background())
	if err != nil {
		return nil, err
	}
	return dIndex, nil
}
// index sends the first count documents of docs to the named index in a
// single bulk request on the default client.
//
// Each document becomes one bulk index action carrying the document's Type()
// and ID(), with the document itself as the body. count is clamped to
// len(docs) so an oversized count cannot read past the end of the slice, and
// a non-positive count sends nothing.
//
// Failures are logged, not returned: an error from the bulk call is logged
// once, and when the response reports errors every failed item's error is
// logged.
func index(index string, docs []Document, count int) {
	if count > len(docs) {
		count = len(docs)
	}
	if count <= 0 {
		return
	}

	bulkRequest := defaultClient.Bulk()
	for _, doc := range docs[:count] {
		bulkRequest = bulkRequest.Add(
			elastic.NewBulkIndexRequest().Index(index).Type(doc.Type()).Id(doc.ID()).Doc(doc),
		)
	}

	resp, err := bulkRequest.Do(context.Background())
	if err != nil {
		log.Error(err)
	}
	// The response is inspected even after an error, in case the client
	// returned a partial result alongside it.
	if resp != nil && resp.Errors {
		for _, failed := range resp.Failed() {
			log.Error(failed.Error)
		}
	}
}
// Index writes docs to the existing index idx in batches.
//
// It returns an error when the existence check fails or when the index is
// missing. Otherwise the documents are handed to the bulk helper in
// consecutive batches of at most 10000 and nil is returned; the bulk helper
// has no return value, so its outcome is not reflected in the result.
func Index(idx string, docs []Document) error {
	// Upper bound on the number of documents sent per bulk request.
	const batchSize = 10000

	found, err := checkExists(idx)
	if err != nil {
		return err
	}
	if !found {
		return fmt.Errorf("index %s doesn't exists", idx)
	}

	for start := 0; start < len(docs); start += batchSize {
		end := start + batchSize
		if end > len(docs) {
			end = len(docs)
		}
		index(idx, docs[start:end], end-start)
	}
	return nil
}