Skip to content

Commit

Permalink
Add functionality for generated cpu-only queries on time-series collections
Browse files Browse the repository at this point in the history
  • Loading branch information
kaywux authored and gregorynoma committed Jul 13, 2021
1 parent f5b332b commit b3f59f5
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 94 deletions.
265 changes: 211 additions & 54 deletions cmd/tsbs_generate_queries/databases/mongo/devops-naive.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.mongodb.org/mongo-driver/bson"

"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
"github.com/timescale/tsbs/internal/utils"
"github.com/timescale/tsbs/pkg/query"
)

Expand All @@ -17,7 +18,9 @@ func init() {
gob.Register(map[string]interface{}{})
gob.Register([]map[string]interface{}{})
gob.Register(bson.M{})
gob.Register(bson.D{})
gob.Register([]bson.M{})
gob.Register(time.Time{})
}

// NaiveDevops produces Mongo-specific queries for the devops use case.
Expand All @@ -42,46 +45,34 @@ func (d *NaiveDevops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRa
metrics, err := devops.GetCPUMetricsSlice(numMetrics)
panicIfErr(err)

bucketNano := time.Minute.Nanoseconds()
pipelineQuery := []bson.M{
{
"$match": map[string]interface{}{
"$match": bson.M{
"measurement": "cpu",
"timestamp_ns": map[string]interface{}{
"$gte": interval.StartUnixNano(),
"$lt": interval.EndUnixNano(),
"time": bson.M{
"$gte": interval.Start(),
"$lt": interval.End(),
},
"tags.hostname": map[string]interface{}{
"tags.hostname": bson.M{
"$in": hostnames,
},
},
},
{
"$project": map[string]interface{}{
"_id": 0,
"time_bucket": map[string]interface{}{
"$subtract": []interface{}{
"$timestamp_ns",
map[string]interface{}{"$mod": []interface{}{"$timestamp_ns", bucketNano}},
},
"$group": bson.M{
"_id": bson.M{
"$dateTrunc": bson.M{"date": "$time", "unit": "minute"},
},

"fields": 1,
},
},
}

group := bson.M{
"$group": bson.M{
"_id": "$time_bucket",
{
"$sort": bson.M{"_id": 1},
},
}
resultMap := group["$group"].(bson.M)
resultMap := pipelineQuery[1]["$group"].(bson.M)
for _, metric := range metrics {
resultMap["max_"+metric] = bson.M{"$max": "$fields." + metric}
resultMap["max_"+metric] = bson.M{"$max": "$" + metric}
}
pipelineQuery = append(pipelineQuery, group)
pipelineQuery = append(pipelineQuery, bson.M{"$sort": bson.M{"_id": 1}})

humanLabel := []byte(fmt.Sprintf("Mongo [NAIVE] %d cpu metric(s), random %4d hosts, random %s by 1m", numMetrics, nHosts, timeRange))
q := qi.(*query.Mongo)
Expand All @@ -102,61 +93,227 @@ func (d *NaiveDevops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration)
metrics, err := devops.GetCPUMetricsSlice(numMetrics)
panicIfErr(err)
bucketNano := time.Hour.Nanoseconds()

pipelineQuery := []bson.M{
{
"$match": bson.M{
"measurement": "cpu",
"timestamp_ns": bson.M{
"$gte": interval.StartUnixNano(),
"$lt": interval.EndUnixNano(),
"time": bson.M{
"$gte": interval.Start(),
"$lt": interval.End(),
},
},
},
{
"$project": bson.M{
"_id": 0,
"time_bucket": bson.M{
"$subtract": []interface{}{
"$timestamp_ns",
bson.M{"$mod": []interface{}{"$timestamp_ns", bucketNano}},
"$group": bson.M{
"_id": bson.M{
"time": bson.M{
"$dateTrunc": bson.M{"date": "$time", "unit": "hour"},
},
"hostname": "$tags.hostname",
},

"fields": 1,
"measurement": 1,
"tags": "$tags.hostname",
},
},
{
"$sort": bson.D{{"_id.time", 1}, {"_id.hostname", 1}},
},
}
resultMap := pipelineQuery[1]["$group"].(bson.M)
for _, metric := range metrics {
resultMap["avg_"+metric] = bson.M{"$avg": "$" + metric}
}

humanLabel := devops.GetDoubleGroupByLabel("Mongo [NAIVE]", numMetrics)
q := qi.(*query.Mongo)
q.HumanLabel = []byte(humanLabel)
q.BsonDoc = pipelineQuery
q.CollectionName = []byte("point_data")
q.HumanDescription = []byte(fmt.Sprintf("%s: %s (%s)", humanLabel, interval.StartString(), q.CollectionName))
}

// MaxAllCPU selects the MAX of all metrics under 'cpu' per hour for nhosts hosts,
// e.g. in pseudo-SQL:
//
// SELECT MAX(metric1), ..., MAX(metricN)
// FROM cpu WHERE (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N')
// AND time >= '$HOUR_START' AND time < '$HOUR_END'
// GROUP BY hour ORDER BY hour
func (d *NaiveDevops) MaxAllCPU(qi query.Query, nHosts int, duration time.Duration) {
interval := d.Interval.MustRandWindow(duration)
hostnames, err := d.GetRandomHosts(nHosts)
panicIfErr(err)
metrics := devops.GetAllCPUMetrics()

// Add groupby operator
group := bson.M{
"$group": bson.M{
"_id": bson.M{
"time": "$time_bucket",
"hostname": "$tags",
pipelineQuery := []bson.M{
{
"$match": bson.M{
"measurement": "cpu",
"tags.hostname": bson.M{
"$in": hostnames,
},
"time": bson.M{
"$gte": interval.Start(),
"$lt": interval.End(),
},
},
},
{
"$group": bson.M{
"_id": bson.M{
"$dateTrunc": bson.M{"date": "$time", "unit": "hour"},
},
},
},
{
"$sort": bson.M{"_id": 1},
},
}
resultMap := group["$group"].(bson.M)
resultMap := pipelineQuery[1]["$group"].(bson.M)
for _, metric := range metrics {
resultMap["avg_"+metric] = bson.M{"$avg": "$fields." + metric}
resultMap["max_"+metric] = bson.M{"$max": "$" + metric}
}
pipelineQuery = append(pipelineQuery, group)

// Add sort operator
pipelineQuery = append(pipelineQuery, []bson.M{
{"$sort": bson.M{"_id.hostname": 1}},
{"$sort": bson.M{"_id.time": 1}},
}...)
pipelineQuery = append(pipelineQuery, bson.M{"$sort": bson.M{"_id.time": 1, "_id.hostname": 1}})
humanLabel := devops.GetMaxAllLabel("Mongo", nHosts)
q := qi.(*query.Mongo)
q.HumanLabel = []byte(humanLabel)
q.BsonDoc = pipelineQuery
q.CollectionName = []byte("point_data")
q.HumanDescription = []byte(fmt.Sprintf("%s: %s", humanLabel, interval.StartString()))
}

// HighCPUForHosts populates a query that gets CPU metrics when the CPU has high
// usage between a time period for a number of hosts (if 0, it will search all hosts),
// e.g. in pseudo-SQL:
//
// SELECT * FROM cpu
// WHERE usage_user > 90.0
// AND time >= '$TIME_START' AND time < '$TIME_END'
// AND (hostname = '$HOST' OR hostname = '$HOST2'...)
func (d *NaiveDevops) HighCPUForHosts(qi query.Query, nHosts int) {
	interval := d.Interval.MustRandWindow(devops.HighCPUDuration)

	pipelineQuery := []bson.M{}

	// Must match in the documents that correspond to time, as well as optionally
	// filter on those with the correct host if nHosts > 0
	match := bson.M{
		"$match": bson.M{
			"measurement": "cpu",
			"time": bson.M{
				"$gte": interval.Start(),
				"$lt":  interval.End(),
			},
			"usage_user": bson.M{"$gt": 90.0},
		},
	}
	if nHosts > 0 {
		hostnames, err := d.GetRandomHosts(nHosts)
		panicIfErr(err)
		matchMap := match["$match"].(bson.M)
		matchMap["tags.hostname"] = bson.M{"$in": hostnames}
	}
	pipelineQuery = append(pipelineQuery, match)
	// Flatten the hostname tag into the output documents.
	pipelineQuery = append(pipelineQuery, bson.M{"$set": bson.M{"tags": "$tags.hostname"}})

	humanLabel, err := devops.GetHighCPULabel("Mongo", nHosts)
	panicIfErr(err)
	q := qi.(*query.Mongo)
	q.HumanLabel = []byte(humanLabel)
	q.BsonDoc = pipelineQuery
	q.CollectionName = []byte("point_data")
	q.HumanDescription = []byte(fmt.Sprintf("%s: %s (%s)", humanLabel, interval.StartString(), q.CollectionName))
}

// LastPointPerHost finds the last row for every host in the dataset, e.g. in pseudo-SQL:
//
// SELECT DISTINCT ON (hostname) * FROM cpu
// ORDER BY hostname, time DESC
func (d *NaiveDevops) LastPointPerHost(qi query.Query) {
	pipelineQuery := []bson.M{
		{"$match": bson.M{"measurement": "cpu"}},
		// First pass: latest timestamp per hostname.
		{
			"$group": bson.M{
				"_id":       bson.M{"hostname": "$tags.hostname"},
				"last_time": bson.M{"$max": "$time"},
			},
		},
		// Second pass: join back to fetch the full document at that timestamp.
		{
			"$lookup": bson.M{
				"from": "point_data",
				"let":  bson.M{"time": "$last_time", "hostname": "$_id.hostname"},
				"pipeline": []bson.M{
					{
						"$match": bson.M{
							"$expr": bson.M{
								"$and": []bson.M{
									{"$eq": []interface{}{"$time", "$$time"}},
									{"$eq": []interface{}{"$tags.hostname", "$$hostname"}},
									{"$eq": []interface{}{"$measurement", "cpu"}},
								},
							},
						},
					},
					{
						// Strip join keys; only the metric fields remain.
						"$project": bson.M{
							"time": 0,
							"tags": 0,
							"_id":  0,
						},
					},
				},
				"as": "metrics",
			},
		},
	}

	humanLabel := "Mongo last row per host"
	q := qi.(*query.Mongo)
	q.HumanLabel = []byte(humanLabel)
	q.BsonDoc = pipelineQuery
	q.CollectionName = []byte("point_data")
	// The label is used verbatim; Sprintf("%s", x) would be a no-op (staticcheck S1025).
	q.HumanDescription = []byte(humanLabel)
}

// GroupByOrderByLimit populates a query.Query that has a time WHERE clause, that groups by a
// truncated date, orders by that date, and takes a limit, e.g. in pseudo-SQL:
//
// SELECT minute, MAX(cpu) FROM cpu
// WHERE time < '$TIME'
// GROUP BY minute ORDER BY minute DESC
// LIMIT $LIMIT
func (d *NaiveDevops) GroupByOrderByLimit(qi query.Query) {
	// Pick a random one-hour window, then widen the query interval so it
	// spans from the very start of the data up to that window's end.
	window := d.Interval.MustRandWindow(time.Hour)
	interval, err := utils.NewTimeInterval(d.Interval.Start(), window.End())
	if err != nil {
		panic(err.Error())
	}

	matchStage := bson.M{
		"$match": bson.M{
			"measurement": "cpu",
			"time": bson.M{
				"$gte": interval.Start(),
				"$lt":  interval.End(),
			},
		},
	}
	groupStage := bson.M{
		"$group": bson.M{
			"_id": bson.M{
				"$dateTrunc": bson.M{"date": "$time", "unit": "minute"},
			},
			"max_value": bson.M{"$max": "$usage_user"},
		},
	}
	pipeline := []bson.M{
		matchStage,
		groupStage,
		{"$sort": bson.M{"_id": -1}}, // newest minute bucket first
		{"$limit": 5},
	}

	humanLabel := "Mongo max cpu over last 5 min-intervals (random end)"
	q := qi.(*query.Mongo)
	q.HumanLabel = []byte(humanLabel)
	q.BsonDoc = pipeline
	q.CollectionName = []byte("point_data")
	q.HumanDescription = []byte(fmt.Sprintf("%s: %s", humanLabel, interval.EndString()))
}

0 comments on commit b3f59f5

Please sign in to comment.