forked from dolthub/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
row_splitter.go
73 lines (65 loc) · 2.13 KB
/
row_splitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Copyright 2014, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package worker
import (
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/topo"
querypb "github.com/youtube/vitess/go/vt/proto/query"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// RowSplitter is a helper class to split rows into multiple
// subsets targeted to different shards.
type RowSplitter struct {
KeyResolver keyspaceIDResolver
KeyRanges []*topodatapb.KeyRange
}
// NewRowSplitter returns a new row splitter for the given shard distribution.
func NewRowSplitter(shardInfos []*topo.ShardInfo, keyResolver keyspaceIDResolver) *RowSplitter {
result := &RowSplitter{
KeyResolver: keyResolver,
KeyRanges: make([]*topodatapb.KeyRange, len(shardInfos)),
}
for i, si := range shardInfos {
result.KeyRanges[i] = si.KeyRange
}
return result
}
// StartSplit starts a new split. Split can then be called multiple times.
func (rs *RowSplitter) StartSplit() [][][]sqltypes.Value {
return make([][][]sqltypes.Value, len(rs.KeyRanges))
}
// Split will split the rows into subset for each distribution
func (rs *RowSplitter) Split(result [][][]sqltypes.Value, rows [][]sqltypes.Value) error {
for _, row := range rows {
k, err := rs.KeyResolver.keyspaceID(row)
if err != nil {
return err
}
for i, kr := range rs.KeyRanges {
if key.KeyRangeContains(kr, k) {
result[i] = append(result[i], row)
break
}
}
}
return nil
}
// Send will send the rows to the list of channels. Returns true if aborted.
func (rs *RowSplitter) Send(fields []*querypb.Field, result [][][]sqltypes.Value, baseCmd string, insertChannels []chan string, abort <-chan struct{}) bool {
for i, c := range insertChannels {
// one of the chunks might be empty, so no need
// to send data in that case
if len(result[i]) > 0 {
cmd := baseCmd + makeValueString(fields, result[i])
// also check on abort, so we don't wait forever
select {
case c <- cmd:
case <-abort:
return true
}
}
}
return false
}