-
Notifications
You must be signed in to change notification settings - Fork 223
/
count.go
98 lines (84 loc) · 2.1 KB
/
count.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package count
import (
"fmt"
"github.com/alpacahq/marketstore/uda"
"github.com/alpacahq/marketstore/utils/functions"
"github.com/alpacahq/marketstore/utils/io"
"time"
)
/*
requiredColumns is the list of input columns handed to NewCount() by New()
and returned by GetRequiredArgs(): a single entry named "*" of type io.INT64.

The original template note says this value is filled in for example purposes
and should be overridden in an implementation.
NOTE(review): "*" presumably acts as a wildcard column name - confirm against
functions.ArgumentMap.
*/
var requiredColumns = []io.DataShape{{Name: "*", Type: io.INT64}}
/*
optionalColumns is the list of optional input columns handed to NewCount() by
New() and returned by GetOptionalArgs(). It is empty (but non-nil) here.

NOTE(review): the aggregate template describes optional inputs as having the
mapped user input name appended to the output name (e.g. mapping "Volume" to
"Sum" would yield "Sum_Volume"). Nothing in this file does that; Output()
emits fixed column names - confirm whether the convention applies elsewhere.
*/
var optionalColumns = make([]io.DataShape, 0)
/*
initArgs is the list of initialization arguments returned by GetInitArgs().
It is empty (but non-nil) here.
*/
var initArgs = make([]io.DataShape, 0)
/*
Count is an aggregate that keeps a running total in Sum: Accum() adds to it,
Output() reports it, and Init()/Reset() set it back to zero.
*/
type Count struct {
	// Embedded aggregate interface; NewCount() leaves it unset.
	uda.AggInterface
	// Input arguments mapping, created by NewCount() and validated by Init()
	ArgMap *functions.ArgumentMap
	// Running total accumulated by Accum()
	Sum int64
}
/*
GetRequiredArgs() returns the package-level requiredColumns slice itself,
not a copy. The receiver is not read.
*/
func (*Count) GetRequiredArgs() []io.DataShape {
	return requiredColumns
}
/*
GetOptionalArgs() returns the package-level optionalColumns slice itself,
not a copy. The receiver is not read.
*/
func (*Count) GetOptionalArgs() []io.DataShape {
	return optionalColumns
}
/*
GetInitArgs() returns the package-level initArgs slice itself, not a copy.
The receiver is not read.
*/
func (*Count) GetInitArgs() []io.DataShape {
	return initArgs
}
/*
Accum() folds one batch of data into the aggregate: the length reported by
cols.Len() is added to the running total in Sum. It always returns nil.
*/
func (ca *Count) Accum(cols io.ColumnInterface) error {
	batch := int64(cols.Len())
	ca.Sum = ca.Sum + batch
	return nil
}
/*
New() builds a fresh aggregate through NewCount() from the package-level
requiredColumns and optionalColumns, and returns it together with its
argument map. The receiver is not read.
*/
func (Count) New() (out uda.AggInterface, am *functions.ArgumentMap) {
	fresh := NewCount(requiredColumns, optionalColumns)
	out, am = fresh, fresh.ArgMap
	return out, am
}
/*
CONCRETE - these may be suitable methods for general usage
*/
/*
NewCount() allocates a Count whose ArgMap is built by
functions.NewArgumentMap() from inputColumns and optionalInputColumns.
Sum starts at zero and the embedded AggInterface is left unset.
*/
func NewCount(inputColumns, optionalInputColumns []io.DataShape) (ca *Count) {
	ca = &Count{
		ArgMap: functions.NewArgumentMap(inputColumns, optionalInputColumns...),
	}
	return ca
}
/*
Init() prepares the aggregate for use. When ca.ArgMap.Validate() reports
nothing unmapped, Sum is zeroed and nil is returned. Otherwise an error
listing the unmapped columns is returned and Sum is left untouched.
The itf arguments are accepted but not used.
*/
func (ca *Count) Init(itf ...interface{}) error {
	unmapped := ca.ArgMap.Validate()
	if unmapped == nil {
		ca.Sum = 0
		return nil
	}
	return fmt.Errorf("Unmapped columns: %s", unmapped)
}
/*
Output() returns the currently valid output of this aggregate as a new
column series with two single-element int64 columns: "Epoch" and "Count".
*/
func (ca *Count) Output() *io.ColumnSeries {
	series := io.NewColumnSeries()

	// "Epoch" is the wall-clock time of this call in Unix seconds, not a
	// timestamp taken from the accumulated data.
	stamp := time.Now().UTC().Unix()
	series.AddColumn("Epoch", []int64{stamp})

	total := ca.Sum
	series.AddColumn("Count", []int64{total})
	return series
}
/*
Reset() returns the aggregate state to "new" by zeroing the running total.
ArgMap is left as it is.
*/
func (ca *Count) Reset() {
	ca.Sum = 0
}
/*
Utility Functions
*/