forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mapper.go
149 lines (132 loc) · 4.41 KB
/
mapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package tsdb
import (
"encoding/json"
)
// Mapper is the interface all Mapper types must implement.
type Mapper interface {
Open() error
TagSets() []string
Fields() []string
NextChunk() (interface{}, error)
Close()
}
// StatefulMapper encapsulates a Mapper and some state that the executor needs to
// track for that mapper.
type StatefulMapper struct {
Mapper
bufferedChunk *MapperOutput // Last read chunk.
drained bool
}
// NextChunk wraps a RawMapper and some state.
func (sm *StatefulMapper) NextChunk() (*MapperOutput, error) {
c, err := sm.Mapper.NextChunk()
if err != nil {
return nil, err
}
chunk, ok := c.(*MapperOutput)
if !ok {
if chunk == interface{}(nil) {
return nil, nil
}
}
return chunk, nil
}
// MapperValue is a complex type, which can encapsulate data from both raw and aggregate
// mappers. This currently allows marshalling and network system to remain simpler. For
// aggregate output Time is ignored, and actual Time-Value pairs are contained soley
// within the Value field.
type MapperValue struct {
Time int64 `json:"time,omitempty"` // Ignored for aggregate output.
Value interface{} `json:"value,omitempty"` // For aggregate, contains interval time multiple values.
Tags map[string]string `json:"tags,omitempty"` // Meta tags for results
}
// MapperValueJSON is the JSON-encoded representation of MapperValue. Because MapperValue is
// a complex type, custom JSON encoding is required so that none of the types contained within
// a MapperValue are "lost", and so the data are encoded as byte slices where necessary.
type MapperValueJSON struct {
Time int64 `json:"time,omitempty"`
RawData []byte `json:"rdata,omitempty"`
AggData [][]byte `json:"adata,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
}
// MarshalJSON returns the JSON-encoded representation of a MapperValue.
func (mv *MapperValue) MarshalJSON() ([]byte, error) {
o := &MapperValueJSON{
Time: mv.Time,
AggData: make([][]byte, 0),
Tags: mv.Tags,
}
o.Time = mv.Time
o.Tags = mv.Tags
if values, ok := mv.Value.([]interface{}); ok {
// Value contain a slice of more values. This happens only with
// aggregate output.
for _, v := range values {
b, err := json.Marshal(v)
if err != nil {
return nil, err
}
o.AggData = append(o.AggData, b)
}
} else {
// If must be raw output, so just marshal the single value.
b, err := json.Marshal(mv.Value)
if err != nil {
return nil, err
}
o.RawData = b
}
return json.Marshal(o)
}
type MapperValues []*MapperValue
func (a MapperValues) Len() int { return len(a) }
func (a MapperValues) Less(i, j int) bool { return a[i].Time < a[j].Time }
func (a MapperValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
type MapperOutput struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Fields []string `json:"fields,omitempty"` // Field names of returned data.
Values []*MapperValue `json:"values,omitempty"` // For aggregates contains a single value at [0]
cursorKey string // Tagset-based key for the source cursor. Cached for performance reasons.
}
// MapperOutputJSON is the JSON-encoded representation of MapperOutput. The query data is represented
// as a raw JSON message, so decode is delayed, and can proceed in a custom manner.
type MapperOutputJSON struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Fields []string `json:"fields,omitempty"` // Field names of returned data.
Values json.RawMessage `json:"values,omitempty"`
}
// MarshalJSON returns the JSON-encoded representation of a MapperOutput.
func (mo *MapperOutput) MarshalJSON() ([]byte, error) {
o := &MapperOutputJSON{
Name: mo.Name,
Tags: mo.Tags,
Fields: mo.Fields,
}
data, err := json.Marshal(mo.Values)
if err != nil {
return nil, err
}
o.Values = data
return json.Marshal(o)
}
func (mo *MapperOutput) key() string {
return mo.cursorKey
}
// uniqueStrings returns a slice of unique strings from all lists in a.
func uniqueStrings(a ...[]string) []string {
// Calculate unique set of strings.
m := make(map[string]struct{})
for _, strs := range a {
for _, str := range strs {
m[str] = struct{}{}
}
}
// Convert back to slice.
result := make([]string, 0, len(m))
for k := range m {
result = append(result, k)
}
return result
}