forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
functions.go
157 lines (141 loc) · 3.97 KB
/
functions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package influxql
// FloatMeanReducer calculates the mean of the aggregated points.
type FloatMeanReducer struct {
sum float64
count uint32
}
// NewFloatMeanReducer creates a new FloatMeanReducer.
func NewFloatMeanReducer() *FloatMeanReducer {
return &FloatMeanReducer{}
}
// AggregateFloat aggregates a point into the reducer.
func (r *FloatMeanReducer) AggregateFloat(p *FloatPoint) {
if p.Aggregated >= 2 {
r.sum += p.Value * float64(p.Aggregated)
r.count += p.Aggregated
} else {
r.sum += p.Value
r.count++
}
}
// Emit emits the mean of the aggregated points as a single point.
func (r *FloatMeanReducer) Emit() []FloatPoint {
return []FloatPoint{{
Time: ZeroTime,
Value: r.sum / float64(r.count),
Aggregated: r.count,
}}
}
// IntegerMeanReducer calculates the mean of the aggregated points.
type IntegerMeanReducer struct {
sum int64
count uint32
}
// NewIntegerMeanReducer creates a new IntegerMeanReducer.
func NewIntegerMeanReducer() *IntegerMeanReducer {
return &IntegerMeanReducer{}
}
// AggregateInteger aggregates a point into the reducer.
func (r *IntegerMeanReducer) AggregateInteger(p *IntegerPoint) {
if p.Aggregated >= 2 {
r.sum += p.Value * int64(p.Aggregated)
r.count += p.Aggregated
} else {
r.sum += p.Value
r.count++
}
}
// Emit emits the mean of the aggregated points as a single point.
func (r *IntegerMeanReducer) Emit() []FloatPoint {
return []FloatPoint{{
Time: ZeroTime,
Value: float64(r.sum) / float64(r.count),
Aggregated: r.count,
}}
}
// FloatMovingAverageReducer calculates the moving average of the aggregated points.
type FloatMovingAverageReducer struct {
pos int
sum float64
time int64
buf []float64
}
// NewFloatMovingAverageReducer creates a new FloatMovingAverageReducer.
func NewFloatMovingAverageReducer(n int) *FloatMovingAverageReducer {
return &FloatMovingAverageReducer{
buf: make([]float64, 0, n),
}
}
// AggregateFloat aggregates a point into the reducer and updates the current window.
func (r *FloatMovingAverageReducer) AggregateFloat(p *FloatPoint) {
if len(r.buf) != cap(r.buf) {
r.buf = append(r.buf, p.Value)
} else {
r.sum -= r.buf[r.pos]
r.buf[r.pos] = p.Value
}
r.sum += p.Value
r.time = p.Time
r.pos++
if r.pos >= cap(r.buf) {
r.pos = 0
}
}
// Emit emits the moving average of the current window. Emit should be called
// after every call to AggregateFloat and it will produce one point if there
// is enough data to fill a window, otherwise it will produce zero points.
func (r *FloatMovingAverageReducer) Emit() []FloatPoint {
if len(r.buf) != cap(r.buf) {
return []FloatPoint{}
}
return []FloatPoint{
{
Value: r.sum / float64(len(r.buf)),
Time: r.time,
Aggregated: uint32(len(r.buf)),
},
}
}
// IntegerMovingAverageReducer calculates the moving average of the aggregated points.
type IntegerMovingAverageReducer struct {
pos int
sum int64
time int64
buf []int64
}
// NewIntegerMovingAverageReducer creates a new IntegerMovingAverageReducer.
func NewIntegerMovingAverageReducer(n int) *IntegerMovingAverageReducer {
return &IntegerMovingAverageReducer{
buf: make([]int64, 0, n),
}
}
// AggregateInteger aggregates a point into the reducer and updates the current window.
func (r *IntegerMovingAverageReducer) AggregateInteger(p *IntegerPoint) {
if len(r.buf) != cap(r.buf) {
r.buf = append(r.buf, p.Value)
} else {
r.sum -= r.buf[r.pos]
r.buf[r.pos] = p.Value
}
r.sum += p.Value
r.time = p.Time
r.pos++
if r.pos >= cap(r.buf) {
r.pos = 0
}
}
// Emit emits the moving average of the current window. Emit should be called
// after every call to AggregateInteger and it will produce one point if there
// is enough data to fill a window, otherwise it will produce zero points.
func (r *IntegerMovingAverageReducer) Emit() []FloatPoint {
if len(r.buf) != cap(r.buf) {
return []FloatPoint{}
}
return []FloatPoint{
{
Value: float64(r.sum) / float64(len(r.buf)),
Time: r.time,
Aggregated: uint32(len(r.buf)),
},
}
}