From a3f1e2fba255a90b599294f14cf8c92707d4ff08 Mon Sep 17 00:00:00 2001 From: Viktor Sokolov Date: Mon, 3 Jun 2019 11:58:32 +0400 Subject: [PATCH] es-stats command, prints ranges of ledgers in es --- commands/es-stats.go | 112 +++++++++++++++++++++++++++++++++++++++++++ config/main.go | 1 + main.go | 2 + 3 files changed, 115 insertions(+) create mode 100644 commands/es-stats.go diff --git a/commands/es-stats.go b/commands/es-stats.go new file mode 100644 index 0000000..00f4ef4 --- /dev/null +++ b/commands/es-stats.go @@ -0,0 +1,112 @@ +package commands + +import ( + "bytes" + "encoding/json" + "log" + "os" + "strconv" + + "github.com/astroband/astrologer/config" + "github.com/olekukonko/tablewriter" +) + +// EsStats prints ledger statistics for current database +func EsStats() { + table := tablewriter.NewWriter(os.Stdout) + table.SetHeader([]string{"From", "To", "Doc_count"}) + + min, max := esMinMax() + buckets := esRanges(min, max) + + for i := 0; i < len(buckets); i++ { + bucket := buckets[i].(map[string]interface{}) + count := int(bucket["doc_count"].(float64)) + from := int(bucket["from"].(float64)) + to := int(bucket["to"].(float64)) + + table.Append([]string{ + strconv.Itoa(from), + strconv.Itoa(to), + strconv.Itoa(count), + }) + } + + table.Render() +} + +func esMinMax() (min int, max int) { + query := map[string]interface{}{ + "aggs": map[string]interface{}{ + "seq_stats": map[string]interface{}{ + "stats": map[string]interface{}{ + "field": "seq", + }, + }, + }, + } + + r := searchLedgers(query) + + aggs := r["aggregations"].(map[string]interface{})["seq_stats"].(map[string]interface{}) + + min = int(aggs["min"].(float64)) + max = int(aggs["max"].(float64)) + + return min, max +} + +func esRanges(min int, max int) []interface{} { + var ranges []map[string]interface{} + + for i := 0; i < max; i += 1000000 { + ranges = append(ranges, map[string]interface{}{"from": min + i, "to": min + i + 1000000}) + } + + query := map[string]interface{}{ + "aggs": map[string]interface{}{ + "seq_ranges": map[string]interface{}{ + "range": map[string]interface{}{ + "field": "seq", + "ranges": ranges, + }, + }, + }, + } + + r := searchLedgers(query) + + aggs := r["aggregations"].(map[string]interface{})["seq_ranges"].(map[string]interface{}) + buckets := aggs["buckets"].([]interface{}) + + return buckets +} + +func searchLedgers(query map[string]interface{}) (r map[string]interface{}) { + var buf bytes.Buffer + + if err := json.NewEncoder(&buf).Encode(query); err != nil { + log.Fatalf("Error encoding query: %s", err) + } + + res, err := config.ES.Search( + config.ES.Search.WithIndex("ledger"), + config.ES.Search.WithBody(&buf), + ) + + if err != nil { + log.Fatal(err) + } + + if res.IsError() { + log.Fatal("Error in response", res.Body) + } + + if err := json.NewDecoder(res.Body).Decode(&r); err != nil { + log.Fatalf("Error parsing the response body: %s", err) + } + + res.Body.Close() + + return r +} diff --git a/config/main.go b/config/main.go index 8a4d806..e621cb6 100644 --- a/config/main.go +++ b/config/main.go @@ -25,6 +25,7 @@ var ( exportCommand = kingpin.Command("export", "Run export") ingestCommand = kingpin.Command("ingest", "Start real time ingestion") statsCommand = kingpin.Command("stats", "Print database ledger statistics") + esStatsCommand = kingpin.Command("es-stats", "Print ES ranges stats") databaseURL = kingpin. Flag("database-url", "Stellar Core database URL"). diff --git a/main.go b/main.go index dac97be..7b1779e 100644 --- a/main.go +++ b/main.go @@ -20,5 +20,7 @@ func main() { commands.Export() case "ingest": commands.Ingest() + case "es-stats": + commands.EsStats() } }