forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
merger.go
200 lines (179 loc) · 5.62 KB
/
merger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
package engine
import (
"code.google.com/p/log4go"
"github.com/influxdb/influxdb/protocol"
)
// Merger merges a number of StreamQuery into one stream of points
// where the output stream of points has a monotonic timestamp order
// (increasing or decreasing depending on the SeriesHeap that is
// passed to NewCME)
type Merger struct {
	name string          // identifies this merger in log output
	s    []StreamQuery   // input streams being merged (closed streams are removed during init)
	size int             // number of streams still open, maintained after initialization
	h    SeriesHeap      // orders the buffered points by timestamp
	n    Processor       // downstream processor that receives the merged points
	lastStreamIdx int    // index of the stream we last consumed a point from
	initializing  bool   // true until one point has been read from every stream
	mergeColumns  bool   // if true, output carries the union of all streams' fields
	fields map[string]struct{}   // set of field names seen during initialization
	resultFields []string        // union of field names (populated only when mergeColumns)
	resultFieldsPerStream map[int][]int // per stream: result-field position -> source-field position (-1 = missing)
}
// NewCME creates a new merger that will merge the given slice of
// StreamQuery and yields the result to the processor `n`. `name` is
// used to identify the merger in the logs since it's being used in
// multiple places. The SeriesHeap `h` is used to keep track of the
// next point (whether it's the smallest or largest timestamp depends
// on the SeriesHeap). If `mergeColumns` is true, the resulting time
// series will have the fields from all StreamQueries, i.e. if the
// first stream yields `column0` and `column1` while the second stream
// yields `column2` and `column3` then the result time series will
// have all 4 columns with two columns set to `nil` depending on which
// side the point came from.
func NewCME(name string, s []StreamQuery, h SeriesHeap, n Processor, mergeColumns bool) *Merger {
	log4go.Debug("%sMerger: created with %d streams", name, len(s))
	merger := &Merger{
		name:                  name,
		s:                     s,
		h:                     h,
		n:                     n,
		mergeColumns:          mergeColumns,
		initializing:          true,
		fields:                map[string]struct{}{},
		resultFieldsPerStream: map[int][]int{},
	}
	// lastStreamIdx starts at its zero value (0).
	return merger
}
// public api

// Update consumes and yields as many points as we can until one of
// the streams runs out of points (i.e. the call to HasPoint()
// returns false)
func (cme *Merger) Update() (bool, error) {
	if !cme.initializing {
		return cme.tryYieldNextPoint()
	}
	return cme.initialize()
}
// private api

