Skip to content

Commit

Permalink
Some sanity checks
Browse files Browse the repository at this point in the history
  • Loading branch information
Philipp Heckel committed Apr 8, 2021
1 parent 639683d commit 11ef470
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 47 deletions.
45 changes: 36 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,23 +1,50 @@
# elastictl

Simple tool to dump an elasticsearch index into a file to later insert it wih high concurrency.
This can be used for load testing. In my local cluster, I was able to index ~10k documents per second.
Simple tool to import/export Elasticsearch indices to/from a file, and/or reshard an index. The tool can be used for:

Usage:
* Backup/restore of an Elasticsearch index
* Performance test an Elasticsearch cluster (import with high concurrency, see `--workers`)
* Change the shard/replica count of an index (see `reshard` subcommand)

In my local cluster, I was able to import ~10k documents per second.

## Build
```
# Build
go build
$ go build
```

Or via goreleaser:
```
$ make [build | build-snapshot]
```

## Usage:

# Dump index
### Export/dump an index to a file
The first line of the output format is the mapping, the rest are the documents.
```
# Entire index
elastictl export dummy > dummy.json
# Insert index with high concurrency
# Only a subset of documents
elastictl export \
--search '{"query":{"bool":{"must_not":{"match":{"eventType":"Success"}}}}}' \
dummy > dummy.json
```

### Import to new index
```
# With high concurrency
cat dummy.json | elastictl import --workers 100 dummy-copy
```

# Reshard (import/export) an index
### Reshard (import/export) an index
This command exports the index `dummy` to `dummy.json` and re-imports it as `dummy` using a different number of shards.
This command does `DELETE` the index after exporting it!
```
elastictl reshard \
--search '{"query":{"bool":{"must_not":{"match":{"eventType":"Success"}}}}}' \
--shards 1 \
--replicas 1 \
dummy-index
dummy
```
3 changes: 2 additions & 1 deletion cmd/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ func execExport(c *cli.Context) error {
return cli.Exit("invalid syntax: index missing", 1)
}
index := c.Args().Get(0)
return tools.Export(host, index, search, c.App.Writer)
_, err := tools.Export(host, index, search, c.App.Writer)
return err
}
7 changes: 4 additions & 3 deletions cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@ var cmdBlast = &cli.Command{
&cli.IntFlag{Name: "workers", Aliases: []string{"w"}, Value: 50, Usage: "number of concurrent workers"},
&cli.IntFlag{Name: "shards", Aliases: []string{"s"}, Value: -1, DefaultText: "no change", Usage: "override the number of shards on index creation"},
&cli.IntFlag{Name: "replicas", Aliases: []string{"r"}, Value: -1, DefaultText: "no change", Usage: "override the number of replicas on index creation"},
&cli.BoolFlag{Name: "nocreate", Aliases: []string{"N"}, Value: false, Usage: "do not create index"},
&cli.BoolFlag{Name: "no-create", Aliases: []string{"N"}, Value: false, Usage: "do not create index"},
},
}

func execImport(c *cli.Context) error {
host := c.String("host")
workers := c.Int("workers")
nocreate := c.Bool("nocreate")
nocreate := c.Bool("no-create")
shards := c.Int("shards")
replicas := c.Int("replicas")
if c.NArg() < 1 {
return cli.Exit("invalid syntax: index missing", 1)
}
index := c.Args().Get(0)
return tools.Import(host, index, workers, nocreate, shards, replicas, c.App.Reader, -1)
_, err := tools.Import(host, index, workers, nocreate, shards, replicas, c.App.Reader, -1)
return err
}
38 changes: 20 additions & 18 deletions tools/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@ import (
"strings"
)

func Export(host string, index string, search string, w io.Writer) error {
func Export(host string, index string, search string, w io.Writer) (int, error) {
log.Printf("exporting index %s/%s", host, index)
rootURI := fmt.Sprintf("http://%s", host)

// Dump mapping first
rootIndexURI := fmt.Sprintf("http://%s/%s", host, index)
req, err := http.NewRequest("GET", rootIndexURI, nil)
if err != nil {
return err
return 0, err
}
resp, err := client.Do(req)
if err != nil {
return err
return 0, err
}
rawMapping, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
return 0, err
}
mapping := gjson.GetBytes(rawMapping, index).String()
if _, err := fmt.Fprintln(w, mapping); err != nil {
return err
return 0, err
}

// Initial search request
Expand All @@ -44,68 +44,70 @@ func Export(host string, index string, search string, w io.Writer) error {
uri := fmt.Sprintf("%s/_search?size=10000&scroll=1m", rootIndexURI)
req, err = http.NewRequest("POST", uri, body)
if err != nil {
return err
return 0, err
}
resp, err = client.Do(req)
if err != nil {
return err
return 0, err
}
if resp.Body == nil {
return err
return 0, err
}

var progress *util.ProgressBar
exported := 0

for {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
return 0, err
}

if progress == nil {
total := gjson.GetBytes(body, "hits.total")
if !total.Exists() {
return errors.New("no total")
return 0, errors.New("no total")
}
progress = util.NewProgressBarWithTotal(os.Stderr, int(total.Int()))
}

scrollID := gjson.GetBytes(body, "_scroll_id")
if !scrollID.Exists() {
return errors.New("no scroll id: "+string(body))
return 0, errors.New("no scroll id: "+string(body))
}

hits := gjson.GetBytes(body, "hits.hits")
if !hits.Exists() || !hits.IsArray() {
return errors.New("no hits: "+string(body))
return 0, errors.New("no hits: "+string(body))
}
if len(hits.Array()) == 0 {
break // we're done!
}

for _, hit := range hits.Array() {
progress.Add(1)
exported++
progress.Add(int64(len(hit.Raw)))
if _, err := fmt.Fprintln(w, hit.Raw); err != nil {
return err
return 0, err
}
}

uri := fmt.Sprintf("%s/_search/scroll", rootURI)
postBody := fmt.Sprintf(`{"scroll":"1m","scroll_id":"%s"}`, scrollID.String())
req, err := http.NewRequest("POST", uri, strings.NewReader(postBody))
if err != nil {
return err
return 0, err
}

resp, err = client.Do(req)
if err != nil {
return err
return 0, err
}

if resp.Body == nil {
return err
return 0, err
}
}
progress.Done()
return nil
return exported, nil
}
25 changes: 14 additions & 11 deletions tools/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,54 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
)

