-
Notifications
You must be signed in to change notification settings - Fork 0
/
export.go
140 lines (124 loc) · 2.87 KB
/
export.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package esHandler
import (
"bytes"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/tidwall/gjson"
"io"
"log"
"os"
"time"
"github.com/elastic/go-elasticsearch/v6"
)
var (
scrollID string
batchNum int
Es *elasticsearch.Client
)
type DumpInfo struct {
User string
Password string
Host string
Index string
Size int
Query string
}
func EsInit(dumpInfo DumpInfo) {
var err error
config := elasticsearch.Config{}
if dumpInfo.User != "" && dumpInfo.Password != "" {
config.Addresses = []string{dumpInfo.Host}
config.Username = dumpInfo.User
config.Password = dumpInfo.Password
Es, err = elasticsearch.NewClient(config)
if err != nil {
log.Println(err)
os.Exit(1)
}
} else if dumpInfo.User == "" && dumpInfo.Password == "" {
config.Addresses = []string{dumpInfo.Host}
Es, err = elasticsearch.NewClient(config)
if err != nil {
log.Println(err)
os.Exit(1)
}
}
}
func Read(r io.Reader) string {
var b bytes.Buffer
_, err := b.ReadFrom(r)
if err != nil {
log.Println(err)
os.Exit(1)
}
return b.String()
}
//从elasticsearch中导出索引
func Exporter(dumpInfo *DumpInfo, ch chan string) (err error) {
EsInit(*dumpInfo)
log.Println("导出开始...")
var res *esapi.Response
if dumpInfo.Query!=""{
res, err = Es.Search(
Es.Search.WithIndex(dumpInfo.Index),
Es.Search.WithSort("_doc"),
Es.Search.WithSize(dumpInfo.Size),
Es.Search.WithScroll(time.Minute),
Es.Search.WithQuery(dumpInfo.Query),
)
if err != nil {
return err
}
}else{
res, err = Es.Search(
Es.Search.WithIndex(dumpInfo.Index),
Es.Search.WithSort("_doc"),
Es.Search.WithSize(dumpInfo.Size),
Es.Search.WithScroll(time.Minute),
)
if err != nil {
return err
}
}
//先做查询初始化并获取scrollID
json := Read(res.Body)
defer res.Body.Close()
//将scrollID保存至全局变量中
scrollID = gjson.Get(json, "_scroll_id").String()
res.Body.Close()
// 从response中提取scrollID
scrollID = gjson.Get(json, "_scroll_id").String()
// 提取搜索结果
hits := gjson.Get(json, "hits.hits")
go func() {
for _, v := range hits.Array() {
ch <- v.String()
}
for {
//执行滚动请求并传递scrollID和滚动持续时间
res, err := Es.Scroll(Es.Scroll.WithScrollID(scrollID), Es.Scroll.WithScroll(time.Minute))
if err != nil {
log.Fatalf("Error: %s", err)
}
if res.IsError() {
log.Fatalf("Error response: %s", res)
}
json := Read(res.Body)
res.Body.Close()
// 从response中提取scrollID
scrollID = gjson.Get(json, "_scroll_id").String()
// 提取搜索结果
hits := gjson.Get(json, "hits.hits")
//没有结果时跳出循环
if len(hits.Array()) < 1 {
break
} else {
//遍历每次查询结果得到一条数据并推送至管道
for _, v := range hits.Array() {
ch <- v.String()
}
}
}
close(ch)
}()
return nil
}