forked from ghetzel/pivot
/
mongo-aggregator.go
167 lines (137 loc) · 4.29 KB
/
mongo-aggregator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package backends
// this file satifies the Aggregator interface for MongoBackend
import (
"fmt"
"strings"
"time"
"github.com/PerformLine/go-stockutil/stringutil"
"github.com/PerformLine/pivot/v3/dal"
"github.com/PerformLine/pivot/v3/filter"
"gopkg.in/mgo.v2/bson"
)
func (self *MongoBackend) Sum(collection *dal.Collection, field string, f ...*filter.Filter) (float64, error) {
return self.aggregateFloat(collection, filter.Sum, field, f)
}
func (self *MongoBackend) Count(collection *dal.Collection, flt ...*filter.Filter) (uint64, error) {
var f *filter.Filter
if len(flt) > 0 {
f = flt[0]
}
if query, err := self.filterToNative(collection, f); err == nil {
q := self.db.C(collection.Name).Find(query)
if totalResults, err := q.Count(); err == nil {
return uint64(totalResults), nil
} else {
return 0, err
}
} else {
return 0, err
}
}
func (self *MongoBackend) Minimum(collection *dal.Collection, field string, f ...*filter.Filter) (float64, error) {
return self.aggregateFloat(collection, filter.Minimum, field, f)
}
func (self *MongoBackend) Maximum(collection *dal.Collection, field string, f ...*filter.Filter) (float64, error) {
return self.aggregateFloat(collection, filter.Maximum, field, f)
}
func (self *MongoBackend) Average(collection *dal.Collection, field string, f ...*filter.Filter) (float64, error) {
return self.aggregateFloat(collection, filter.Average, field, f)
}
func (self *MongoBackend) GroupBy(collection *dal.Collection, groupBy []string, aggregates []filter.Aggregate, flt ...*filter.Filter) (*dal.RecordSet, error) {
if result, err := self.aggregate(collection, groupBy, aggregates, flt, false); err == nil {
return result.(*dal.RecordSet), nil
} else {
return nil, err
}
}
func (self *MongoBackend) aggregateFloat(collection *dal.Collection, aggregation filter.Aggregation, field string, flt []*filter.Filter) (float64, error) {
if result, err := self.aggregate(collection, nil, []filter.Aggregate{
{
Aggregation: aggregation,
Field: field,
},
}, flt, true); err == nil {
if vF, ok := result.(float64); ok {
return vF, nil
} else {
return 0, err
}
} else {
return 0, err
}
}
func (self *MongoBackend) aggregate(collection *dal.Collection, groupBy []string, aggregates []filter.Aggregate, flt []*filter.Filter, single bool) (interface{}, error) {
var f *filter.Filter
if len(flt) > 0 {
f = flt[0]
}
if query, err := self.filterToNative(collection, f); err == nil {
var aggGroups []bson.M
var firstKey string
for _, aggregate := range aggregates {
var mongoFn string
switch aggregate.Aggregation {
case filter.Sum:
mongoFn = `$sum`
case filter.First:
mongoFn = `$first`
case filter.Last:
mongoFn = `$last`
case filter.Minimum:
mongoFn = `$min`
case filter.Maximum:
mongoFn = `$max`
case filter.Average:
mongoFn = `$avg`
}
aggGroups = append(aggGroups, bson.M{
`$group`: bson.M{
`_id`: aggregate.Field,
strings.TrimPrefix(mongoFn, `$`): bson.M{
mongoFn: fmt.Sprintf("$%s", aggregate.Field),
},
},
})
if firstKey == `` {
firstKey = strings.TrimPrefix(mongoFn, `$`)
}
}
var finalQuery []bson.M
if len(query) > 0 {
finalQuery = append(finalQuery, query)
}
finalQuery = append(finalQuery, aggGroups...)
q := self.db.C(collection.Name).Pipe(finalQuery)
iter := q.Iter()
var result map[string]interface{}
for iter.Next(&result) {
if err := iter.Err(); err != nil {
return nil, err
} else if single {
_id, _ := result[`_id`]
if v, ok := result[firstKey]; ok {
if vF, err := stringutil.ConvertToFloat(v); err == nil {
return vF, nil
} else if vT, err := stringutil.ConvertToTime(v); err == nil {
return float64(vT.UnixNano()) / float64(time.Second), nil
} else {
return 0, fmt.Errorf("'%s' aggregation not supported for field %v", firstKey, _id)
}
} else {
return 0, fmt.Errorf("missing aggregation value '%s'", firstKey)
}
} else {
return nil, fmt.Errorf("Not implemented")
}
}
return nil, nil
} else {
return nil, fmt.Errorf("filter error: %v", err)
}
}
func (self *MongoBackend) AggregatorConnectionString() *dal.ConnectionString {
return self.GetConnectionString()
}
func (self *MongoBackend) AggregatorInitialize(parent Backend) error {
return nil
}