/
stats.go
75 lines (65 loc) · 1.66 KB
/
stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package minion
import (
"context"
"go.mongodb.org/mongo-driver/bson"
)
// stat is the decode target for one row of the aggregation run by
// (*Minion).stats: a (status, queue) group key plus the number of jobs in
// that group.
type stat struct {
// Id mirrors the compound "_id" produced by the $group stage.
Id struct {
// Status is the job status the group was keyed on.
Status string `bson:"status"`
// Queue is the queue name the group was keyed on.
Queue string `bson:"queue"`
} `bson:"_id"`
// Count is the group's "count" value ($sum of 1 per grouped document).
Count int `bson:"count"`
}
// Stats holds job counts indexed first by queue name and then by job status.
// Snapshots built by (*Minion).stats also carry a "totals" entry whose counts
// are the per-status sums across all queues.
type Stats map[string]map[string]int
// SubscribeStats registers f to receive a Stats snapshot every time the stats
// collector publishes one. Subscribers are called in registration order.
//
// A nil f is ignored: the collector invokes every registered function
// unconditionally, so storing a nil entry would panic at publish time.
//
// NOTE(review): the code that scheduled the collector on first subscription is
// disabled (kept below for reference), so this method only records the
// callback. Where m.statsEntry gets set is not visible from here — confirm
// against the rest of the package.
func (m *Minion) SubscribeStats(f func(Stats)) {
	if f == nil {
		return
	}

	// if m.statsEntry == 0 {
	// 	id, err := m.cron.AddFunc("* * * * * *", m.stats)
	// 	if err != nil {
	// 		m.Log.Errorf("error scheduling stats: %s", err)
	// 		return
	// 	}
	// 	m.statsEntry = id
	// }

	m.statsSubs = append(m.statsSubs, f)
}
// stats counts jobs per (queue, status) pair and delivers the resulting Stats
// snapshot to every registered subscriber.
//
// It returns immediately when m.statsEntry is zero. When there are no
// subscribers, or when the aggregation query or its decoding fails, it calls
// m.Remove(m.statsEntry) and returns without notifying anyone; failures are
// logged through m.Log.
func (m *Minion) stats(ctx context.Context) {
	if m.statsEntry == 0 {
		return
	}
	if len(m.statsSubs) == 0 {
		m.Remove(m.statsEntry)
		return
	}

	// Equivalent to the following MongoDB query:
	// db.jobs.aggregate([
	//   { $group: { _id: {status:"$status",queue:"$queue"}, count: {$sum: 1}}},
	//   { $project: { count: 1 } }
	// ])
	cur, err := m.db.Jobs.Collection.Aggregate(ctx, bson.A{
		bson.M{"$group": bson.M{"_id": bson.M{"queue": "$queue", "status": "$status"}, "count": bson.M{"$sum": 1}}},
		bson.M{"$project": bson.M{"count": 1}},
	})
	if err != nil {
		m.Log.Errorf("error querying stats: %s", err)
		m.Remove(m.statsEntry)
		// The cursor is not usable after a failed Aggregate; continuing would
		// dereference it and panic.
		return
	}

	var results []*stat
	if err = cur.All(ctx, &results); err != nil {
		m.Log.Errorf("error decoding stats: %s", err)
		m.Remove(m.statsEntry)
		// Do not publish a snapshot built from a failed decode.
		return
	}

	snapshot := Stats{}
	for _, s := range results {
		// "totals" is created lazily so that an empty result set still yields
		// an empty Stats rather than one holding an empty "totals" bucket.
		totals, ok := snapshot["totals"]
		if !ok {
			totals = make(map[string]int)
			snapshot["totals"] = totals
		}

		queue, ok := snapshot[s.Id.Queue]
		if !ok {
			queue = make(map[string]int)
			snapshot[s.Id.Queue] = queue
		}

		// Each (queue, status) pair appears once in the grouped output, so the
		// per-queue count is assigned while the cross-queue total accumulates.
		queue[s.Id.Status] = s.Count
		totals[s.Id.Status] += s.Count
	}

	for _, f := range m.statsSubs {
		f(snapshot)
	}
}