Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for MongoDB time-series collections #173

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ query execution performance. (It currently does not measure
concurrent insert and query performance, which is a future priority.)
To accomplish this in a fair way, the data to be inserted and the
queries to run are pre-generated and native Go clients are used
wherever possible to connect to each database (e.g., `mgo` for MongoDB,
`aws sdk` for Timestream).
wherever possible to connect to each database (e.g., `aws sdk` for Timestream).

Although the data is randomly generated, TSBS data and queries are
entirely deterministic. By supplying the same PRNG (pseudo-random number
Expand Down
268 changes: 213 additions & 55 deletions cmd/tsbs_generate_queries/databases/mongo/devops-naive.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"fmt"
"time"

"github.com/globalsign/mgo/bson"
"go.mongodb.org/mongo-driver/bson"

"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
"github.com/timescale/tsbs/internal/utils"
"github.com/timescale/tsbs/pkg/query"
)

Expand All @@ -16,7 +18,9 @@ func init() {
gob.Register(map[string]interface{}{})
gob.Register([]map[string]interface{}{})
gob.Register(bson.M{})
gob.Register(bson.D{})
gob.Register([]bson.M{})
gob.Register(time.Time{})
}

// NaiveDevops produces Mongo-specific queries for the devops use case.
Expand All @@ -41,46 +45,34 @@ func (d *NaiveDevops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRa
metrics, err := devops.GetCPUMetricsSlice(numMetrics)
panicIfErr(err)

bucketNano := time.Minute.Nanoseconds()
pipelineQuery := []bson.M{
{
"$match": map[string]interface{}{
"$match": bson.M{
"measurement": "cpu",
"timestamp_ns": map[string]interface{}{
"$gte": interval.StartUnixNano(),
"$lt": interval.EndUnixNano(),
"time": bson.M{
"$gte": interval.Start(),
"$lt": interval.End(),
},
"tags.hostname": map[string]interface{}{
"tags.hostname": bson.M{
"$in": hostnames,
},
},
},
{
"$project": map[string]interface{}{
"_id": 0,
"time_bucket": map[string]interface{}{
"$subtract": []interface{}{
"$timestamp_ns",
map[string]interface{}{"$mod": []interface{}{"$timestamp_ns", bucketNano}},
},
"$group": bson.M{
"_id": bson.M{
"$dateTrunc": bson.M{"date": "$time", "unit": "minute"},
},

"fields": 1,
},
},
}

group := bson.M{
"$group": bson.M{
"_id": "$time_bucket",
{
"$sort": bson.M{"_id": 1},
},
}
resultMap := group["$group"].(bson.M)
resultMap := pipelineQuery[1]["$group"].(bson.M)
for _, metric := range metrics {
resultMap["max_"+metric] = bson.M{"$max": "$fields." + metric}
resultMap["max_"+metric] = bson.M{"$max": "$" + metric}
}
pipelineQuery = append(pipelineQuery, group)
pipelineQuery = append(pipelineQuery, bson.M{"$sort": bson.M{"_id": 1}})

humanLabel := []byte(fmt.Sprintf("Mongo [NAIVE] %d cpu metric(s), random %4d hosts, random %s by 1m", numMetrics, nHosts, timeRange))
q := qi.(*query.Mongo)
Expand All @@ -101,61 +93,227 @@ func (d *NaiveDevops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration)
metrics, err := devops.GetCPUMetricsSlice(numMetrics)
panicIfErr(err)
bucketNano := time.Hour.Nanoseconds()

pipelineQuery := []bson.M{
{
"$match": bson.M{
"measurement": "cpu",
"timestamp_ns": bson.M{
"$gte": interval.StartUnixNano(),
"$lt": interval.EndUnixNano(),
"time": bson.M{
"$gte": interval.Start(),
"$lt": interval.End(),
},
},
},
{
"$project": bson.M{
"_id": 0,
"time_bucket": bson.M{
"$subtract": []interface{}{
"$timestamp_ns",
bson.M{"$mod": []interface{}{"$timestamp_ns", bucketNano}},
"$group": bson.M{
"_id": bson.M{
"time": bson.M{
"$dateTrunc": bson.M{"date": "$time", "unit": "hour"},
},
"hostname": "$tags.hostname",
},

"fields": 1,
"measurement": 1,
"tags": "$tags.hostname",
},
},
{
"$sort": bson.D{{"_id.time", 1}, {"_id.hostname", 1}},
},
}
resultMap := pipelineQuery[1]["$group"].(bson.M)
for _, metric := range metrics {
resultMap["avg_"+metric] = bson.M{"$avg": "$" + metric}
}

// Add groupby operator
group := bson.M{
"$group": bson.M{
"_id": bson.M{
"time": "$time_bucket",
"hostname": "$tags",
humanLabel := devops.GetDoubleGroupByLabel("Mongo [NAIVE]", numMetrics)
q := qi.(*query.Mongo)
q.HumanLabel = []byte(humanLabel)
q.BsonDoc = pipelineQuery
q.CollectionName = []byte("point_data")
q.HumanDescription = []byte(fmt.Sprintf("%s: %s (%s)", humanLabel, interval.StartString(), q.CollectionName))
}

// MaxAllCPU selects the MAX of all metrics under 'cpu' per hour for nhosts hosts,
// e.g. in pseudo-SQL:
//
// SELECT MAX(metric1), ..., MAX(metricN)
// FROM cpu WHERE (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N')
// AND time >= '$HOUR_START' AND time < '$HOUR_END'
// GROUP BY hour ORDER BY hour
func (d *NaiveDevops) MaxAllCPU(qi query.Query, nHosts int, duration time.Duration) {
interval := d.Interval.MustRandWindow(duration)
hostnames, err := d.GetRandomHosts(nHosts)
panicIfErr(err)
metrics := devops.GetAllCPUMetrics()

pipelineQuery := []bson.M{
{
"$match": bson.M{
"measurement": "cpu",
"tags.hostname": bson.M{
"$in": hostnames,
},
"time": bson.M{
"$gte": interval.Start(),
"$lt": interval.End(),
},
},
},
{
"$group": bson.M{
"_id": bson.M{
"$dateTrunc": bson.M{"date": "$time", "unit": "hour"},
},
},
},
{
"$sort": bson.M{"_id": 1},
},
}
resultMap := group["$group"].(bson.M)
resultMap := pipelineQuery[1]["$group"].(bson.M)
for _, metric := range metrics {
resultMap["avg_"+metric] = bson.M{"$avg": "$fields." + metric}
resultMap["max_"+metric] = bson.M{"$max": "$" + metric}
}
pipelineQuery = append(pipelineQuery, group)

// Add sort operator
pipelineQuery = append(pipelineQuery, []bson.M{
{"$sort": bson.M{"_id.hostname": 1}},
{"$sort": bson.M{"_id.time": 1}},
}...)
pipelineQuery = append(pipelineQuery, bson.M{"$sort": bson.M{"_id.time": 1, "_id.hostname": 1}})
humanLabel := devops.GetMaxAllLabel("Mongo", nHosts)
q := qi.(*query.Mongo)
q.HumanLabel = []byte(humanLabel)
q.BsonDoc = pipelineQuery
q.CollectionName = []byte("point_data")
q.HumanDescription = []byte(fmt.Sprintf("%s: %s", humanLabel, interval.StartString()))
}

// HighCPUForHosts populates a query that gets CPU metrics when the CPU has high
// usage between a time period for a number of hosts (if 0, it will search all hosts),
// e.g. in pseudo-SQL:
//
// SELECT * FROM cpu
// WHERE usage_user > 90.0
// AND time >= '$TIME_START' AND time < '$TIME_END'
// AND (hostname = '$HOST' OR hostname = '$HOST2'...)
func (d *NaiveDevops) HighCPUForHosts(qi query.Query, nHosts int) {
	interval := d.Interval.MustRandWindow(devops.HighCPUDuration)

	pipelineQuery := []bson.M{}

	// Must match in the documents that correspond to time, as well as optionally
	// filter on those with the correct host if nHosts > 0
	match := bson.M{
		"$match": bson.M{
			"measurement": "cpu",
			"time": bson.M{
				"$gte": interval.Start(),
				"$lt":  interval.End(),
			},
			"usage_user": bson.M{"$gt": 90.0},
		},
	}
	if nHosts > 0 {
		hostnames, err := d.GetRandomHosts(nHosts)
		panicIfErr(err)
		matchMap := match["$match"].(bson.M)
		matchMap["tags.hostname"] = bson.M{"$in": hostnames}
	}
	pipelineQuery = append(pipelineQuery, match)
	// Flatten the tags sub-document to just the hostname for the output rows.
	pipelineQuery = append(pipelineQuery, bson.M{"$set": bson.M{"tags": "$tags.hostname"}})

	humanLabel, err := devops.GetHighCPULabel("Mongo", nHosts)
	panicIfErr(err)
	q := qi.(*query.Mongo)
	q.HumanLabel = []byte(humanLabel)
	q.BsonDoc = pipelineQuery
	q.CollectionName = []byte("point_data")
	q.HumanDescription = []byte(fmt.Sprintf("%s: %s (%s)", humanLabel, interval.StartString(), q.CollectionName))
}

// LastPointPerHost finds the last row for every host in the dataset, e.g. in pseudo-SQL:
//
// SELECT DISTINCT ON (hostname) * FROM cpu
// ORDER BY hostname, time DESC
func (d *NaiveDevops) LastPointPerHost(qi query.Query) {
	pipelineQuery := []bson.M{
		// Restrict to cpu documents only.
		{"$match": bson.M{"measurement": "cpu"}},
		// Compute the latest timestamp observed for each hostname.
		{
			"$group": bson.M{
				"_id":       bson.M{"hostname": "$tags.hostname"},
				"last_time": bson.M{"$max": "$time"},
			},
		},
		// Join back against the collection to fetch the full metric document
		// at each host's latest timestamp.
		{
			"$lookup": bson.M{
				"from": "point_data",
				"let":  bson.M{"time": "$last_time", "hostname": "$_id.hostname"},
				"pipeline": []bson.M{
					{
						"$match": bson.M{
							"$expr": bson.M{
								"$and": []bson.M{
									{"$eq": []interface{}{"$time", "$$time"}},
									{"$eq": []interface{}{"$tags.hostname", "$$hostname"}},
									{"$eq": []interface{}{"$measurement", "cpu"}},
								},
							},
						},
					},
					{
						// Drop fields already represented in the outer document.
						"$project": bson.M{
							"time": 0,
							"tags": 0,
							"_id":  0,
						},
					},
				},
				"as": "metrics",
			},
		},
	}

	humanLabel := "Mongo last row per host"
	q := qi.(*query.Mongo)
	q.HumanLabel = []byte(humanLabel)
	q.BsonDoc = pipelineQuery
	q.CollectionName = []byte("point_data")
	// humanLabel is already a string; fmt.Sprintf("%s", ...) was redundant.
	q.HumanDescription = []byte(humanLabel)
}

// GroupByOrderByLimit populates a query.Query that has a time WHERE clause, that groups by a
// truncated date, orders by that date, and takes a limit, e.g. in pseudo-SQL:
//
// SELECT minute, MAX(usage_user) FROM cpu
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the implementation, the column name usage_user. Should we fix the docs too?

Suggested change
// SELECT minute, MAX(cpu) FROM cpu
// SELECT minute, MAX(usage_user) FROM cpu

// WHERE time < '$TIME'
// GROUP BY minute ORDER BY minute DESC
// LIMIT $LIMIT
func (d *NaiveDevops) GroupByOrderByLimit(qi query.Query) {
	// Pick a random one-hour window, then stretch its start back to the
	// beginning of the whole data interval: the query is bounded only by a
	// (random) end time, matching the "WHERE time < '$TIME'" pseudo-SQL.
	interval := d.Interval.MustRandWindow(time.Hour)
	interval, err := utils.NewTimeInterval(d.Interval.Start(), interval.End())
	panicIfErr(err)

	pipelineQuery := []bson.M{
		{
			"$match": bson.M{
				"measurement": "cpu",
				"time": bson.M{
					"$gte": interval.Start(),
					"$lt":  interval.End(),
				},
			},
		},
		{
			// Bucket by minute and take the max usage_user per bucket.
			"$group": bson.M{
				"_id": bson.M{
					"$dateTrunc": bson.M{"date": "$time", "unit": "minute"},
				},
				"max_value": bson.M{"$max": "$usage_user"},
			},
		},
		// Newest buckets first; keep only the last 5 minute-intervals.
		{"$sort": bson.M{"_id": -1}},
		{"$limit": 5},
	}

	humanLabel := "Mongo max cpu over last 5 min-intervals (random end)"
	q := qi.(*query.Mongo)
	q.HumanLabel = []byte(humanLabel)
	q.BsonDoc = pipelineQuery
	q.CollectionName = []byte("point_data")
	q.HumanDescription = []byte(fmt.Sprintf("%s: %s", humanLabel, interval.EndString()))
}