-
Notifications
You must be signed in to change notification settings - Fork 6
/
document.go
230 lines (217 loc) · 8.12 KB
/
document.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package elasticsearch
import (
"bytes"
"encoding/json"
"fmt"
"path"
)
// Order describes a single sort criterion for an Elasticsearch search request.
type Order struct {
	Field string // name of the document field to sort by
	Order string // sort direction: asc or desc
}

// MarshalJSON implements json.Marshaler. The order is encoded as a JSON
// object with exactly one key: {"<Field>": "<Order>"}.
//
// The receiver is a value so that both Order and *Order satisfy
// json.Marshaler. With a pointer receiver, a plain Order value handed to
// json.Marshal would silently fall back to the default struct encoding.
func (o Order) MarshalJSON() ([]byte, error) {
	return json.Marshal(map[string]string{o.Field: o.Order})
}
// InsertDocument stores document under the given id in index/doctype.
// An already existing document with the same id is replaced.
//
// The refresh argument is forwarded to Elasticsearch as the "refresh" query
// parameter. If refresh is set to false, the result is returned immediately;
// if it is set to true, Elasticsearch waits until all changes were done.
// When doing many inserts that must all be visible before continuing, set
// refresh to false and call Refresh() afterwards.
//
// An error is returned if the document cannot be encoded as JSON or if the
// PUT request fails.
func (c *Client) InsertDocument(index, doctype, id string, document map[string]interface{}, refresh Refresh) error {
	payload, err := json.Marshal(document)
	if err != nil {
		return fmt.Errorf("could not marshal the document: %s", err)
	}

	endpoint := path.Join(index, doctype, id)
	endpoint += "?refresh=" + getRefreshString(refresh)

	_, err = c.put(endpoint, payload)
	if err != nil {
		return fmt.Errorf("could not insert document: %s", err)
	}
	return nil
}
// GetDocument fetches the document stored under id in index/doctype.
//
// The response body is decoded into a generic map. JSON numbers are kept as
// json.Number instead of being converted to float64.
//
// An error is returned if the GET request fails or the response cannot be
// decoded.
func (c *Client) GetDocument(index, doctype, id string) (map[string]interface{}, error) {
	raw, err := c.get(path.Join(index, doctype, id), nil)
	if err != nil {
		return nil, fmt.Errorf("could not get document: %s", err)
	}

	doc := make(map[string]interface{})
	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.UseNumber()
	if err = dec.Decode(&doc); err != nil {
		return nil, fmt.Errorf("could not decode document: %s", err)
	}
	return doc, nil
}
// GetDocuments searches index/doctype and returns one page of hits together
// with the total number of hits reported by Elasticsearch.
//
// query and order are optional: a nil query or nil order is left out of the
// request body. from and size select the page and are always required. The
// sum from+size has to be lower than 10.000, otherwise Elasticsearch returns
// an error; use ScrollDocuments to read more than 10.000 documents.
//
// JSON numbers inside the hits are decoded as json.Number.
func (c *Client) GetDocuments(index, doctype string, query map[string]interface{}, from int64, size int64, order *Order) ([]map[string]interface{}, int64, error) {
	search := make(map[string]interface{}, 2)
	if order != nil {
		search["sort"] = []*Order{order}
	}
	if query != nil {
		search["query"] = query
	}

	payload, err := json.Marshal(search)
	if err != nil {
		return nil, 0, fmt.Errorf("could not marshal query: %s", err)
	}

	endpoint := path.Join(index, doctype) + fmt.Sprintf("/_search?from=%d&size=%d", from, size)
	raw, err := c.get(endpoint, payload)
	if err != nil {
		return nil, 0, fmt.Errorf("could not get documents: %s", err)
	}

	// Only the parts of the search response that are handed back to the
	// caller are decoded.
	type hitList struct {
		Total int64                    `json:"total"`
		Hits  []map[string]interface{} `json:"hits"`
	}
	var response struct {
		Hits hitList `json:"hits"`
	}

	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.UseNumber()
	if err = dec.Decode(&response); err != nil {
		return nil, 0, fmt.Errorf("could not decode documents: %s", err)
	}
	return response.Hits.Hits, response.Hits.Total, nil
}
// UpdateDocument runs a painless update script against the document stored
// under id in index/doctype.
//
// params is optional; when it is non-nil it is sent as the script's
// "params". It is recommended to use parameterized scripts and pass the
// variable parts in params: Elasticsearch then has to compile the script only
// once, and it also returns an error if too many different scripts are
// executed in a small time interval.
//
// The refresh argument is forwarded as the "refresh" query parameter.
func (c *Client) UpdateDocument(index, doctype, id string, painlessScript string, params map[string]interface{}, refresh Refresh) error {
	scriptDef := map[string]interface{}{
		"lang":   "painless",
		"source": painlessScript,
	}
	if params != nil {
		scriptDef["params"] = params
	}

	payload, err := json.Marshal(map[string]interface{}{"script": scriptDef})
	if err != nil {
		return fmt.Errorf("could not marshal the changes: %s", err)
	}

	endpoint := path.Join(index, doctype, id)
	endpoint += "/_update?refresh=" + getRefreshString(refresh)

	_, err = c.post(endpoint, payload)
	if err != nil {
		return fmt.Errorf("could not update document: %s", err)
	}
	return nil
}
// UpdateDocuments runs a painless update script on multiple documents in
// index/doctype via the _update_by_query endpoint.
//
// query is optional: a nil query is left out of the request body, the same
// way GetDocuments and ScrollDocuments treat it, instead of being sent as
// "query": null. params is optional as well and is only sent when non-nil.
//
// It is recommended to use parameterized scripts and pass the variable parts
// in params: Elasticsearch then has to compile the script only once, and it
// also returns an error if too many different scripts are executed in a small
// time interval.
//
// The request is sent with conflicts=proceed, and refresh is forwarded as the
// "refresh" query parameter.
func (c *Client) UpdateDocuments(index, doctype string, query map[string]interface{}, painlessScript string, params map[string]interface{}, refresh Refresh) error {
	script := map[string]interface{}{
		"source": painlessScript,
		"lang":   "painless",
	}
	if params != nil {
		script["params"] = params
	}

	body := map[string]interface{}{
		"script": script,
	}
	// Only include the query when the caller supplied one.
	if query != nil {
		body["query"] = query
	}

	b, err := json.Marshal(body)
	if err != nil {
		return fmt.Errorf("could not marshal the query: %s", err)
	}

	apipath := path.Join(index, doctype) + "/_update_by_query?conflicts=proceed&refresh=" + getRefreshString(refresh)
	if _, err := c.post(apipath, b); err != nil {
		return fmt.Errorf("could not update documents: %s", err)
	}
	return nil
}
// DeleteDocument deletes the document stored under id in index/doctype.
//
// The refresh argument is forwarded as the "refresh" query parameter.
// An error is returned if the DELETE request fails.
func (c *Client) DeleteDocument(index, doctype, id string, refresh Refresh) error {
	apipath := path.Join(index, doctype, id) + "?refresh=" + getRefreshString(refresh)
	if _, err := c.delete_(apipath, nil); err != nil {
		// The message previously said "update", which was misleading for a
		// failed delete.
		return fmt.Errorf("could not delete document: %s", err)
	}
	return nil
}
// DeleteDocuments deletes multiple documents in index/doctype via the
// _delete_by_query endpoint.
//
// query is optional: a nil query is left out of the request body, the same
// way GetDocuments and ScrollDocuments treat it, instead of being sent as
// "query": null.
//
// The refresh argument is forwarded as the "refresh" query parameter.
func (c *Client) DeleteDocuments(index, doctype string, query map[string]interface{}, refresh Refresh) error {
	body := map[string]interface{}{}
	// Only include the query when the caller supplied one.
	if query != nil {
		body["query"] = query
	}

	b, err := json.Marshal(body)
	if err != nil {
		return fmt.Errorf("could not marshal the query: %s", err)
	}

	apipath := path.Join(index, doctype) + "/_delete_by_query?refresh=" + getRefreshString(refresh)
	if _, err := c.post(apipath, b); err != nil {
		return fmt.Errorf("could not delete by query: %s", err)
	}
	return nil
}
// ScrollDocuments streams all documents of index/doctype that match query
// into the docs channel. It is the more performant way to read lots of
// documents. query is optional; a nil query is left out of the request.
//
// Documents are requested in pages of 1000, sorted by "_doc", with a scroll
// timeout of 5m, so they arrive without any caller-defined order. The docs
// channel is closed when the function returns.
//
// Ensure that this function is called as a goroutine: documents are sent on
// docs while scrolling.
func (c *Client) ScrollDocuments(index, doctype string, query map[string]interface{}, docs chan map[string]interface{}) error {
	defer close(docs)

	search := map[string]interface{}{
		"sort": []string{"_doc"},
		"size": 1000,
	}
	if query != nil {
		search["query"] = query
	}

	endpoint := path.Join(index, doctype) + "/_search?scroll=5m"
	return c.scrollDocuments(endpoint, search, docs, "")
}
// scrollDocuments posts req to apipath, sends every returned hit to docs and
// then keeps requesting the next page from "_search/scroll" until a page
// without hits is returned.
//
// scrollId is the scroll id of the previous page, or "" for the initial
// search request. Whenever a response carries a different scroll id, the
// previous one is deleted. Once the result is exhausted, the last scroll id
// is deleted as well, so the scroll context is not left open until its 5m
// timeout expires.
//
// The pages are fetched in a loop rather than by recursion, so the call stack
// does not grow with the number of pages.
func (c *Client) scrollDocuments(apipath string, req map[string]interface{}, docs chan map[string]interface{}, scrollId string) error {
	for {
		b, err := json.Marshal(req)
		if err != nil {
			return fmt.Errorf("could not marshal scroll request: %s", err)
		}
		res, err := c.post(apipath, b)
		if err != nil {
			return fmt.Errorf("could not scroll documents: %s", err)
		}

		// A fresh value per page, so fields missing from a response are not
		// carried over from the previous page.
		var page struct {
			ScrollID string `json:"_scroll_id"`
			Hits     struct {
				Hits []map[string]interface{} `json:"hits"`
			} `json:"hits"`
		}
		decoder := json.NewDecoder(bytes.NewReader(res))
		decoder.UseNumber()
		if err := decoder.Decode(&page); err != nil {
			return fmt.Errorf("could not unmarshal scroll result: %s", err)
		}

		// The server handed out a new scroll id: release the old one.
		if scrollId != "" && scrollId != page.ScrollID {
			if err := c.deleteScroll(scrollId); err != nil {
				return fmt.Errorf("could not delete scroll: %s", err)
			}
		}

		if len(page.Hits.Hits) == 0 {
			// Nothing left to read: release the scroll that is still open.
			if page.ScrollID != "" {
				if err := c.deleteScroll(page.ScrollID); err != nil {
					return fmt.Errorf("could not delete scroll: %s", err)
				}
			}
			return nil
		}

		for _, hit := range page.Hits.Hits {
			docs <- hit
		}

		// Every following page is requested from the scroll endpoint.
		apipath = "_search/scroll"
		req = map[string]interface{}{
			"scroll":    "5m",
			"scroll_id": page.ScrollID,
		}
		scrollId = page.ScrollID
	}
}
// deleteScroll sends a DELETE request to "_search/scroll" whose body carries
// the given scroll id.
//
// An error is returned if the request body cannot be encoded or the request
// fails.
func (c *Client) deleteScroll(scrollId string) error {
	payload, err := json.Marshal(map[string]interface{}{"scroll_id": scrollId})
	if err != nil {
		return fmt.Errorf("could not marshal the delete scroll query: %s", err)
	}

	_, err = c.delete_("_search/scroll", payload)
	if err != nil {
		return fmt.Errorf("could not delete the scroll: %s", err)
	}
	return nil
}