-
Notifications
You must be signed in to change notification settings - Fork 1
/
input_sync.go
354 lines (315 loc) · 11.5 KB
/
input_sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Input synchronizers are used by processors to merge incoming rows from
// (potentially) multiple streams; see docs/RFCS/distributed_sql.md
package distsqlrun
import (
"container/heap"
"golang.org/x/net/context"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)
// srcInfo pairs an input RowSource with the most recent row read from it.
type srcInfo struct {
	src RowSource
	// row is the last row received from src. It is nil before src is first
	// read, and is set to nil when a stopOnRowOrError read finds src exhausted
	// (see consumeMetadata).
	row sqlbase.EncDatumRow
}
// srcIdx refers to the index of a source inside a []srcInfo slice (in
// practice, orderedSynchronizer.sources; the heap stores these indexes).
type srcIdx int
// orderedSynchronizerState is the operation mode of an orderedSynchronizer.
type orderedSynchronizerState int

const (
	// notInitialized means that the heap has not yet been constructed. A row
	// needs to be read from each source to build the heap.
	notInitialized orderedSynchronizerState = iota
	// returningRows is the regular operation mode of the orderedSynchronizer.
	// Rows and metadata records are returned to the consumer.
	returningRows
	// draining means the orderedSynchronizer will ignore everything but metadata
	// records. On the first call to Next() while in draining mode, all the
	// sources are read until exhausted and metadata is accumulated. The state is
	// then transitioned to drainBuffered.
	draining
	// In the drainBuffered mode, all the sources of the orderedSynchronizer have been
	// exhausted, and we might have some buffered metadata. Metadata records are
	// going to be returned, one by one. ConsumerClosed() also moves the
	// synchronizer into this state.
	drainBuffered
)
// orderedSynchronizer receives rows from multiple streams and produces a single
// stream of rows, ordered according to a set of columns. The rows in each input
// stream are assumed to be ordered according to the same set of columns
// (intra-stream ordering).
//
// Sources are kept in a heap keyed on each source's current row (smallest row
// at the root, see Less), so the root of the heap holds the next row to emit.
type orderedSynchronizer struct {
	// ordering is the set of columns rows are compared on.
	ordering sqlbase.ColumnOrdering
	// evalCtx is passed to EncDatumRow.Compare when comparing rows.
	evalCtx *parser.EvalContext
	// sources holds one entry per input stream; heap entries index into it.
	sources []srcInfo
	// state dictates the operation mode.
	state orderedSynchronizerState
	// heap of source indexes, ordered by the current row. Sources with no more
	// rows are not in the heap.
	heap []srcIdx
	// needsAdvance is set when the row at the root of the heap has already been
	// consumed and thus producing a new row requires the root to be advanced.
	// This is usually set after a row is produced, but is not set when a metadata
	// row has just been produced, as that means that the heap is in good state to
	// serve the next row without advancing anything.
	needsAdvance bool
	// err can be set by the Less function (used by the heap implementation)
	err error
	// alloc is scratch space passed to EncDatumRow.Compare.
	alloc sqlbase.DatumAlloc
	// metadata is accumulated from all the sources and is passed on as soon as
	// possible.
	metadata []*ProducerMetadata
}

// Compile-time check that orderedSynchronizer implements RowSource.
var _ RowSource = &orderedSynchronizer{}
// Types is part of the RowSource interface. It reports the column types of the
// first source (presumably every source shares the same types).
func (s *orderedSynchronizer) Types() []sqlbase.ColumnType {
	first := s.sources[0].src
	return first.Types()
}
// Len is part of heap.Interface and is only meant to be used internally. It
// reports how many sources are currently on the heap.
func (s *orderedSynchronizer) Len() int {
	n := len(s.heap)
	return n
}
// Less is part of heap.Interface and is only meant to be used internally.
//
// It orders heap slots by the current row of the sources they refer to.
// heap.Interface gives Less no way to report failure, so a comparison error is
// stashed in s.err (and the pair is treated as not-less) for callers to check
// after each heap operation.
func (s *orderedSynchronizer) Less(i, j int) bool {
	left, right := s.heap[i], s.heap[j]
	cmp, err := s.sources[left].row.Compare(&s.alloc, s.ordering, s.evalCtx, s.sources[right].row)
	if err == nil {
		return cmp < 0
	}
	s.err = err
	return false
}
// Swap is part of heap.Interface and is only meant to be used internally. It
// exchanges two heap slots.
func (s *orderedSynchronizer) Swap(i, j int) {
	h := s.heap
	h[j], h[i] = h[i], h[j]
}
// Push is part of heap.Interface. It is never used: the heap slice is filled
// directly (see initHeap) rather than through heap.Push, so reaching this is a
// programming error.
func (s *orderedSynchronizer) Push(x interface{}) {
	panic("unimplemented")
}
// Pop is part of heap.Interface and is only meant to be used internally.
//
// container/heap has already moved the element being removed to the end of the
// slice; nobody here uses the popped value, so it is simply dropped and nil is
// returned.
func (s *orderedSynchronizer) Pop() interface{} {
	last := len(s.heap) - 1
	s.heap = s.heap[:last]
	return nil
}
// initHeap reads the first row of every source and builds the heap from them.
//
// A source is placed in s.heap when it produced a row or when reading it
// failed; only sources that turned out to be exhausted are left out. Erroring
// sources are kept so that drainSources() can later drain them.
//
// When an error is returned from reading a source, heap.Init() has NOT been
// run, so s.heap is merely a list of sources that still need draining. Any
// error recorded by Less() during heap.Init() is returned as well.
func (s *orderedSynchronizer) initHeap() error {
	// lastErr is the most recent error seen while reading a first row; we keep
	// going so that every source gets its chance to be listed in s.heap.
	var lastErr error
	for i := range s.sources {
		err := s.consumeMetadata(&s.sources[i], stopOnRowOrError)
		switch {
		case err != nil:
			lastErr = err
			s.heap = append(s.heap, srcIdx(i))
		case s.sources[i].row != nil:
			// Only a plain slice for now; heap.Init below establishes the
			// heap invariant.
			s.heap = append(s.heap, srcIdx(i))
		}
	}
	if lastErr != nil {
		return lastErr
	}
	heap.Init(s)
	// Less() reports comparison failures through s.err.
	return s.err
}
// consumeMetadataOption selects how consumeMetadata() reads from a source.
type consumeMetadataOption int

