forked from caict-benchmark/BDC-TS
/
query_plan_aggregators.go
89 lines (77 loc) · 1.84 KB
/
query_plan_aggregators.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package main
import "fmt"
// Type Aggregator merges QueryPlan results on the client in constant time.
// This is intended to match the aggregation that a CQLQuery performs on a
// Cassandra server.
//
// Note that the underlying functions should be commutative and associative.
type Aggregator interface {
Put(float64)
Get() float64
}
// AggregatorMax aggregates the maximum of a stream of values.
type AggregatorMax struct {
value float64
count int64
}
// Put puts a value for finding the maximum.
func (a *AggregatorMax) Put(n float64) {
if n > a.value || a.count == 0 {
a.value = n
}
a.count++
}
// Get computes the aggregated maximum.
func (a *AggregatorMax) Get() float64 {
if a.count == 0 {
return 0
}
return a.value
}
// AggregatorMax aggregates the minimum of a stream of values.
type AggregatorMin struct {
value float64
count int64
}
// Put puts a value for finding the minimum.
func (a *AggregatorMin) Put(n float64) {
if n < a.value || a.count == 0 {
a.value = n
}
a.count++
}
// Get computes the aggregated minimum.
func (a *AggregatorMin) Get() float64 {
return a.value
}
// AggregatorMax aggregates the average of a stream of values.
type AggregatorAvg struct {
value float64
count int64
}
// Put puts a value for averaging.
func (a *AggregatorAvg) Put(n float64) {
a.value += n
a.count++
}
// Get computes the aggregated average.
func (a *AggregatorAvg) Get() float64 {
if a.count == 0 {
return 0
}
return a.value / float64(a.count)
}
// GetConstantSpaceAggr translates a label into a new ConstantSpaceAggr.
func GetAggregator(label string) (Aggregator, error) {
// TODO(rw): fewer heap allocations here.
switch label {
case "min":
return &AggregatorMin{}, nil
case "max":
return &AggregatorMax{}, nil
case "avg":
return &AggregatorAvg{}, nil
default:
return nil, fmt.Errorf("invalid aggregation specifier")
}
}