var (
client = &http.Client{}
settingsToRemove = []string{"settings.index.creation_date", "settings.index.uuid", "settings.index.version", "settings.index.provided_name"}
)

func Import(host string, index string, workers int, nocreate bool, shards int, replicas int, r io.Reader, totalHint int) error {
func Import(host string, index string, workers int, nocreate bool, shards int, replicas int, r io.Reader, totalHint int) (int, error) {
log.Printf("importing index %s/%s", host, index)
rootURI := fmt.Sprintf("http://%s/%s", host, index)
scanner := bufio.NewScanner(r)

// Create index
if !scanner.Scan() {
return errors.New("cannot read mapping")
return 0, errors.New("cannot read mapping")
}
mapping := scanner.Text()
if !nocreate {
var err error
for _, keyToRemove := range settingsToRemove {
mapping, err = sjson.Delete(mapping, keyToRemove)
if err != nil {
return err
return 0, err
}
}
if shards > 0 {
mapping, err = sjson.Set(mapping, "settings.index.number_of_shards", fmt.Sprintf("%d", shards))
if err != nil {
return err
return 0, err
}
}
if replicas > -1 { // zero replicas is allowed!
mapping, err = sjson.Set(mapping, "settings.index.number_of_replicas", fmt.Sprintf("%d", replicas))
if err != nil {
return err
return 0, err
}
}
req, err := http.NewRequest("PUT", rootURI, strings.NewReader(mapping))
if err != nil {
return err
return 0, err
}
resp, err := client.Do(req)
if err != nil {
return err
return 0, err
}
if resp.StatusCode != 201 && resp.StatusCode != 200 {
return fmt.Errorf("unexpected response code during index creation: %d", resp.StatusCode)
return 0, fmt.Errorf("unexpected response code during index creation: %d", resp.StatusCode)
}
}

Expand All @@ -71,9 +72,10 @@ func Import(host string, index string, workers int, nocreate bool, shards int, r
wg := &sync.WaitGroup{}
docsChan := make(chan string)
progress := util.NewProgressBarWithTotal(os.Stderr, totalHint)
imported := int64(0)
for i := 0; i < workers; i++ {
wg.Add(1)
go importWorker(wg, docsChan, progress, client, rootURI)
go importWorker(wg, docsChan, progress, client, rootURI, &imported)
}

go func() {
Expand All @@ -86,10 +88,10 @@ func Import(host string, index string, workers int, nocreate bool, shards int, r
wg.Wait()
progress.Done()

return nil
return int(imported), nil
}

func importWorker(wg *sync.WaitGroup, docsChan chan string, progress *util.ProgressBar, client *http.Client, rootURI string) {
func importWorker(wg *sync.WaitGroup, docsChan chan string, progress *util.ProgressBar, client *http.Client, rootURI string, imported *int64) {
defer wg.Done()
for doc := range docsChan {
id := url.QueryEscape(gjson.Get(doc, "_id").String())
Expand All @@ -115,5 +117,6 @@ func importWorker(wg *sync.WaitGroup, docsChan chan string, progress *util.Progr
resp.Body.Close()
}
progress.Add(int64(len(source)))
atomic.AddInt64(imported, 1)
}
}
18 changes: 13 additions & 5 deletions tools/reshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ func Reshard(host string, index string, dir string, keep bool, search string, wo
return err
}
defer file.Close()
if err := Export(host, index, search, file); err != nil {
return err
}
if err := deleteIndex(host, index); err != nil {
exported, err := Export(host, index, search, file)
if err != nil {
return err
}
if _, err := file.Seek(0,0); err != nil {
Expand All @@ -37,12 +35,22 @@ func Reshard(host string, index string, dir string, keep bool, search string, wo
if err != nil {
return err
}
if exported != lines-1 {
return fmt.Errorf("unexpected count: %d documents expected in exported file, got %d", exported, lines-1)
}
if _, err := file.Seek(0,0); err != nil {
return err
}
if err := Import(host, index, workers, false, shards, replicas, file, lines-1); err != nil {
if err := deleteIndex(host, index); err != nil {
return err
}
imported, err := Import(host, index, workers, false, shards, replicas, file, exported)
if err != nil {
return err
}
if imported != exported {
return fmt.Errorf("count mismatch: %d documents exported, but %d imported", exported, imported)
}
if !keep {
file.Close()
os.Remove(filename)
Expand Down

0 comments on commit 11ef470

Please sign in to comment.