const (
	// stopOnRowOrError means that consumeMetadata() will stop consuming as soon
	// as a row or metadata record with an error is received. The idea is to allow
	// this row to be placed in the heap, or for the error to be passed to the
	// consumer as soon as possible.
	stopOnRowOrError consumeMetadataOption = iota
	// drain means that we're going to fully consume the source, accumulating all
	// metadata records and ignoring all rows.
	drain
)
// consumeMetadata reads from src, collecting every non-empty metadata record
// into s.metadata.
//
// With mode == stopOnRowOrError it returns as soon as it sees either:
//   - a metadata record carrying an error: that error is returned, the record is
//     NOT added to s.metadata, and src.row is left untouched; or
//   - anything that is not metadata: src.row is set to it (nil means src is
//     exhausted) and nil is returned.
//
// With mode == drain it reads src until it yields a nil row with empty
// metadata. Rows are discarded, error-carrying metadata is accumulated like any
// other record, and nil is always returned.
func (s *orderedSynchronizer) consumeMetadata(src *srcInfo, mode consumeMetadataOption) error {
	for {
		row, meta := src.src.Next()
		switch {
		case meta.Err != nil && mode == stopOnRowOrError:
			return meta.Err
		case !meta.Empty():
			s.metadata = append(s.metadata, &meta)
		case mode == stopOnRowOrError:
			src.row = row
			return nil
		case row == nil:
			// Draining and the source is exhausted.
			return nil
		}
	}
}
// advanceRoot fetches the next row of the source at the root of the heap and
// restores the heap invariant: the source is removed if it is exhausted,
// re-sifted otherwise. Metadata read along the way is accumulated in
// s.metadata.
//
// It also verifies that the new row does not sort before the previous row of
// the same source, which would mean the input stream is not ordered.
//
// A non-nil error means either the heap is broken (s.err was set by Less) or a
// source misbehaved. In both cases advanceRoot() must not be called again; the
// caller is expected to move the synchronizer out of returningRows.
func (s *orderedSynchronizer) advanceRoot() error {
	if s.state != returningRows {
		return errors.Errorf("advanceRoot() called in unsupported state: %d", s.state)
	}
	if len(s.heap) == 0 {
		return nil
	}
	root := &s.sources[s.heap[0]]
	prev := root.row
	if prev == nil {
		return errors.Errorf("trying to advance closed source")
	}
	if err := s.consumeMetadata(root, stopOnRowOrError); err != nil {
		return err
	}
	if root.row == nil {
		// The source is exhausted; it no longer belongs on the heap.
		heap.Remove(s, 0)
		return s.err
	}
	heap.Fix(s, 0)
	// TODO(radu): this check may be costly, we could disable it in production
	cmp, err := prev.Compare(&s.alloc, s.ordering, s.evalCtx, root.row)
	if err != nil {
		return err
	}
	if cmp > 0 {
		return errors.Errorf("incorrectly ordered stream %s after %s", root.row, prev)
	}
	// heap operations might set s.err (see Less)
	return s.err
}
// drainSources fully consumes every source still listed in s.heap. Rows are
// thrown away; metadata records end up in s.metadata. consumeMetadata never
// fails in drain mode, so an error here is treated as fatal.
func (s *orderedSynchronizer) drainSources() {
	for i := 0; i < len(s.heap); i++ {
		src := &s.sources[s.heap[i]]
		if err := s.consumeMetadata(src, drain); err != nil {
			log.Fatalf(context.TODO(), "unexpected draining error: %s", err)
		}
	}
}
// Next is part of the RowSource interface.
//
// The first call builds the heap. In returningRows mode, a call following a
// returned row first advances the source that produced it. Once in draining
// mode, all remaining sources are drained in one go. Buffered metadata always
// takes precedence over rows and is handed out one record per call. A nil row
// with empty metadata signals the end of the stream.
func (s *orderedSynchronizer) Next() (sqlbase.EncDatumRow, ProducerMetadata) {
	var err error
	switch {
	case s.state == notInitialized:
		if err = s.initHeap(); err == nil {
			s.state = returningRows
		}
	case s.state == returningRows && s.needsAdvance:
		// Last row returned was from the source at the root of the heap; get
		// the next row for that source.
		err = s.advanceRoot()
	}
	if err != nil {
		s.ConsumerDone()
		return nil, ProducerMetadata{Err: err}
	}

	if s.state == draining {
		// ConsumerDone(), or an error, has put us in draining mode. All subsequent
		// Next() calls will return metadata records.
		s.drainSources()
		s.state = drainBuffered
		s.heap = nil
	}

	if len(s.metadata) > 0 {
		// TODO(andrei): We return the metadata records one by one. The interface
		// should support returning all of them at once.
		head := s.metadata[0]
		s.metadata = s.metadata[1:]
		// The heap root's row has not been handed out yet, so the next call
		// must not advance it.
		s.needsAdvance = false
		return nil, *head
	}

	if len(s.heap) == 0 {
		return nil, ProducerMetadata{}
	}
	s.needsAdvance = true
	return s.sources[s.heap[0]].row, ProducerMetadata{}
}
// ConsumerDone is part of the RowSource interface.
//
// It moves the synchronizer into draining mode and forwards ConsumerDone() to
// the sources that may still produce data. The next call to Next() reads every
// source listed in s.heap until it is exhausted, keeping only its metadata.
// Calling it again while already draining is a no-op.
func (s *orderedSynchronizer) ConsumerDone() {
	// We're entering draining mode. Only metadata will be forwarded from now on.
	if s.state == draining {
		return
	}
	// If Next() has never been called, nothing has been read from any source
	// and s.heap is still empty. Without intervention, drainSources() would
	// then drain nothing, and all of the sources' metadata would be lost.
	// (When initHeap() failed instead, s.heap already lists every source that
	// still needs draining and must be left alone.)
	neverStarted := s.state == notInitialized && len(s.heap) == 0
	s.consumerStatusChanged(draining, RowSource.ConsumerDone)
	if neverStarted {
		for i := range s.sources {
			s.heap = append(s.heap, srcIdx(i))
		}
	}
}
// ConsumerClosed is part of the RowSource interface. It forwards
// ConsumerClosed() to the sources that may still be live. No further methods
// are expected to be called afterwards, so the resulting state is mostly
// informational; drainBuffered is used rather than leaving the previous one.
func (s *orderedSynchronizer) ConsumerClosed() {
	closeSource := RowSource.ConsumerClosed
	s.consumerStatusChanged(drainBuffered, closeSource)
}
// consumerStatusChanged applies f (a RowSource method such as ConsumerDone or
// ConsumerClosed) to every source that might not be exhausted yet, then
// switches the synchronizer to newState.
//
// Before the heap exists, any source may still have data, so f is applied to
// all of them. Afterwards only the sources still on the heap are notified.
func (s *orderedSynchronizer) consumerStatusChanged(
	newState orderedSynchronizerState, f func(RowSource),
) {
	if s.state != notInitialized {
		// Sources that dropped off the heap are already fully consumed. Calling
		// f on them would be harmless, but skipping them is cheaper (usually no
		// sources are left at all).
		for _, idx := range s.heap {
			f(s.sources[idx].src)
		}
	} else {
		for _, info := range s.sources {
			f(info.src)
		}
	}
	s.state = newState
}
// makeOrderedSync builds an orderedSynchronizer that merges sources, each
// already sorted by ordering, into one stream sorted by ordering. At least two
// sources are required; otherwise an error is returned.
func makeOrderedSync(
	ordering sqlbase.ColumnOrdering, evalCtx *parser.EvalContext, sources []RowSource,
) (RowSource, error) {
	n := len(sources)
	if n < 2 {
		return nil, errors.Errorf("only %d sources for ordered synchronizer", n)
	}
	infos := make([]srcInfo, n)
	for i, src := range sources {
		infos[i].src = src
	}
	return &orderedSynchronizer{
		ordering: ordering,
		evalCtx:  evalCtx,
		sources:  infos,
		heap:     make([]srcIdx, 0, n),
		state:    notInitialized,
	}, nil
}