// initialize the state of the Merger by reading one point from each
// StreamQuery. if `mergeColumns` is true the union of the fields
// received on the first point from each StreamQuery will be the
// fields of the result Series
func (cme *Merger) initialize() (bool, error) {
	// Loop until every remaining (non-closed) stream has contributed
	// exactly one point to the heap. Closed streams are removed from
	// cme.s below, so the condition compares against the shrinking slice.
	for cme.h.Size() != len(cme.s) {
		stream := cme.s[cme.lastStreamIdx]
		if !stream.HasPoint() && !stream.Closed() {
			// No data available yet on this stream; return and resume
			// initialization on the next Update() call.
			log4go.Debug("%sMerger: data not ready for stream %d, still initializing", cme.name, cme.lastStreamIdx)
			return true, nil
		}
		if stream.HasPoint() {
			p := stream.Next()
			cme.h.Add(cme.lastStreamIdx, p)
			// Collect field names so the union of columns across all
			// streams can be computed (used when mergeColumns is set).
			for _, f := range p.Fields {
				cme.fields[f] = struct{}{}
			}
			cme.lastStreamIdx++
		} else if stream.Closed() {
			// Stream closed without yielding a point: swap-delete it
			// from the slice. lastStreamIdx is intentionally not
			// advanced so the swapped-in stream is examined next.
			s := len(cme.s)
			cme.s[cme.lastStreamIdx] = cme.s[s-1]
			cme.s = cme.s[:s-1]
		}
	}
	if cme.mergeColumns {
		// finished initialization
		cme.resultFields = make([]string, 0, len(cme.fields))
		for f := range cme.fields {
			cme.resultFields = append(cme.resultFields, f)
		}
	}
	log4go.Debug("%sMerger initialization finished", cme.name)
	cme.initializing = false
	cme.size = len(cme.s)
	return cme.yieldNextPoint()
}
// Try to get more data points from the StreamQueries and if
// successful call yieldNextPoint()
func (cme *Merger) tryYieldNextPoint() (bool, error) {
	stream := cme.s[cme.lastStreamIdx]
	switch {
	case stream.HasPoint():
		// Buffer this stream's next point in the heap.
		cme.h.Add(cme.lastStreamIdx, stream.Next())
	case stream.Closed():
		// One fewer open stream to wait for.
		cme.size--
	}
	// Only once every open stream has a buffered point can we pick the
	// point with the smallest (or largest) timestamp and hand it to
	// the next processor.
	if cme.h.Size() == cme.size {
		return cme.yieldNextPoint()
	}
	return true, nil
}
// yield as many points as we can to the Processor `n`
func (cme *Merger) yieldNextPoint() (bool, error) {
	// If we consumed all the input data points, return
	// immediately. This can be the case for example if we finished
	// initialization and the first call to yieldNextPoint() consumed
	// all the data points. Without this check the call to the heap's
	// Next() method will cause a panic
	if cme.size == 0 {
		return true, nil
	}
	for {
		// Pop the next point (and the index of the stream it came
		// from) off the heap and forward it downstream.
		var s *protocol.Series
		cme.lastStreamIdx, s = cme.h.Next()
		log4go.Debug("cme.lastStreamIdx: %d, s: %s", cme.lastStreamIdx, s)
		cme.fixFields(s)
		log4go.Debug("%sMerger yielding to %s: %s", cme.name, cme.n.Name(), s)
		ok, err := cme.n.Yield(s)
		if !ok || err != nil {
			return ok, err
		}
		// Refill the heap from the stream we just consumed so every
		// open stream always has exactly one point buffered.
		stream := cme.s[cme.lastStreamIdx]
		if stream.HasPoint() {
			s := stream.Next()
			// FIX: the format string has three verbs but the original
			// call passed only two arguments (cme.name was missing),
			// which logged a mangled "%!d(MISSING)" line.
			log4go.Debug("%sMerger received %s from %d", cme.name, s, cme.lastStreamIdx)
			cme.h.Add(cme.lastStreamIdx, s)
			continue
		} else if stream.Closed() {
			cme.size--
			// Keep draining the heap only if other streams remain.
			if cme.size != 0 {
				continue
			}
		}
		return true, nil
	}
}
// modify the series to have the union of the columns from all
// StreamQueries
func (cme *Merger) fixFields(s *protocol.Series) {
	if !cme.mergeColumns {
		return
	}
	idx := cme.lastStreamIdx
	mapping := cme.resultFieldsPerStream[idx]
	if mapping == nil {
		// First series from this stream: build the mapping from each
		// result-field position to this stream's field position, or
		// -1 when the stream does not carry that field.
		mapping = make([]int, 0, len(cme.resultFields))
		for _, f := range cme.resultFields {
			index := -1
			for i, sf := range s.Fields {
				if sf == f {
					index = i
					break
				}
			}
			mapping = append(mapping, index)
		}
		// FIX: store the entry once, after the mapping is complete.
		// The original assigned inside the loop, rewriting the map
		// entry with a partial slice header on every iteration.
		cme.resultFieldsPerStream[idx] = mapping
	}
	s.Fields = cme.resultFields
	// NOTE(review): assumes the series carries at least one point;
	// Points[0] panics on an empty series — confirm callers' invariant.
	p := s.Points[0]
	originalValues := p.Values
	p.Values = nil
	for _, i := range mapping {
		if i == -1 {
			// This stream lacks the field; pad with nil.
			p.Values = append(p.Values, nil)
			continue
		}
		p.Values = append(p.Values, originalValues[i])
	}
}