forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
influx_stress.go
128 lines (97 loc) · 3.22 KB
/
influx_stress.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package main
import (
"flag"
"fmt"
"runtime"
"sort"
"time"
"github.com/influxdb/influxdb/stress"
)
// Command-line flags. Apart from -test, each flag overrides the matching
// setting from the stress test file, but only when it is given a non-zero /
// non-empty value; the zero value means "keep what the file says".
var (
	// batchSize overrides the number of points written per batch.
	batchSize = flag.Int("batchsize", 0, "number of points per batch")

	// concurrency overrides the number of simultaneous writers.
	concurrency = flag.Int("concurrency", 0, "number of simultaneous writes to run")

	// batchInterval overrides the pause between consecutive batches.
	batchInterval = flag.Duration("batchinterval", time.Duration(0), "duration between batches")

	// database overrides the name of the target database.
	database = flag.String("database", "", "name of database")

	// address overrides the host:port of the target server.
	address = flag.String("addr", "", "IP address and port of database (e.g., localhost:8086)")

	// precision overrides the timestamp precision used for written points.
	precision = flag.String("precision", "", "The precision that points in the database will be with")

	// test is the path of the stress test file; it is mandatory.
	test = flag.String("test", "", "The stress test file")
)
// main loads the stress test file named by -test, applies any command-line
// overrides to its write settings, runs the write stress test (plus the
// optional series and measurement query runners) and prints a summary of the
// results to standard output.
func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())
	flag.Parse()

	if *test == "" {
		fmt.Println("'-test' flag is required")
		return
	}

	cfg, err := runner.DecodeFile(*test)
	if err != nil {
		fmt.Println(err)
		return
	}

	// A flag left at its zero value means "not set": keep the file's value.
	if *batchSize != 0 {
		cfg.Write.BatchSize = *batchSize
	}
	if *concurrency != 0 {
		cfg.Write.Concurrency = *concurrency
	}
	if *batchInterval != 0 {
		cfg.Write.BatchInterval = batchInterval.String()
	}
	if *database != "" {
		cfg.Write.Database = *database
	}
	if *address != "" {
		cfg.Write.Address = *address
	}
	if *precision != "" {
		cfg.Write.Precision = *precision
	}

	// d and ts are shared between runner.Run and the query goroutines.
	d := make(chan struct{})
	seriesQueryResults := make(chan runner.QueryResults)
	if cfg.SeriesQuery.Enabled {
		go runner.SeriesQuery(cfg, d, seriesQueryResults)
	}

	measurementQueryResults := make(chan runner.QueryResults)
	ts := make(chan time.Time)
	if cfg.MeasurementQuery.Enabled {
		go runner.MeasurementQuery(cfg, ts, measurementQueryResults)
	}

	// Get the stress results
	totalPoints, failedRequests, responseTimes, timer := runner.Run(cfg, d, ts)

	// Slowest responses first.
	sort.Sort(sort.Reverse(sort.Interface(responseTimes)))

	total := int64(0)
	for _, t := range responseTimes {
		total += int64(t.Value)
	}

	fmt.Printf("Wrote %d points at average rate of %.0f\n", totalPoints, float64(totalPoints)/timer.Elapsed().Seconds())
	// Use the effective batch size from the config: the -batchsize flag is 0
	// whenever the batch size comes from the test file, which would make this
	// figure always read 0.
	fmt.Printf("%d requests failed for %d total points that didn't get posted.\n", failedRequests, failedRequests*cfg.Write.BatchSize)
	fmt.Println("Average response time: ", averageDuration(total, len(responseTimes)))
	fmt.Println("Slowest response times:")

	// Never slice past the end when fewer responses than the cap were recorded.
	shown := maxSlowestShown
	if len(responseTimes) < shown {
		shown = len(responseTimes)
	}
	for _, r := range responseTimes[:shown] {
		fmt.Println(time.Duration(r.Value))
	}

	// Get series query results
	if cfg.SeriesQuery.Enabled {
		printQueryResults("Series", <-seriesQueryResults)
	}

	// Get measurement query results
	if cfg.MeasurementQuery.Enabled {
		printQueryResults("Measurement", <-measurementQueryResults)
	}
}

// maxSlowestShown caps how many of the slowest write response times are
// printed in the summary.
const maxSlowestShown = 100

// averageDuration returns total/count as a time.Duration. It returns 0 when
// count is not positive so that an empty result set cannot cause a
// divide-by-zero panic.
func averageDuration(total int64, count int) time.Duration {
	if count <= 0 {
		return 0
	}
	return time.Duration(total / int64(count))
}

// printQueryResults prints how many queries of the given kind ("Series" or
// "Measurement") were issued and their mean response time in milliseconds.
func printQueryResults(kind string, qrs runner.QueryResults) {
	queryTotal := int64(0)
	for _, qt := range qrs.ResponseTimes {
		queryTotal += int64(qt.Value)
	}
	mean := averageDuration(queryTotal, len(qrs.ResponseTimes))
	fmt.Printf("Queried %s %d times with a average response time of %v milliseconds\n", kind, qrs.TotalQueries, mean.Seconds()*1000)
}