forked from influxdata/influxdb-comparisons
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mongo_iot_common.go
121 lines (110 loc) · 3.47 KB
/
mongo_iot_common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package mongodb
import (
"fmt"
bulkDataGenIot "github.com/influxdata/influxdb-comparisons/bulk_data_gen/iot"
bulkQuerygen "github.com/influxdata/influxdb-comparisons/bulk_query_gen"
"math/rand"
"time"
)
// MongoIot produces Mongo-specific queries for the IoT use case.
type MongoIot struct {
// Embedded shared generator parameters; the methods in this file read
// AllInterval and ScaleVar through it.
bulkQuerygen.CommonParams
// DatabaseName is set by NewMongoIot from the database config and copied
// onto the queries this generator fills in.
DatabaseName string
}
// NewMongoIot makes a MongoIot object ready to generate Queries.
//
// The database name is looked up in dbConfig under bulkQuerygen.DatabaseName,
// while interval and scaleVar are forwarded to bulkQuerygen.NewCommonParams.
// The duration argument is accepted but not used by this constructor.
func NewMongoIot(dbConfig bulkQuerygen.DatabaseConfig, interval bulkQuerygen.TimeInterval, duration time.Duration, scaleVar int) bulkQuerygen.QueryGenerator {
	params := bulkQuerygen.NewCommonParams(interval, scaleVar)

	gen := new(MongoIot)
	gen.CommonParams = *params
	gen.DatabaseName = dbConfig[bulkQuerygen.DatabaseName]
	return gen
}
// Dispatch fulfills the QueryGenerator interface. It obtains a query from
// NewMongoQuery, hands it to bulkQuerygen.IotDispatchAll together with the
// index i and the generator's ScaleVar, and returns the query.
func (d *MongoIot) Dispatch(i int) bulkQuerygen.Query {
	query := NewMongoQuery() // the original notes this comes from a pool
	bulkQuerygen.IotDispatchAll(d, i, query, d.ScaleVar)
	return query
}
// AverageTemperatureDayByHourOneHome populates a Query for getting the
// hourly average temperature of a single home over a 12-hour window.
//
// q must hold a *MongoQuery; any other dynamic type makes the type assertion
// panic.
func (d *MongoIot) AverageTemperatureDayByHourOneHome(q bulkQuerygen.Query) {
	const (
		homeCount = 1
		window    = 12 * time.Hour
	)
	mongoQuery := q.(*MongoQuery)
	d.averageTemperatureDayByHourNHomes(mongoQuery, homeCount, window)
}
// averageTemperatureDayByHourNHomes fills qi (which must hold a *MongoQuery)
// with an aggregation pipeline that averages the "temperature" field of the
// "air_condition_room" measurement in one-hour buckets, restricted to nHomes
// randomly picked homes and to a random window of length timeRange taken
// from d.AllInterval.
//
// nHomes must not exceed d.ScaleVar: the random permutation of d.ScaleVar
// indices is sliced to its first nHomes entries, and an out-of-range slice
// panics.
func (d *MongoIot) averageTemperatureDayByHourNHomes(qi bulkQuerygen.Query, nHomes int, timeRange time.Duration) {
	const (
		measurementName = "air_condition_room"
		fieldName       = "temperature"
	)

	// The window is drawn before the homes so random numbers are consumed in
	// the same order as before.
	window := d.AllInterval.RandWindow(timeRange)
	picked := rand.Perm(d.ScaleVar)[:nHomes]

	simpleArrays := DocumentFormat == SimpleArraysFormat

	// One tag clause per picked home, shaped for the active document format.
	tagClauses := make([]M, 0, len(picked))
	for _, idx := range picked {
		homeID := fmt.Sprintf(bulkDataGenIot.SmartHomeIdFormat, idx)
		if simpleArrays {
			tagClauses = append(tagClauses, M{"home_id": homeID})
			continue
		}
		tagClauses = append(tagClauses, M{"key": "home_id", "val": homeID})
	}

	// How the temperature value is projected, and the path it is averaged
	// from afterwards, both depend on the document format.
	var (
		projectKey  string
		projectExpr interface{}
		avgPath     string
	)
	switch {
	case simpleArrays:
		projectKey = "fields." + fieldName
		projectExpr = 1
		avgPath = "fields." + fieldName
	default:
		projectKey = "fields"
		projectExpr = M{
			"$filter": M{
				"input": "$fields",
				"as":    "field",
				"cond":  M{"$eq": []string{"$$field.key", fieldName}},
			},
		}
		avgPath = "fields.val"
	}

	hourNanos := time.Hour.Nanoseconds()

	matchStage := M{
		"$match": M{
			"measurement": measurementName,
			"timestamp_ns": M{
				"$gte": window.StartUnixNano(),
				"$lt":  window.EndUnixNano(),
			},
			"tags": M{"$in": tagClauses},
		},
	}

	// time_bucket = timestamp_ns - (timestamp_ns mod one hour), i.e. the
	// timestamp truncated to the start of its hour.
	projectStage := M{
		"$project": M{
			"_id": 0,
			"time_bucket": M{
				"$subtract": S{
					"$timestamp_ns",
					M{"$mod": S{"$timestamp_ns", hourNanos}},
				},
			},
			projectKey:    projectExpr,
			"measurement": 1,
		},
	}

	unwindStage := M{"$unwind": "$fields"}

	// NOTE(review): the $project stage above does not list "tags", yet the
	// group key references "$tags" — confirm this is intended.
	groupStage := M{
		"$group": M{
			"_id":       M{"time_bucket": "$time_bucket", "tags": "$tags"},
			"agg_value": M{"$avg": "$" + avgPath},
		},
	}

	sortStage := M{"$sort": M{"_id.time_bucket": 1}}

	pipeline := []M{matchStage, projectStage, unwindStage, groupStage, sortStage}

	label := []byte(fmt.Sprintf("Mongo avg temperature, rand %4d homes, rand %s by 1h", nHomes, timeRange))

	mq := qi.(*MongoQuery)
	mq.HumanLabel = label
	mq.BsonDoc = pipeline
	mq.DatabaseName = []byte(d.DatabaseName)
	mq.CollectionName = []byte("point_data")
	mq.MeasurementName = []byte(measurementName)
	mq.FieldName = []byte(fieldName)
	mq.HumanDescription = []byte(fmt.Sprintf(
		"%s: %s (%s, %s, %s, %s)",
		label, window.StartString(),
		mq.DatabaseName, mq.CollectionName, mq.MeasurementName, mq.FieldName,
	))
	mq.TimeStart = window.Start
	mq.TimeEnd = window.End
	mq.GroupByDuration = time.Hour
}