-
Notifications
You must be signed in to change notification settings - Fork 0
/
mongo_devops_common.go
179 lines (161 loc) · 5.79 KB
/
mongo_devops_common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package mongodb
import (
"fmt"
bulkQuerygen "github.com/influxdata/influxdb-comparisons/bulk_query_gen"
"math/rand"
"time"
)
// MongoDevops produces Mongo-specific queries for the devops use case.
// NewMongoDevops returns a pointer to it as a bulkQuerygen.QueryGenerator.
type MongoDevops struct {
// CommonParams is embedded; the AllInterval and ScaleVar values read by the
// methods in this file are promoted from it.
bulkQuerygen.CommonParams
// DatabaseName is the target database name; the max-CPU query builder
// copies it into each query's DatabaseName.
DatabaseName string
}
// NewMongoDevops makes a MongoDevops object ready to generate Queries.
//
// The common parameters are built from interval and scaleVar, and the
// database name is looked up in dbConfig under bulkQuerygen.DatabaseName.
// duration is accepted to keep the constructor signature but is not used here.
func NewMongoDevops(dbConfig bulkQuerygen.DatabaseConfig, interval bulkQuerygen.TimeInterval, duration time.Duration, scaleVar int) bulkQuerygen.QueryGenerator {
	gen := new(MongoDevops)
	gen.CommonParams = *bulkQuerygen.NewCommonParams(interval, scaleVar)
	gen.DatabaseName = dbConfig[bulkQuerygen.DatabaseName]
	return gen
}
// Dispatch fulfills the QueryGenerator interface. It obtains a query from
// NewMongoQuery, passes it with i and the scale var to
// bulkQuerygen.DevopsDispatchAll, and returns that same query.
func (d *MongoDevops) Dispatch(i int) bulkQuerygen.Query {
	// NOTE(review): the previous comment said this query comes "from pool";
	// NewMongoQuery is defined elsewhere — confirm.
	query := NewMongoQuery()
	bulkQuerygen.DevopsDispatchAll(d, i, query, d.ScaleVar)
	return query
}
// MaxCPUUsageHourByMinuteOneHost populates q with the query for the maximum
// CPU usage of one host over the course of an hour.
// q must hold a *MongoQuery; otherwise the type assertion panics.
func (d *MongoDevops) MaxCPUUsageHourByMinuteOneHost(q bulkQuerygen.Query) {
	mq := q.(*MongoQuery)
	d.maxCPUUsageHourByMinuteNHosts(mq, 1, time.Hour)
}
// MaxCPUUsageHourByMinuteTwoHosts populates q with the query for the maximum
// CPU usage of two hosts over the course of an hour.
// q must hold a *MongoQuery; otherwise the type assertion panics.
func (d *MongoDevops) MaxCPUUsageHourByMinuteTwoHosts(q bulkQuerygen.Query) {
	mq := q.(*MongoQuery)
	d.maxCPUUsageHourByMinuteNHosts(mq, 2, time.Hour)
}
// MaxCPUUsageHourByMinuteFourHosts populates q with the query for the maximum
// CPU usage of four hosts over the course of an hour.
// q must hold a *MongoQuery; otherwise the type assertion panics.
func (d *MongoDevops) MaxCPUUsageHourByMinuteFourHosts(q bulkQuerygen.Query) {
	mq := q.(*MongoQuery)
	d.maxCPUUsageHourByMinuteNHosts(mq, 4, time.Hour)
}
// MaxCPUUsageHourByMinuteEightHosts populates q with the query for the maximum
// CPU usage of eight hosts over the course of an hour.
// q must hold a *MongoQuery; otherwise the type assertion panics.
func (d *MongoDevops) MaxCPUUsageHourByMinuteEightHosts(q bulkQuerygen.Query) {
	mq := q.(*MongoQuery)
	d.maxCPUUsageHourByMinuteNHosts(mq, 8, time.Hour)
}
// MaxCPUUsageHourByMinuteSixteenHosts populates q with the query for the
// maximum CPU usage of sixteen hosts over the course of an hour.
// q must hold a *MongoQuery; otherwise the type assertion panics.
func (d *MongoDevops) MaxCPUUsageHourByMinuteSixteenHosts(q bulkQuerygen.Query) {
	mq := q.(*MongoQuery)
	d.maxCPUUsageHourByMinuteNHosts(mq, 16, time.Hour)
}
// MaxCPUUsageHourByMinuteThirtyTwoHosts populates q with the query for the
// maximum CPU usage of thirty-two hosts over the course of an hour.
// q must hold a *MongoQuery; otherwise the type assertion panics.
func (d *MongoDevops) MaxCPUUsageHourByMinuteThirtyTwoHosts(q bulkQuerygen.Query) {
	mq := q.(*MongoQuery)
	d.maxCPUUsageHourByMinuteNHosts(mq, 32, time.Hour)
}
// MaxCPUUsage12HoursByMinuteOneHost populates q with the query for the maximum
// CPU usage of one host over the course of twelve hours.
// q must hold a *MongoQuery; otherwise the type assertion panics.
func (d *MongoDevops) MaxCPUUsage12HoursByMinuteOneHost(q bulkQuerygen.Query) {
	mq := q.(*MongoQuery)
	d.maxCPUUsageHourByMinuteNHosts(mq, 1, 12*time.Hour)
}
// maxCPUUsageHourByMinuteNHosts fills qi with an aggregation pipeline that
// computes the maximum "usage_user" value of the "cpu" measurement per
// one-minute bucket, restricted to nhosts randomly chosen hosts and to a random
// window of length timeRange taken from d.AllInterval.
//
// Despite the "Hour" in the name, the window length is whatever timeRange is.
//
// qi must hold a *MongoQuery; any other dynamic type makes the type assertion
// panic. nhosts must lie in [0, d.ScaleVar]: this method has no error return,
// so an out-of-range value panics with a descriptive message instead of the
// opaque slice-bounds runtime panic it would otherwise trigger.
func (d *MongoDevops) maxCPUUsageHourByMinuteNHosts(qi bulkQuerygen.Query, nhosts int, timeRange time.Duration) {
	// Width of one aggregation bucket; also reported as the query's group-by duration.
	const bucket = time.Minute

	q := qi.(*MongoQuery)

	if nhosts < 0 || nhosts > d.ScaleVar {
		panic(fmt.Sprintf("mongo devops: cannot pick %d random hosts with scale var %d", nhosts, d.ScaleVar))
	}

	interval := d.AllInterval.RandWindow(timeRange)

	simpleArrays := DocumentFormat == SimpleArraysFormat

	// Build one tag clause per randomly selected host. The clause shape
	// depends on the document format in use.
	hostnameClauses := make([]M, 0, nhosts)
	for _, n := range rand.Perm(d.ScaleVar)[:nhosts] {
		hostname := fmt.Sprintf("host_%d", n)
		if simpleArrays {
			hostnameClauses = append(hostnameClauses, M{"hostname": hostname})
		} else {
			hostnameClauses = append(hostnameClauses, M{"key": "hostname", "val": hostname})
		}
	}

	// fieldSpec/fieldExpr form the $project entry that keeps the field of
	// interest; fieldPath is where $group reads its value afterwards.
	var fieldSpec, fieldPath string
	var fieldExpr interface{}
	if simpleArrays {
		fieldSpec = "fields.usage_user"
		fieldExpr = 1
		fieldPath = "fields.usage_user"
	} else {
		fieldSpec = "fields"
		fieldExpr = M{
			"$filter": M{
				"input": "$fields",
				"as":    "field",
				"cond":  M{"$eq": []string{"$$field.key", "usage_user"}},
			},
		}
		fieldPath = "fields.val"
	}

	bucketNano := bucket.Nanoseconds()

	// NOTE(review): "tags" is not listed in $project, yet $group keys on
	// "$tags" — confirm whether aggregating across the selected hosts is the
	// intended behavior.
	pipelineQuery := []M{
		{
			"$match": M{
				"measurement": "cpu",
				"timestamp_ns": M{
					"$gte": interval.StartUnixNano(),
					"$lt":  interval.EndUnixNano(),
				},
				"tags": M{
					"$in": hostnameClauses,
				},
			},
		},
		{
			"$project": M{
				"_id": 0,
				// Truncate the timestamp down to the start of its bucket.
				"time_bucket": M{
					"$subtract": S{
						"$timestamp_ns",
						M{"$mod": S{"$timestamp_ns", bucketNano}},
					},
				},
				fieldSpec:     fieldExpr,
				"measurement": 1,
			},
		},
		{
			"$unwind": "$fields",
		},
		{
			"$group": M{
				"_id":       M{"time_bucket": "$time_bucket", "tags": "$tags"},
				"agg_value": M{"$max": "$" + fieldPath},
			},
		},
		{
			"$sort": M{"_id.time_bucket": 1},
		},
	}

	humanLabel := []byte(fmt.Sprintf("Mongo max cpu, rand %4d hosts, rand %s by 1m", nhosts, timeRange))

	q.HumanLabel = humanLabel
	q.BsonDoc = pipelineQuery
	q.DatabaseName = []byte(d.DatabaseName)
	q.CollectionName = []byte("point_data")
	q.MeasurementName = []byte("cpu")
	q.FieldName = []byte("usage_user")
	// The description reads the name fields set just above, so it must come after them.
	q.HumanDescription = []byte(fmt.Sprintf("%s: %s (%s, %s, %s, %s)", humanLabel, interval.StartString(), q.DatabaseName, q.CollectionName, q.MeasurementName, q.FieldName))
	q.TimeStart = interval.Start
	q.TimeEnd = interval.End
	q.GroupByDuration = bucket
}
// MeanCPUUsageDayByHourAllHostsGroupbyHost is currently a no-op for Mongo: its
// body consists only of commented-out code, so qi is returned to the caller
// unmodified.
//
// NOTE(review): the disabled code below mentions elasticsearch, HTTPQuery and a
// "/cpu/_search" path, so it looks carried over from an Elasticsearch
// generator rather than written for Mongo — confirm before re-enabling it.
func (d *MongoDevops) MeanCPUUsageDayByHourAllHostsGroupbyHost(qi bulkQuerygen.Query) {
// if scaleVar > 10000 {
// // TODO: does this apply to mongo?
// panic("scaleVar > 10000 implies size > 10000, which is not supported on elasticsearch. see https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html")
// }
//
// interval := d.AllInterval.RandWindow(24 * time.Hour)
//
// body := new(bytes.Buffer)
// mustExecuteTemplate(mongoFleetGroupByHostnameQuery, body, MongoFleetQueryParams{
// Start: interval.StartString(),
// End: interval.EndString(),
// Bucket: "1h",
// Field: "usage_user",
// HostnameCount: scaleVar,
// })
//
// humanLabel := []byte("Mongo mean cpu, all hosts, rand 1day by 1hour")
// q := qi.(*HTTPQuery)
// q.HumanLabel = humanLabel
// q.HumanDescription = []byte(fmt.Sprintf("%s: %s", humanLabel, interval.StartString()))
// q.Method = []byte("POST")
//
// q.Path = []byte("/cpu/_search")
// q.Body = body.Bytes()
}