forked from influxdata/influxdb
/
reporting.go
95 lines (76 loc) · 2.58 KB
/
reporting.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package stressClient
import (
"log"
"strconv"
"time"
influx "github.com/influxdata/influxdb/client/v2"
)
// reporting.go contains the functions that build the tags and points emitted by the various parts of stressClient.
// These points are then written to the database whose name is formatted as ("_%v", sf.TestName).
// tags returns the tag set that stressClient attaches to any response point.
// statementID is stored under "statement_id"; every other value is read from
// the client itself. Integer settings (number of target addresses, writer and
// reader concurrency) are rendered as strings via fmtInt. A new map is built
// on each call.
func (sc *stressClient) tags(statementID string) map[string]string {
	return map[string]string{
		// Identity of the test run and the statement being reported on.
		"test_id":      sc.testID,
		"statement_id": statementID,

		// Client configuration.
		"number_targets": fmtInt(len(sc.addresses)),
		"precision":      sc.precision,
		"writers":        fmtInt(sc.wconc),
		"readers":        fmtInt(sc.qconc),
		"write_interval": sc.wdelay,
		"query_interval": sc.qdelay,
	}
}
// tags returns the tag set that the StressTest adds to any response point:
// its precision, and its batch size rendered as a string via fmtInt. A new
// map is built on each call.
func (st *StressTest) tags() map[string]string {
	out := make(map[string]string, 2)
	out["precision"] = st.Precision
	out["batch_size"] = fmtInt(st.BatchSize)
	return out
}
// writePoint builds the *influx.Point used to report on a write.
//
// The point belongs to the "write" measurement and is timestamped with the
// current time. Its tags are the client's tags for statementID merged with
// addedTags via sumTags, so an addedTags entry replaces a client tag that has
// the same key. Its fields are the status code, the response time in
// nanoseconds and the number of bytes written.
//
// retries is accepted but is not recorded on the point.
//
// If the point cannot be created the error is logged with log.Fatalf, which
// exits the process.
func (sc *stressClient) writePoint(retries int, statementID string, statusCode int, responseTime time.Duration, addedTags map[string]string, writeBytes int) *influx.Point {
	pt, err := influx.NewPoint(
		"write",
		sumTags(sc.tags(statementID), addedTags),
		map[string]interface{}{
			"status_code":      statusCode,
			"response_time_ns": responseTime.Nanoseconds(),
			"num_bytes":        writeBytes,
		},
		time.Now(),
	)
	if err != nil {
		log.Fatalf("Error creating write results point\n error: %v\n", err)
	}
	return pt
}
// queryPoint builds the *influx.Point used to report on a query.
//
// The point belongs to the "query" measurement and is timestamped with the
// current time. Its tags are the client's tags for statementID merged with
// addedTags via sumTags, so an addedTags entry replaces a client tag that has
// the same key. Its fields are the status code, the length of body in bytes
// and the response time in nanoseconds.
//
// If the point cannot be created the error is logged with log.Fatalf, which
// exits the process.
func (sc *stressClient) queryPoint(statementID string, body []byte, statusCode int, responseTime time.Duration, addedTags map[string]string) *influx.Point {
	pointTags := sumTags(sc.tags(statementID), addedTags)

	pointFields := make(map[string]interface{}, 3)
	pointFields["status_code"] = statusCode
	pointFields["num_bytes"] = len(body)
	pointFields["response_time_ns"] = responseTime.Nanoseconds()

	pt, err := influx.NewPoint("query", pointTags, pointFields, time.Now())
	if err != nil {
		log.Fatalf("Error creating query results point\n error: %v\n", err)
	}
	return pt
}
// sumTags merges two tag maps into a newly allocated map and returns it.
//
// Entries from tags1 are copied first and entries from tags2 second, so when
// both maps contain the same key the value from tags2 wins. Neither input is
// modified. Either input may be nil; the result is always a non-nil map that
// the caller is free to mutate.
func sumTags(tags1, tags2 map[string]string) map[string]string {
	// len(tags1)+len(tags2) is an upper bound on the result size (reached when
	// the key sets are disjoint), so sizing up front avoids growth while copying.
	merged := make(map[string]string, len(tags1)+len(tags2))
	for k, v := range tags1 {
		merged[k] = v
	}
	// Copied second so that tags2 overrides tags1 on key collisions.
	for k, v := range tags2 {
		merged[k] = v
	}
	return merged
}
// fmtInt returns the base-10 string representation of i.
func fmtInt(i int) string {
	return strconv.Itoa(i)
}