forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 1
/
equal_splits_algorithm.go
242 lines (225 loc) · 9.65 KB
/
equal_splits_algorithm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package splitquery
import (
"fmt"
"math/big"
"strconv"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/sqltypes"
querypb "github.com/youtube/vitess/go/vt/proto/query"
"github.com/youtube/vitess/go/vt/sqlparser"
)
// EqualSplitsAlgorithm implements the SplitAlgorithmInterface and represents the equal-splits
// algorithm for generating the boundary tuples. If this algorithm is used then
// SplitParams.split_columns must contain exactly one split_column. Additionally, the
// split_column must have a numeric type (integral or floating point).
//
// The algorithm works by issuing a query to the database to find the minimum and maximum
// elements of the split column in the table referenced by the given SQL query. Denote these
// by min and max, respectively. The algorithm then "splits" the interval [min, max] into
// SplitParams.split_count sub-intervals of equal length:
//
//	[a_1, a_2], [a_2, a_3], ..., [a_{split_count}, a_{split_count+1}],
//
// where min=a_1 < a_2 < a_3 < ... < a_{split_count} < a_{split_count+1}=max.
// The boundary points returned by this algorithm are then: a_2, a_3, ..., a_{split_count}.
// An empty list of boundary points is returned if split_count <= 1, if the table is empty
// (min and max are NULL), or if min == max. If the type of the split column is integral, the
// boundary points are truncated toward zero to an integer; consequently, consecutive boundary
// points may coincide when max-min is small relative to split_count.
type EqualSplitsAlgorithm struct {
	// splitParams holds the query being split and the split column/count settings.
	splitParams *SplitParams
	// sqlExecuter is used to run minMaxQuery against the database.
	sqlExecuter SQLExecuter
	// minMaxQuery is the "select min(col), max(col) from table" query, built once in
	// NewEqualSplitsAlgorithm.
	minMaxQuery string
}
// NewEqualSplitsAlgorithm validates splitParams for use with the equal-splits algorithm and,
// when they are acceptable, returns an algorithm object that will run its min/max query
// through sqlExecuter (the algorithm needs the minimum and maximum of the split column).
//
// An error is returned unless splitParams has exactly one split column, that column has a
// numeric (integral or floating-point) type, and splitParams.splitCount is positive.
// It panics if splitParams.splitColumns and splitParams.splitColumnTypes have different
// lengths, since that means the SplitParams value itself is inconsistent.
func NewEqualSplitsAlgorithm(splitParams *SplitParams, sqlExecuter SQLExecuter) (
	*EqualSplitsAlgorithm, error) {
	columnCount, typeCount := len(splitParams.splitColumns), len(splitParams.splitColumnTypes)
	if columnCount != typeCount {
		panic(fmt.Sprintf("len(splitparams.splitColumns) != len(splitparams.splitColumnTypes): %v!=%v",
			columnCount, typeCount))
	}

	// The cases are evaluated in order, so splitColumnTypes[0] is only read once we know
	// there is exactly one split column.
	switch {
	case columnCount != 1:
		return nil, fmt.Errorf("using the EQUAL_SPLITS algorithm in SplitQuery requires having"+
			" exactly one split-column. Got split-columns: %v",
			splitParams.splitColumns)
	case !(sqltypes.IsFloat(splitParams.splitColumnTypes[0]) ||
		sqltypes.IsIntegral(splitParams.splitColumnTypes[0])):
		return nil, fmt.Errorf("using the EQUAL_SPLITS algorithm in SplitQuery requires having"+
			" a numeric (integral or float) split-column. Got type: %v", splitParams.splitColumnTypes[0])
	case splitParams.splitCount <= 0:
		return nil, fmt.Errorf("using the EQUAL_SPLITS algorithm in SplitQuery requires a positive"+
			" splitParams.splitCount. Got: %v", splitParams.splitCount)
	}

	return &EqualSplitsAlgorithm{
		splitParams: splitParams,
		sqlExecuter: sqlExecuter,
		minMaxQuery: buildMinMaxQuery(splitParams),
	}, nil
}
// generateBoundaries returns the boundary tuples of the equal-splits algorithm: the points
// min + i*(max-min)/splitCount for i = 1, ..., splitCount-1, where min and max are the values
// returned by the min/max query on the single split column. Each tuple holds one value whose
// SQL type is the split column's type (see bigRatToValue for how the exact value is converted).
//
// An empty slice is returned when the table is empty (MIN/MAX are NULL), when min == max,
// or when splitCount == 1. An error from executing the min/max query is returned as-is;
// internal inconsistencies (exactly one of min/max NULL, unconvertible values, max < min)
// cause a panic.
func (a *EqualSplitsAlgorithm) generateBoundaries() ([]tuple, error) {
	// generateBoundaries should work for a split_column whose type is integral
	// (both signed and unsigned) as well as for floating point values.
	// We perform the calculation of the boundaries using precise big.Rat arithmetic and only
	// truncate the result in the end if necessary.
	// We do this since using float64 arithmetic does not have enough precision:
	// for example, if max=math.MaxUint64 and min=math.MaxUint64-1000 then float64(min)==float64(max).
	// On the other hand, using integer arithmetic for the case where the split_column is integral
	// (i.e., rounding (max-min)/split_count to an integer) may cause very dissimilar interval
	// lengths or a large deviation between split_count and the number of query-parts actually
	// returned (consider min=0, max=9.5*10^6, and split_count=10^6).
	// Note(erez): We can probably get away with using big.Float with ~64 bits of precision which
	// will likely be more efficient. However, we defer optimizing this code until we see if this
	// is a bottle-neck.
	minValue, maxValue, err := a.executeMinMaxQuery()
	if err != nil {
		return nil, err
	}
	// If the table is empty, minValue and maxValue will both be NULL. Exactly one of them
	// being NULL is treated as an invariant violation.
	if minValue.IsNull() != maxValue.IsNull() {
		panic(fmt.Sprintf("minValue and maxValue must both be NULL or both be non-NULL."+
			" minValue: %v, maxValue: %v, splitParams.sql: %v",
			minValue, maxValue, a.splitParams.sql))
	}
	if minValue.IsNull() {
		log.Infof("Splitting an empty table. splitParams.sql: %v. Query will not be split.",
			a.splitParams.sql)
		return []tuple{}, nil
	}
	// On conversion failure the returned *big.Rat is nil, so report the original SQL value.
	minRat, err := valueToBigRat(minValue)
	if err != nil {
		panic(fmt.Sprintf("Failed to convert min to a big.Rat: %v, min: %+v", err, minValue))
	}
	maxRat, err := valueToBigRat(maxValue)
	if err != nil {
		panic(fmt.Sprintf("Failed to convert max to a big.Rat: %v, max: %+v", err, maxValue))
	}
	minCmpMax := minRat.Cmp(maxRat)
	if minCmpMax > 0 {
		panic(fmt.Sprintf("max(splitColumn) < min(splitColumn): max:%v, min:%v", maxRat, minRat))
	}
	if minCmpMax == 0 {
		log.Infof("max(%v)=min(%v)=%v. splitParams.sql: %v. Query will not be split.",
			a.splitParams.splitColumns[0],
			a.splitParams.splitColumns[0],
			minRat,
			a.splitParams.sql)
		return []tuple{}, nil
	}
	// subIntervalSize = (max - min) / a.splitParams.splitCount
	subIntervalSize := new(big.Rat)
	subIntervalSize.Sub(maxRat, minRat)
	subIntervalSize.Quo(subIntervalSize, new(big.Rat).SetInt64(a.splitParams.splitCount))
	boundary := new(big.Rat).Set(minRat) // Copy min so that minRat itself is not modified.
	var result []tuple
	for i := int64(1); i < a.splitParams.splitCount; i++ {
		boundary.Add(boundary, subIntervalSize)
		// Here boundary=min+i*subIntervalSize, computed exactly.
		boundaryValue := bigRatToValue(boundary, a.splitParams.splitColumnTypes[0])
		result = append(result, tuple{boundaryValue})
	}
	return result, nil
}
// executeMinMaxQuery runs a.minMaxQuery (with no bind variables) and returns the two values of
// its single result row: the minimum and the maximum of the split column.
// An error from the executer is returned together with zero Values. A result that is not
// exactly one row of exactly two columns causes a panic.
func (a *EqualSplitsAlgorithm) executeMinMaxQuery() (minValue, maxValue sqltypes.Value, err error) {
	queryResult, execErr := a.sqlExecuter.SQLExecute(a.minMaxQuery, nil /* Bind Variables */)
	if execErr != nil {
		return sqltypes.Value{}, sqltypes.Value{}, execErr
	}

	// Shape checks happen in order: the row count must be verified before Rows[0] is read.
	switch {
	case len(queryResult.Rows) != 1:
		panic(fmt.Sprintf("MinMaxQuery should return exactly 1 row from query. MinMaxQuery: %v"+
			" Results: %v", a.minMaxQuery, queryResult))
	case len(queryResult.Rows[0]) != 2:
		panic(fmt.Sprintf("MinMaxQuery should return exactly 2 columns. MinMaxQuery: %v, Results:%v",
			a.minMaxQuery, queryResult))
	}

	onlyRow := queryResult.Rows[0]
	return onlyRow[0], onlyRow[1], nil
}
// buildMinMaxQuery builds the query that fetches the minimum and maximum of the split column.
// The returned text has the form
//
//	select min(<splitColumn>), max(<splitColumn>) from <table>
//
// where <table> is the table named in the FROM clause of splitParams.sql.
// It panics if no table name can be extracted from that FROM clause.
func buildMinMaxQuery(splitParams *SplitParams) string {
	// The SplitParams constructor is expected to have verified that the FROM clause is a
	// simple aliased table expression, so this type assertion should not fail.
	firstFrom := splitParams.selectAST.From[0].(*sqlparser.AliasedTableExpr)
	table := sqlparser.GetTableName(firstFrom.Expr)
	if table == "" {
		panic(fmt.Sprintf("Can't get tableName from query %v", splitParams.sql))
	}
	// NOTE(review): the column and table names are interpolated verbatim (no quoting);
	// presumably they were validated when SplitParams was built — confirm.
	column := splitParams.splitColumns[0]
	return fmt.Sprintf("select min(%v), max(%v) from %v", column, column, table)
}
// bigRatToValue converts 'number' to an SQL value with SQL type 'valueType'.
//
// If valueType is integral, 'number' is truncated toward zero (its fractional part is
// discarded, as big.Int.Quo does) and encoded in base 10. If valueType is a floating-point
// type, 'number' is rounded to the nearest float64 and encoded in the shortest decimal form
// that round-trips. 'number' itself is never modified.
//
// It panics if valueType is neither integral nor floating point, or if
// sqltypes.ValueFromBytes rejects the produced bytes.
func bigRatToValue(number *big.Rat, valueType querypb.Type) sqltypes.Value {
	var numberAsBytes []byte
	switch {
	case sqltypes.IsIntegral(valueType):
		// Quo writes into the fresh receiver and leaves its operands untouched, so the
		// references returned by Num() and Denom() are not modified.
		truncatedNumber := new(big.Int).Quo(number.Num(), number.Denom())
		numberAsBytes = bigIntToSliceOfBytes(truncatedNumber)
	case sqltypes.IsFloat(valueType):
		// Round to the nearest float64. The exactness flag is ignored on purpose: there is
		// nothing better to do when no exact float64 representation exists.
		numberAsFloat64, _ := number.Float64()
		numberAsBytes = strconv.AppendFloat(nil, numberAsFloat64, 'f', -1, 64)
	default:
		panic(fmt.Sprintf("Unsupported type: %v", valueType))
	}
	result, err := sqltypes.ValueFromBytes(valueType, numberAsBytes)
	if err != nil {
		panic(fmt.Sprintf("sqltypes.ValueFromBytes failed with: %v", err))
	}
	return result
}
// bigIntToSliceOfBytes returns the base-10 representation of bigInt (with a leading '-' for
// negative values) as a newly allocated slice of bytes.
func bigIntToSliceOfBytes(bigInt *big.Int) []byte {
	// big.Int.String already yields plain ASCII, so a direct conversion is enough; the former
	// quote-then-strip round trip through strconv.AppendQuoteToASCII produced the same bytes.
	return []byte(bigInt.String())
}
// valueToBigRat converts a numeric SQL 'value' into an exact big.Rat.
//
// Unsigned and signed integral values are converted exactly. Floating-point values are
// converted to the rational number exactly equal to their parsed float64.
// It returns an error if the value cannot be parsed as its type, or if a floating-point
// value is not finite (NaN and ±Inf have no rational representation).
// It panics if 'value' does not have a numeric type.
func valueToBigRat(value sqltypes.Value) (*big.Rat, error) {
	switch {
	case value.IsUnsigned():
		nativeValue, err := value.ParseUint64()
		if err != nil {
			return nil, err
		}
		return uint64ToBigRat(nativeValue), nil
	case value.IsSigned():
		nativeValue, err := value.ParseInt64()
		if err != nil {
			return nil, err
		}
		return int64ToBigRat(nativeValue), nil
	case value.IsFloat():
		nativeValue, err := value.ParseFloat64()
		if err != nil {
			return nil, err
		}
		result := float64ToBigRat(nativeValue)
		if result == nil {
			// big.Rat.SetFloat64 returns nil for non-finite inputs. Returning (nil, nil)
			// here would make callers dereference a nil *big.Rat.
			return nil, fmt.Errorf("float value is not finite: %v", nativeValue)
		}
		return result, nil
	default:
		panic(fmt.Sprintf("got value with a non numeric type: %v", value))
	}
}
// int64ToBigRat returns a newly allocated big.Rat whose value is exactly 'value'.
func int64ToBigRat(value int64) *big.Rat {
	converted := new(big.Rat)
	converted.SetInt64(value)
	return converted
}
// uint64ToBigRat returns a newly allocated big.Rat whose value is exactly 'value'.
// The conversion goes through a big.Int because big.Rat's setters used here only accept
// signed 64-bit integers directly.
func uint64ToBigRat(value uint64) *big.Rat {
	asInt := new(big.Int).SetUint64(value)
	return new(big.Rat).SetInt(asInt)
}
// float64ToBigRat returns a big.Rat exactly equal to 'value'.
// Like big.Rat.SetFloat64, it returns nil when 'value' is NaN or an infinity.
func float64ToBigRat(value float64) *big.Rat {
	var exact big.Rat
	return exact.SetFloat64(value)
}