forked from dolthub/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
row_differ.go
366 lines (328 loc) · 12.2 KB
/
row_differ.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package worker
import (
"fmt"
"time"
"golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/topo"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
// DiffType classifies the result of comparing a row of the left side with the
// corresponding row of the right side.
// The numeric values are used as slice indexes elsewhere in this package
// (e.g. statsCounters[typ]), so they are dense and start at 0.
type DiffType int

const (
	// DiffMissing is returned when the row is missing on the right side.
	DiffMissing DiffType = 0
	// DiffNotEqual is returned when the row on the left and right side are
	// not equal.
	DiffNotEqual DiffType = 1
	// DiffExtraneous is returned when the row exists on the right side, but not
	// on the left side.
	DiffExtraneous DiffType = 2
	// DiffEqual is returned when the rows left and right are equal.
	DiffEqual DiffType = 3
)

var (
	// DiffTypes has the list of available DiffType values, ordered by their
	// value.
	DiffTypes = []DiffType{DiffMissing, DiffNotEqual, DiffExtraneous, DiffEqual}

	// DiffFoundTypes has the list of DiffType values which represent that a
	// difference was found. The list is ordered by the values of the types.
	DiffFoundTypes = []DiffType{DiffMissing, DiffNotEqual, DiffExtraneous}
)
// RowDiffer2 will compare and reconcile two sides. It assumes that the left
// side is the source of truth and necessary reconciliations have to be applied
// to the right side.
// It also assumes left and right are sorted by ascending primary key.
// Use NewRowDiffer2() to create an instance and Diff() to run it.
type RowDiffer2 struct {
	// left is the source of truth.
	left *RowReader
	// right is the side which is reconciled so that it matches left.
	right *RowReader
	// pkFieldCount is the number of primary key columns. The primary key
	// columns are the leading columns of every row.
	pkFieldCount int
	// tableStatusList is used to report the number of reconciled rows.
	tableStatusList *tableStatusList
	// tableIndex is the index of the table in the schema. It is required for
	// reporting the number of reconciled rows to tableStatusList.
	tableIndex int
	// router returns for a row to which destination shard index it should go.
	router RowRouter
	// aggregators are keyed by destination shard and DiffType.
	// Only the DiffType values in DiffFoundTypes have an aggregator; rows
	// which are equal on both sides are only counted.
	aggregators [][]*RowAggregator
	// equalRowsStatsCounters tracks per table how many rows are equal.
	equalRowsStatsCounters *stats.CountersWithSingleLabel
	// tableName is required to update "equalRowsStatsCounters".
	tableName string
}
// NewRowDiffer2 returns a new RowDiffer2.
// We assume that the indexes of the slice parameters always correspond to the
// same shard e.g. insertChannels[0] refers to destinationShards[0] and so on.
// insertChannels and dbNames must therefore have (at least) one entry per
// destination shard; otherwise an error is returned.
// The column list td.Columns must have all primary key columns first and
// then the non-primary-key columns. The columns in the rows returned by
// both ResultReaders must have the same order as td.Columns.
// statsCounters must have exactly one entry per DiffType and is indexed by
// DiffType. A wrong length is a programming error and panics.
// An error is returned as well if the fields of left and right differ in
// number or type.
// NOTE(review): abort is currently not used by this function.
func NewRowDiffer2(ctx context.Context, left, right ResultReader, td *tabletmanagerdatapb.TableDefinition, tableStatusList *tableStatusList, tableIndex int,
	// Parameters required by RowRouter.
	destinationShards []*topo.ShardInfo, keyResolver keyspaceIDResolver,
	// Parameters required by RowAggregator.
	insertChannels []chan string, abort <-chan struct{}, dbNames []string, writeQueryMaxRows, writeQueryMaxSize int, statsCounters []*stats.CountersWithSingleLabel) (*RowDiffer2, error) {
	if len(statsCounters) != len(DiffTypes) {
		panic(fmt.Sprintf("statsCounter has the wrong number of elements. got = %v, want = %v", len(statsCounters), len(DiffTypes)))
	}
	if err := compareFields(left.Fields(), right.Fields()); err != nil {
		return nil, err
	}
	// The loop below indexes insertChannels and dbNames by shard index.
	// Reject short slices with a descriptive error instead of crashing with an
	// index-out-of-range panic.
	if len(insertChannels) < len(destinationShards) || len(dbNames) < len(destinationShards) {
		return nil, fmt.Errorf("need one insert channel and one database name per destination shard: got %v insert channels and %v database names for %v destination shards",
			len(insertChannels), len(dbNames), len(destinationShards))
	}

	// Create a RowAggregator for each destination shard and each DiffType
	// which represents a difference. The inner slice is indexed by DiffType;
	// this works because DiffFoundTypes are the values 0..len-1.
	aggregators := make([][]*RowAggregator, len(destinationShards))
	for i := range destinationShards {
		aggregators[i] = make([]*RowAggregator, len(DiffFoundTypes))
		for _, typ := range DiffFoundTypes {
			aggregators[i][typ] = NewRowAggregator(ctx, writeQueryMaxRows, writeQueryMaxSize,
				insertChannels[i], dbNames[i], td, typ, statsCounters[typ])
		}
	}

	return &RowDiffer2{
		left:                   NewRowReader(left),
		right:                  NewRowReader(right),
		pkFieldCount:           len(td.PrimaryKeyColumns),
		tableStatusList:        tableStatusList,
		tableIndex:             tableIndex,
		router:                 NewRowRouter(destinationShards, keyResolver),
		aggregators:            aggregators,
		equalRowsStatsCounters: statsCounters[DiffEqual],
		tableName:              td.Name,
	}, nil
}
// compareFields checks that left and right describe the same columns: both
// lists must have the same length and the field at each position must have
// the same type. Field names are not compared.
// It returns nil if the fields are compatible and a descriptive error
// otherwise.
func compareFields(left, right []*querypb.Field) error {
	if len(left) != len(right) {
		return fmt.Errorf("Cannot diff inputs with different number of fields: left: %v right: %v", left, right)
	}
	for i := 0; i < len(left); i++ {
		leftType, rightType := left[i].Type, right[i].Type
		if leftType == rightType {
			continue
		}
		return fmt.Errorf("Cannot diff inputs with different types: field %v types are %v and %v", i, leftType, rightType)
	}
	return nil
}
// Diff runs the diff and reconcile.
//
// Both sides are read in lockstep. Because both are sorted by ascending
// primary key, the side with the smaller primary key is advanced whenever
// the current rows do not share a primary key. Every difference is handed to
// the RowAggregator of the row's destination shard:
//   - row only on the left side: DiffMissing (added on the destination)
//   - row only on the right side: DiffExtraneous (deleted from the destination)
//   - same primary key, different content: DiffNotEqual (updated)
//
// Once both sides are exhausted, all aggregators are flushed.
// If an error occurs, it will return and stop. The aggregators are not
// flushed in that case.
//
// The results are named on purpose. "return dr, err" copies dr into the
// result before deferred calls run. With named results, the deferred
// dr.ComputeQPS() operates on the report that is actually returned
// (assuming ComputeQPS has a pointer receiver). With unnamed results it
// would operate on a local copy that is thrown away.
func (rd *RowDiffer2) Diff() (dr DiffReport, err error) {
	dr.startingTime = time.Now()
	defer dr.ComputeQPS()

	fields := rd.left.Fields()
	var left []sqltypes.Value
	var right []sqltypes.Value
	advanceLeft := true
	advanceRight := true
	for {
		if advanceLeft {
			if left, err = rd.left.Next(); err != nil {
				return dr, err
			}
			advanceLeft = false
		}
		if advanceRight {
			if right, err = rd.right.Next(); err != nil {
				return dr, err
			}
			advanceRight = false
		}
		if left == nil && right == nil {
			// No more rows from either side. We're done.
			break
		}
		// Counted only after the termination check, so that the final
		// "both sides exhausted" iteration is not reported as a processed row.
		dr.processedRows++

		if left == nil {
			// No more rows on the left side.
			// We know we have at least one row on the right side left.
			// Delete the row from the destination.
			if err = rd.reconcileRow(right, DiffExtraneous); err != nil {
				return dr, err
			}
			dr.extraRowsRight++
			advanceRight = true
			continue
		}
		if right == nil {
			// No more rows on the right side.
			// We know we have at least one row on the left side left.
			// Add the row on the destination.
			if err = rd.reconcileRow(left, DiffMissing); err != nil {
				return dr, err
			}
			dr.extraRowsLeft++
			advanceLeft = true
			continue
		}

		// We have both left and right, compare them.
		// NOTE(review): RowsEqual presumably returns the index of the first
		// column which differs, or -1 if the rows are identical.
		f := RowsEqual(left, right)
		if f == -1 {
			// Rows are the same, next.
			dr.matchingRows++
			advanceLeft = true
			advanceRight = true
			rd.skipRow()
			continue
		}
		if f >= rd.pkFieldCount {
			// Rows have the same primary key, only content is different.
			dr.mismatchedRows++
			advanceLeft = true
			advanceRight = true
			// Update the row on the destination.
			if err = rd.updateRow(left, right, DiffNotEqual); err != nil {
				return dr, err
			}
			continue
		}

		// The primary keys differ: find the 'smallest' row and advance it.
		c, cmpErr := CompareRows(fields, rd.pkFieldCount, left, right)
		if cmpErr != nil {
			return dr, cmpErr
		}
		switch {
		case c < 0:
			dr.extraRowsLeft++
			advanceLeft = true
			// Add the row on the destination.
			if err = rd.reconcileRow(left, DiffMissing); err != nil {
				return dr, err
			}
		case c > 0:
			dr.extraRowsRight++
			advanceRight = true
			// Delete the row from the destination.
			if err = rd.reconcileRow(right, DiffExtraneous); err != nil {
				return dr, err
			}
		default:
			// Values of the primary key columns were not binary equal but
			// their parsed values are.
			// This happens when the raw values returned by MySQL were different
			// but they became equal after we parsed them into ints/floats
			// (due to leading/trailing zeros, for example). So this can happen
			// if MySQL is inconsistent in how it prints a given number.
			dr.mismatchedRows++
			advanceLeft = true
			advanceRight = true
			// Update the row on the destination.
			if err = rd.updateRow(left, right, DiffNotEqual); err != nil {
				return dr, err
			}
		}
	}

	// Flush all aggregators in case they have buffered queries left.
	for _, perShard := range rd.aggregators {
		for _, aggregator := range perShard {
			if err = aggregator.Flush(); err != nil {
				return dr, err
			}
		}
	}
	return dr, nil
}
// skipRow accounts for a row which is identical on both sides (DiffType
// DiffEqual). There is nothing to reconcile, so only the statistics are
// updated and the row itself is not needed as input.
func (rd *RowDiffer2) skipRow() {
	rd.tableStatusList.addCopiedRows(rd.tableIndex, 1)
	rd.equalRowsStatsCounters.Add(rd.tableName, 1)
}
// reconcileRow hands a row which exists on only one side to the RowAggregator
// of the row's destination shard and reports it to tableStatusList.
// It is used for the DiffType DiffMissing (the row gets added on the
// destination) and DiffExtraneous (the row gets deleted from the
// destination). DiffNotEqual rows must go through updateRow() instead.
// Any other DiffType is a programming error and panics with a descriptive
// message. In particular, there is no aggregator for DiffEqual, and indexing
// one would otherwise fail with an index-out-of-range panic.
// It returns an error if the row cannot be routed or the aggregator rejects
// it.
func (rd *RowDiffer2) reconcileRow(row []sqltypes.Value, typ DiffType) error {
	if typ != DiffMissing && typ != DiffExtraneous {
		panic(fmt.Sprintf("reconcileRow() called with wrong type: %v", typ))
	}

	destShardIndex, err := rd.router.Route(row)
	if err != nil {
		return fmt.Errorf("failed to route row (%v) to correct shard: %v", row, err)
	}
	if err := rd.aggregators[destShardIndex][typ].Add(row); err != nil {
		return fmt.Errorf("failed to add row update to RowAggregator: %v", err)
	}
	// TODO(mberlin): Add more fine granular stats here.
	rd.tableStatusList.addCopiedRows(rd.tableIndex, 1)
	return nil
}
// updateRow reconciles a row whose primary key exists on both sides but whose
// content differs (DiffType DiffNotEqual). newRow is the row of the source and
// oldRow is the row of the destination.
// Both rows are routed because the keyspace_id may have changed in the
// meantime:
//   - same destination shard: newRow is queued as an in-place update.
//   - different destination shards: the row cannot be UPDATEd. Instead, oldRow
//     is queued for DELETE on its shard and newRow for INSERT on its shard.
//
// Calling it with any other DiffType is a programming error and panics.
func (rd *RowDiffer2) updateRow(newRow, oldRow []sqltypes.Value, typ DiffType) error {
	if typ != DiffNotEqual {
		panic(fmt.Sprintf("updateRow() called with wrong type: %v", typ))
	}

	oldShard, routeErr := rd.router.Route(oldRow)
	if routeErr != nil {
		return fmt.Errorf("failed to route old row (%v) to correct shard: %v", oldRow, routeErr)
	}
	newShard, routeErr := rd.router.Route(newRow)
	if routeErr != nil {
		return fmt.Errorf("failed to route new row (%v) to correct shard: %v", newRow, routeErr)
	}

	if oldShard != newShard {
		// keyspace_id has changed: remove the row where it was and add it
		// where it belongs now.
		if addErr := rd.aggregators[oldShard][DiffExtraneous].Add(oldRow); addErr != nil {
			return fmt.Errorf("failed to add row update to RowAggregator (keyspace_id changed, deleting old row): %v", addErr)
		}
		if addErr := rd.aggregators[newShard][DiffMissing].Add(newRow); addErr != nil {
			return fmt.Errorf("failed to add row update to RowAggregator (keyspace_id changed, inserting new row): %v", addErr)
		}
	} else if addErr := rd.aggregators[newShard][typ].Add(newRow); addErr != nil {
		// keyspace_id is unchanged, but the in-place update could not be queued.
		return fmt.Errorf("failed to add row update to RowAggregator (keyspace_id not changed): %v", addErr)
	}

	// TODO(mberlin): Add more fine granular stats here.
	rd.tableStatusList.addCopiedRows(rd.tableIndex, 1)
	return nil
}
// RowRouter finds out which shard's key range contains the keyspace ID of a
// given row.
type RowRouter struct {
	// keyResolver computes the keyspace ID of a row.
	keyResolver keyspaceIDResolver
	// keyRanges has one entry per destination shard. The position of an entry
	// is the shard index returned by Route().
	keyRanges []*topodatapb.KeyRange
}
// NewRowRouter creates a RowRouter for the given destination shards. The key
// range of shardInfos[i] answers for shard index i in Route().
// We assume that the key ranges in shardInfos cover the full keyrange i.e.
// for any possible keyspaceID there is a shard we can route to.
func NewRowRouter(shardInfos []*topo.ShardInfo, keyResolver keyspaceIDResolver) RowRouter {
	ranges := make([]*topodatapb.KeyRange, 0, len(shardInfos))
	for _, shard := range shardInfos {
		ranges = append(ranges, shard.KeyRange)
	}
	return RowRouter{
		keyResolver: keyResolver,
		keyRanges:   ranges,
	}
}
// Route returns the index (into the list of shards passed to NewRowRouter) of
// the shard whose key range contains the keyspace ID of row.
// With exactly one shard, the keyspace ID is not resolved at all and 0 is
// returned. On failure, -1 and an error are returned.
func (rr *RowRouter) Route(row []sqltypes.Value) (int, error) {
	// Fast path: a single destination shard receives every row.
	if len(rr.keyRanges) == 1 {
		return 0, nil
	}

	keyspaceID, err := rr.keyResolver.keyspaceID(row)
	if err != nil {
		return -1, err
	}
	for idx := range rr.keyRanges {
		if key.KeyRangeContains(rr.keyRanges[idx], keyspaceID) {
			return idx, nil
		}
	}
	return -1, fmt.Errorf("no shard's key range includes the keyspace id: %v for the row: %#v", keyspaceID, row)
}