/
main.go
145 lines (111 loc) · 3.24 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package config
import (
"log"
"strconv"
es "github.com/elastic/go-elasticsearch"
"github.com/jmoiron/sqlx"
"gopkg.in/alecthomas/kingpin.v2"
_ "github.com/lib/pq" // Postgres driver
)
// Version is the application version string; init registers it with kingpin.
const Version string = "0.0.1"
// NumberWithSign is an integer command-line argument that also records
// whether it was written with an explicit leading sign.
type NumberWithSign struct {
Value int // Parsed integer value
Explicit bool // True if + or - was passed
}
var (
// Subcommands registered with kingpin.
createIndexCommand = kingpin.Command("create-index", "Create ES indexes")
exportCommand = kingpin.Command("export", "Run export")
ingestCommand = kingpin.Command("ingest", "Start real time ingestion")
statsCommand = kingpin.Command("stats", "Print database ledger statistics")
esStatsCommand = kingpin.Command("es-stats", "Print ES ranges stats")
// databaseURL is the global --database-url flag, parsed as a URL.
// Its default can be overridden with the DATABASE_URL environment variable.
databaseURL = kingpin.
Flag("database-url", "Stellar Core database URL").
Default("postgres://localhost/core?sslmode=disable").
OverrideDefaultFromEnvar("DATABASE_URL").
URL()
// esURL is the global --es-url flag, parsed as a URL.
// Its default can be overridden with the ES_URL environment variable.
esURL = kingpin.
Flag("es-url", "ElasticSearch URL").
Default("http://localhost:9200").
OverrideDefaultFromEnvar("ES_URL").
URL()
// Concurrency is the global --concurrency (-c) flag: how many tasks and
// goroutines to produce (all at once for now). Defaults to 5; the default
// can be overridden with the CONCURRENCY environment variable.
Concurrency = kingpin.
Flag("concurrency", "Concurrency for indexing").
Short('c').
Default("5").
OverrideDefaultFromEnvar("CONCURRENCY").
Int()
// BatchSize is the --batch (-b) flag of the export command: the ledger
// batch size for bulk export. Defaults to 50.
BatchSize = exportCommand.
Flag("batch", "Ledger batch size").
Short('b').
Default("50").
Int()
// Start is the parsed form of the export "start" argument. It keeps this
// zero value until parseStart assigns the parsed result.
Start = NumberWithSign{0, false}
// start is the raw text of the export "start" argument (default "0").
start = exportCommand.Arg("start", "Ledger to start indexing, +100 means offset 100 from the first").Default("0").String()
// Count is the export "count" argument (default 0).
// NOTE(review): "aliquout" in the help text looks like a typo for "aliquot",
// i.e. count is presumably meant to be a multiple of the batch size — confirm
// before changing the user-facing string.
Count = exportCommand.Arg("count", "Count of ledgers to ingest, should be aliquout batch size").Default("0").Int()
// StartIngest is the "start" argument of the ingest command: the ledger to
// start ingesting from.
StartIngest = ingestCommand.Arg("start", "Ledger to start ingesting").Int()
// Verbose is the --verbose flag of the export command (print indexed data).
Verbose = exportCommand.Flag("verbose", "Print indexed data").Bool()
// DryRun is the --dry-run flag of the export command (do not send data to Elastic).
DryRun = exportCommand.Flag("dry-run", "Do not send actual data to Elastic").Bool()
// ForceRecreateIndexes is the --force flag of the create-index command:
// allows indexes to be deleted before creation.
ForceRecreateIndexes = createIndexCommand.Flag("force", "Delete indexes before creation").Bool()
// DB is the shared sqlx.DB handle; it is assigned by initDB.
DB *sqlx.DB
// ES is the shared ElasticSearch client; it is assigned by initES.
ES *es.Client
// Command holds the value returned by kingpin.Parse in init.
Command string
)
// initDB connects to the database named by the --database-url flag and stores
// the handle in DB.
//
// The URL scheme is used as the database/sql driver name. The "postgresql"
// scheme alias is mapped to "postgres", the name under which the lib/pq
// driver imported by this file is registered. Without the mapping, an
// otherwise valid postgresql:// URL fails with an unknown-driver error.
//
// Any connection error terminates the process via log.Fatal.
func initDB() {
	u := *databaseURL

	driver := u.Scheme
	if driver == "postgresql" {
		// lib/pq understands postgresql:// connection URLs but registers
		// itself only under the "postgres" driver name.
		driver = "postgres"
	}

	db, err := sqlx.Connect(driver, u.String())
	if err != nil {
		log.Fatal(err)
	}

	DB = db
}
func initES() {
esCfg := es.Config{
Addresses: []string{(*esURL).String()},
}
client, err := es.NewClient(esCfg)
if err != nil {
log.Fatal(err)
}
ES = client
}
// parseNumberWithSign converts value to an integer and reports whether it was
// written with an explicit leading '+' or '-'.
//
// On a conversion failure it returns the zero NumberWithSign together with
// the strconv.Atoi error.
func parseNumberWithSign(value string) (r NumberWithSign, err error) {
	number, convErr := strconv.Atoi(value)
	if convErr != nil {
		return r, convErr
	}

	// Atoi rejects the empty string, so indexing the first byte is safe here.
	r = NumberWithSign{
		Value:    number,
		Explicit: value[0] == '+' || value[0] == '-',
	}
	return r, nil
}
// parseStart converts the raw export "start" argument into Start.
//
// An empty argument leaves Start unchanged. An argument that cannot be parsed
// terminates the process with a fatal log message.
func parseStart() {
	if *start == "" {
		return
	}

	parsed, err := parseNumberWithSign(*start)
	if err != nil {
		// log.Fatal formats like fmt.Print, which adds no space between
		// operands when one of them is a string, so the message and the error
		// used to run together. Format them explicitly instead.
		log.Fatalf("Error parsing start value: %v", err)
	}

	Start = parsed
}
// init registers the application version with kingpin, parses the command
// line into Command, validates the export "start" argument, and then sets up
// the database connection and the ElasticSearch client.
func init() {
	kingpin.Version(Version)
	Command = kingpin.Parse()

	// Validate arguments before touching any external service, so a malformed
	// "start" value is reported immediately rather than after (or hidden
	// behind) a database connection attempt. parseStart only reads the parsed
	// argument and does not depend on DB or ES.
	parseStart()

	initDB()
	initES()
}