Skip to content

Commit

Permalink
es-stats command, prints ranges of ledgers in es
Browse files Browse the repository at this point in the history
  • Loading branch information
gzigzigzeo committed Jun 3, 2019
1 parent 3081191 commit a3f1e2f
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 0 deletions.
112 changes: 112 additions & 0 deletions commands/es-stats.go
@@ -0,0 +1,112 @@
package commands

import (
"bytes"
"encoding/json"
"log"
"os"
"strconv"

"github.com/astroband/astrologer/config"
"github.com/olekukonko/tablewriter"
)

// EsStats prints ledger statistics for current database
func EsStats() {
table := tablewriter.NewWriter(os.Stdout)
table.SetHeader([]string{"From", "To", "Doc_count"})

min, max := esMinMax()
buckets := esRanges(min, max)

for i := 0; i < len(buckets); i++ {
bucket := buckets[i].(map[string]interface{})
count := int(bucket["doc_count"].(float64))
from := int(bucket["from"].(float64))
to := int(bucket["to"].(float64))

table.Append([]string{
strconv.Itoa(from),
strconv.Itoa(to),
strconv.Itoa(count),
})
}

table.Render()
}

func esMinMax() (min int, max int) {
query := map[string]interface{}{
"aggs": map[string]interface{}{
"seq_stats": map[string]interface{}{
"stats": map[string]interface{}{
"field": "seq",
},
},
},
}

r := searchLedgers(query)

aggs := r["aggregations"].(map[string]interface{})["seq_stats"].(map[string]interface{})

min = int(aggs["min"].(float64))
max = int(aggs["max"].(float64))

return min, max
}

func esRanges(min int, max int) []interface{} {
var ranges []map[string]interface{}

for i := 0; i < max; i += 1000000 {
ranges = append(ranges, map[string]interface{}{"from": min + i, "to": min + i + 1000000})
}

query := map[string]interface{}{
"aggs": map[string]interface{}{
"seq_ranges": map[string]interface{}{
"range": map[string]interface{}{
"field": "seq",
"ranges": ranges,
},
},
},
}

r := searchLedgers(query)

aggs := r["aggregations"].(map[string]interface{})["seq_ranges"].(map[string]interface{})
buckets := aggs["buckets"].([]interface{})

return buckets
}

func searchLedgers(query map[string]interface{}) (r map[string]interface{}) {
var buf bytes.Buffer

if err := json.NewEncoder(&buf).Encode(query); err != nil {
log.Fatalf("Error encoding query: %s", err)
}

res, err := config.ES.Search(
config.ES.Search.WithIndex("ledger"),
config.ES.Search.WithBody(&buf),
)

if err != nil {
log.Fatal(err)
}

if res.IsError() {
log.Fatal("Error in response", res.Body)
}

if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}

res.Body.Close()

return r
}
1 change: 1 addition & 0 deletions config/main.go
Expand Up @@ -25,6 +25,7 @@ var (
exportCommand = kingpin.Command("export", "Run export")
ingestCommand = kingpin.Command("ingest", "Start real time ingestion")
statsCommand = kingpin.Command("stats", "Print database ledger statistics")
esStatsCommand = kingpin.Command("es-stats", "Print ES ranges stats")

databaseURL = kingpin.
Flag("database-url", "Stellar Core database URL").
Expand Down
2 changes: 2 additions & 0 deletions main.go
Expand Up @@ -20,5 +20,7 @@ func main() {
commands.Export()
case "ingest":
commands.Ingest()
case "es-stats":
commands.EsStats()
}
}

0 comments on commit a3f1e2f

Please sign in to comment.