-
Notifications
You must be signed in to change notification settings - Fork 0
/
keys.go
106 lines (99 loc) · 2.66 KB
/
keys.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package spankeys
import (
"context"
"errors"
"fmt"
"math"
"strings"
"cloud.google.com/go/spanner"
)
// CountableKeyRange pairs a Spanner key range with a row count.
type CountableKeyRange struct {
// KeyRange is embedded, so its Start, End and Kind fields are promoted.
spanner.KeyRange
// RowCount is the number of rows recorded for this range by its producer.
RowCount int64
}
// CountIndexesWithChildren returns the number of secondary indexes on
// tableName plus, recursively, the secondary indexes on every interleaved
// child table whose OnDelete is OnDeleteCascade. Children with any other
// OnDelete setting are not descended into.
//
// The first error returned by a lookup aborts the walk and is returned
// unchanged together with a zero count.
func CountIndexesWithChildren(ctx context.Context, client *spanner.Client, tableName string) (int, error) {
	indexes, err := GetSecondaryIndexes(ctx, client, tableName)
	if err != nil {
		return 0, err
	}
	total := len(indexes)

	descendants, err := GetInterleaveChildren(ctx, client, tableName)
	if err != nil {
		return 0, err
	}
	for _, d := range descendants {
		// Only cascading children contribute to the count.
		if d.OnDelete != OnDeleteCascade {
			continue
		}
		sub, err := CountIndexesWithChildren(ctx, client, d.Table)
		if err != nil {
			return 0, err
		}
		total += sub
	}
	return total, nil
}
// CalcMutationBatchSize derives a batch size for tableName from the index
// count reported by CountIndexesWithChildren.
//
// With no indexes it returns math.MaxInt32; otherwise it returns
// 20000/indexCount - 1. Any error from the index count is returned unchanged
// with a zero size.
//
// NOTE(review): 20000 is presumably the per-commit mutation budget being
// targeted - confirm against the current Spanner limits.
func CalcMutationBatchSize(ctx context.Context, client *spanner.Client, tableName string) (int, error) {
	indexes, err := CountIndexesWithChildren(ctx, client, tableName)
	switch {
	case err != nil:
		return 0, err
	case indexes == 0:
		// Without any index the mutation count is 1 regardless of the number
		// of rows, so the batch size is effectively unbounded.
		return math.MaxInt32, nil
	default:
		return 20000/indexes - 1, nil
	}
}
// PartitionsKeyRanges scans the primary keys of tableName and groups
// consecutive rows into closed key ranges of at most mutationBatchSize rows.
//
// pkColumns names the primary key columns in key order; at least one is
// required. At most selectLimit rows are read, so only those rows are covered
// by the returned ranges. Each returned CountableKeyRange is ClosedClosed,
// runs from the first to the last key of its group, and carries the number of
// rows in the group as RowCount. A mutationBatchSize of 1 or less puts every
// row in its own range.
//
// An error is returned when pkColumns is empty, when the query fails, or when
// a key column cannot be read or decoded.
func PartitionsKeyRanges(ctx context.Context, client *spanner.Client, tableName string, pkColumns []*Column, mutationBatchSize, selectLimit int) ([]*CountableKeyRange, error) {
	if len(pkColumns) < 1 {
		return nil, errors.New("at least one of Primary Key is required")
	}

	selectCols := make([]string, 0, len(pkColumns))
	orderCols := make([]string, 0, len(pkColumns))
	for _, col := range pkColumns {
		quoted := fmt.Sprintf("`%s`", col.Name)
		selectCols = append(selectCols, quoted)
		orderCols = append(orderCols, quoted+" ASC")
	}

	// Order by every primary key column, not just the first one. With a
	// composite key, rows sharing the first component would otherwise arrive
	// in unspecified order, and the Start/End keys taken from consecutive rows
	// could describe overlapping ranges whose RowCount is wrong.
	// NOTE(review): assumes all key columns are stored ascending; a DESC key
	// column would need a matching DESC here - confirm against Column.
	sql := fmt.Sprintf(
		"SELECT %s FROM `%s` ORDER BY %s LIMIT %d",
		strings.Join(selectCols, ","), tableName, strings.Join(orderCols, ", "), selectLimit,
	)
	stmt := spanner.NewStatement(sql)

	var (
		keySets    []*CountableKeyRange
		startKey   spanner.Key // first key of the range being accumulated
		currentKey spanner.Key // most recent key read
		cnt        int         // rows accumulated in the open range
	)

	err := client.Single().Query(ctx, stmt).Do(func(r *spanner.Row) error {
		key := make(spanner.Key, 0, len(pkColumns))
		for _, col := range pkColumns {
			var gcv spanner.GenericColumnValue
			if err := r.ColumnByName(col.Name, &gcv); err != nil {
				return err
			}
			var k interface{}
			if err := DecodeToInterface(&gcv, &k); err != nil {
				return err
			}
			key = append(key, k)
		}

		currentKey = key
		if cnt == 0 {
			startKey = key
		}
		cnt++

		// Close the range once it holds a full batch.
		if cnt >= mutationBatchSize {
			keySets = append(keySets, &CountableKeyRange{
				KeyRange: spanner.KeyRange{Start: startKey, End: key, Kind: spanner.ClosedClosed},
				RowCount: int64(cnt),
			})
			cnt = 0
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	// Flush the trailing partial batch, if any.
	if cnt > 0 {
		keySets = append(keySets, &CountableKeyRange{
			KeyRange: spanner.KeyRange{Start: startKey, End: currentKey, Kind: spanner.ClosedClosed},
			RowCount: int64(cnt),
		})
	}
	return keySets, nil
}