-
Notifications
You must be signed in to change notification settings - Fork 0
/
es-restore-index.go
122 lines (114 loc) · 2.86 KB
/
es-restore-index.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Copyright 2017 BrightLocal Ltd. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"time"
gzip "github.com/klauspost/pgzip"
"github.com/olivere/elastic/v7"
"github.com/BrightLocal/ES-Backup/app/item"
)
var appVersion = "<none>"
func main() {
var (
hosts string
index string
files string
)
flag.StringVar(&hosts, "hosts", "", "List of ElasticSearch hosts")
flag.StringVar(&index, "index", "", "Index to restore")
flag.StringVar(&files, "files", "", "Files to use")
getVersion := flag.Bool("version", false, "Get version")
flag.Parse()
if *getVersion {
fmt.Println(appVersion)
return
}
if hosts == "" {
flag.Usage()
os.Exit(1)
}
if index == "" {
flag.Usage()
os.Exit(1)
}
if files == "" {
flag.Usage()
os.Exit(1)
}
list, err := filepath.Glob(files)
if err != nil {
log.Fatalf("Error getting files list: %s", err)
}
if len(list) == 0 {
log.Fatalf("No files found")
}
log.Printf("Importing from %d file(s)", len(list))
args := []elastic.ClientOptionFunc{elastic.SetMaxRetries(10)}
for _, h := range strings.Split(hosts, ",") {
args = append(args, elastic.SetURL(h), elastic.SetSniff(false))
}
esClient, err := elastic.NewClient(args...)
if err != nil {
log.Fatalf("Error connecting to ElasticSearch at %q: %s", hosts, err)
}
start := time.Now()
total := 0
for _, fileName := range list {
file, err := os.Open(fileName)
if err != nil {
log.Fatalf("Error opening file %q: %s", fileName, err)
}
gz, err := gzip.NewReader(file)
if err != nil {
log.Fatalf("Error creating uncompressor: %s", err)
}
decoder := json.NewDecoder(gz)
i := 0
bs := elastic.NewBulkService(esClient)
for {
var line item.Record
if err := decoder.Decode(&line); err != nil {
if err == io.EOF {
break
} else if err != nil {
log.Printf("Error reading from file %q: %s", fileName, err)
break
}
}
i++
total++
bs.Add(elastic.NewBulkUpdateRequest().Index(index).Id(line.ID).DocAsUpsert(true).Doc(line.Source))
if bs.EstimatedSizeInBytes() > 10*1024*1024 {
if resp, err := bs.Do(context.TODO()); err != nil {
log.Fatalf("Error during bulk upsert: %s", err)
} else if resp.Errors {
for _, rr := range resp.Failed() {
log.Printf("Error: %s", rr.Error.Reason)
}
log.Fatal()
}
log.Printf("Records inserted: %d", total)
}
}
if resp, err := bs.Do(context.TODO()); err != nil {
log.Fatalf("Error during bulk upsert: %s", err)
} else if resp.Errors {
for _, rr := range resp.Failed() {
log.Printf("Error: %s", rr.Error.Reason)
}
log.Fatal()
}
log.Printf("Records in %q: %d", fileName, i)
}
log.Printf("%d records processed in %s", total, time.Now().Sub(start).String())
}