forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
result.go
186 lines (167 loc) · 4.41 KB
/
result.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package influxql
import (
"encoding/json"
"errors"
"github.com/influxdb/influxdb/models"
)
// TagSet is a fundamental concept within the query system. It represents a composite series,
// composed of multiple individual series that share a set of tag attributes.
type TagSet struct {
Tags map[string]string
Filters []Expr
SeriesKeys []string
Key []byte
}
// AddFilter adds a series-level filter to the Tagset.
func (t *TagSet) AddFilter(key string, filter Expr) {
t.SeriesKeys = append(t.SeriesKeys, key)
t.Filters = append(t.Filters, filter)
}
// Rows represents a list of rows that can be sorted consistently by name/tag.
// Result represents a resultset returned from a single statement.
type Result struct {
// StatementID is just the statement's position in the query. It's used
// to combine statement results if they're being buffered in memory.
StatementID int `json:"-"`
Series models.Rows
Err error
}
// MarshalJSON encodes the result into JSON.
func (r *Result) MarshalJSON() ([]byte, error) {
// Define a struct that outputs "error" as a string.
var o struct {
Series []*models.Row `json:"series,omitempty"`
Err string `json:"error,omitempty"`
}
// Copy fields to output struct.
o.Series = r.Series
if r.Err != nil {
o.Err = r.Err.Error()
}
return json.Marshal(&o)
}
// UnmarshalJSON decodes the data into the Result struct
func (r *Result) UnmarshalJSON(b []byte) error {
var o struct {
Series []*models.Row `json:"series,omitempty"`
Err string `json:"error,omitempty"`
}
err := json.Unmarshal(b, &o)
if err != nil {
return err
}
r.Series = o.Series
if o.Err != "" {
r.Err = errors.New(o.Err)
}
return nil
}
func GetProcessor(expr Expr, startIndex int) (Processor, int) {
switch expr := expr.(type) {
case *VarRef:
return newEchoProcessor(startIndex), startIndex + 1
case *Call:
return newEchoProcessor(startIndex), startIndex + 1
case *BinaryExpr:
return getBinaryProcessor(expr, startIndex)
case *ParenExpr:
return GetProcessor(expr.Expr, startIndex)
case *NumberLiteral:
return newLiteralProcessor(expr.Val), startIndex
case *StringLiteral:
return newLiteralProcessor(expr.Val), startIndex
case *BooleanLiteral:
return newLiteralProcessor(expr.Val), startIndex
case *TimeLiteral:
return newLiteralProcessor(expr.Val), startIndex
case *DurationLiteral:
return newLiteralProcessor(expr.Val), startIndex
}
panic("unreachable")
}
type Processor func(values []interface{}) interface{}
func newEchoProcessor(index int) Processor {
return func(values []interface{}) interface{} {
if index > len(values)-1 {
return nil
}
return values[index]
}
}
func newLiteralProcessor(val interface{}) Processor {
return func(values []interface{}) interface{} {
return val
}
}
func getBinaryProcessor(expr *BinaryExpr, startIndex int) (Processor, int) {
lhs, index := GetProcessor(expr.LHS, startIndex)
rhs, index := GetProcessor(expr.RHS, index)
return newBinaryExprEvaluator(expr.Op, lhs, rhs), index
}
func newBinaryExprEvaluator(op Token, lhs, rhs Processor) Processor {
switch op {
case ADD:
return func(values []interface{}) interface{} {
l := lhs(values)
r := rhs(values)
if lf, rf, ok := processorValuesAsFloat64(l, r); ok {
return lf + rf
}
return nil
}
case SUB:
return func(values []interface{}) interface{} {
l := lhs(values)
r := rhs(values)
if lf, rf, ok := processorValuesAsFloat64(l, r); ok {
return lf - rf
}
return nil
}
case MUL:
return func(values []interface{}) interface{} {
l := lhs(values)
r := rhs(values)
if lf, rf, ok := processorValuesAsFloat64(l, r); ok {
return lf * rf
}
return nil
}
case DIV:
return func(values []interface{}) interface{} {
l := lhs(values)
r := rhs(values)
if lf, rf, ok := processorValuesAsFloat64(l, r); ok {
return lf / rf
}
return nil
}
default:
// we shouldn't get here, but give them back nils if it goes this way
return func(values []interface{}) interface{} {
return nil
}
}
}
func processorValuesAsFloat64(lhs interface{}, rhs interface{}) (float64, float64, bool) {
var lf float64
var rf float64
var ok bool
lf, ok = lhs.(float64)
if !ok {
var li int64
if li, ok = lhs.(int64); !ok {
return 0, 0, false
}
lf = float64(li)
}
rf, ok = rhs.(float64)
if !ok {
var ri int64
if ri, ok = rhs.(int64); !ok {
return 0, 0, false
}
rf = float64(ri)
}
return lf, rf, true
}