Skip to content

Commit eedac87

Browse files
PanicOnWarnings option to detect SQL warnings and fail the copy process (#1500)
* First pass at raise_on_warnings * Update dev.yml * add CLI option, ignore expected warnings, check for row count discrepancy * Count rows in each insert range prepared query - log insert warnings always - terminate if row count doesn't match and the PanicOnWarnings flag is set * Lint comments - ensure errors are handled in show warnings * update TestBuildUniqueKeyRangeEndPreparedQuery * Localtests for PanicOnWarnings with data loss * Unwrap CTE (mysql 5) * Update localtests/panic-on-warnings-duplicate-unique-values-on-column-type-change/extra_args Co-authored-by: Bastian Bartmann <accounts@bastianbartmann.de> * limit BuildUniqueKeyRangeEndPreparedQueryViaOffset subquery properly * Update Applier to support all unique indices with PanicOnWarnings. Add test coverage. * Impl code review feedback for PanicOnWarnings - documentation - remove dev.yml - remove unused variable * bump golangci-lint for local dev * Support altering index names with PanicOnWarnings * Fix string matching for PanicOnWarnings to correctly suppress warnings when renaming unique keys Error message formats are different across mysql distributions and versions --------- Co-authored-by: Bastian Bartmann <accounts@bastianbartmann.de>
1 parent 5c3e42a commit eedac87

File tree

18 files changed

+293
-35
lines changed

18 files changed

+293
-35
lines changed

Diff for: doc/command-line-flags.md

+6
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,12 @@ List of metrics and threshold values; topping the threshold of any will cause th
202202

203203
Typically `gh-ost` is used to migrate tables on a master. If you wish to only perform the migration in full on a replica, connect `gh-ost` to said replica and pass `--migrate-on-replica`. `gh-ost` will briefly connect to the master but otherwise will make no changes on the master. Migration will be fully executed on the replica, while making sure to maintain a small replication lag.
204204

205+
### panic-on-warnings
206+
207+
When this flag is set, `gh-ost` will panic when SQL warnings indicating data loss are encountered when copying data. This flag helps prevent data loss scenarios with migrations touching unique keys, column collation and types, as well as `NOT NULL` constraints, where `MySQL` will silently drop inserted rows that no longer satisfy the updated constraint (also dependent on the configured `sql_mode`).
208+
209+
While `panic-on-warnings` is currently disabled by defaults, it will default to `true` in a future version of `gh-ost`.
210+
205211
### postpone-cut-over-flag-file
206212

207213
Indicate a file name, such that the final [cut-over](cut-over.md) step does not take place as long as the file exists.

Diff for: go/base/context.go

+2
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ type MigrationContext struct {
151151
HooksHintOwner string
152152
HooksHintToken string
153153
HooksStatusIntervalSec int64
154+
PanicOnWarnings bool
154155

155156
DropServeSocket bool
156157
ServeSocketFile string
@@ -231,6 +232,7 @@ type MigrationContext struct {
231232
ColumnRenameMap map[string]string
232233
DroppedColumnsMap map[string]bool
233234
MappedSharedColumns *sql.ColumnList
235+
MigrationLastInsertSQLWarnings []string
234236
MigrationRangeMinValues *sql.ColumnValues
235237
MigrationRangeMaxValues *sql.ColumnValues
236238
Iteration int64

Diff for: go/cmd/gh-ost/main.go

+1
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ func main() {
108108
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
109109
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
110110
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
111+
flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss")
111112
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL")
112113
niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")
113114

Diff for: go/logic/applier.go

+45-9
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package logic
88
import (
99
gosql "database/sql"
1010
"fmt"
11+
"regexp"
1112
"strings"
1213
"sync/atomic"
1314
"time"
@@ -662,7 +663,7 @@ func (this *Applier) ReadMigrationRangeValues() error {
662663
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
663664
// no further chunk to work through, i.e. we're past the last chunk and are done with
664665
// iterating the range (and this done with copying row chunks)
665-
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
666+
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, expectedRowCount int64, err error) {
666667
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
667668
if this.migrationContext.MigrationIterationRangeMinValues == nil {
668669
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
@@ -683,32 +684,36 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
683684
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
684685
)
685686
if err != nil {
686-
return hasFurtherRange, err
687+
return hasFurtherRange, expectedRowCount, err
687688
}
688689

689690
rows, err := this.db.Query(query, explodedArgs...)
690691
if err != nil {
691-
return hasFurtherRange, err
692+
return hasFurtherRange, expectedRowCount, err
692693
}
693694
defer rows.Close()
694695

695-
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
696+
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len() + 1)
696697
for rows.Next() {
697698
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
698-
return hasFurtherRange, err
699+
return hasFurtherRange, expectedRowCount, err
699700
}
700-
hasFurtherRange = true
701+
702+
expectedRowCount = (*iterationRangeMaxValues.ValuesPointers[len(iterationRangeMaxValues.ValuesPointers)-1].(*interface{})).(int64)
703+
iterationRangeMaxValues = sql.ToColumnValues(iterationRangeMaxValues.AbstractValues()[:len(iterationRangeMaxValues.AbstractValues())-1])
704+
705+
hasFurtherRange = expectedRowCount > 0
701706
}
702707
if err = rows.Err(); err != nil {
703-
return hasFurtherRange, err
708+
return hasFurtherRange, expectedRowCount, err
704709
}
705710
if hasFurtherRange {
706711
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
707-
return hasFurtherRange, nil
712+
return hasFurtherRange, expectedRowCount, nil
708713
}
709714
}
710715
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
711-
return hasFurtherRange, nil
716+
return hasFurtherRange, expectedRowCount, nil
712717
}
713718

714719
// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
@@ -753,6 +758,37 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
753758
if err != nil {
754759
return nil, err
755760
}
761+
762+
if this.migrationContext.PanicOnWarnings {
763+
//nolint:execinquery
764+
rows, err := tx.Query("SHOW WARNINGS")
765+
if err != nil {
766+
return nil, err
767+
}
768+
defer rows.Close()
769+
if err = rows.Err(); err != nil {
770+
return nil, err
771+
}
772+
773+
var sqlWarnings []string
774+
for rows.Next() {
775+
var level, message string
776+
var code int
777+
if err := rows.Scan(&level, &code, &message); err != nil {
778+
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
779+
continue
780+
}
781+
// Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix
782+
migrationUniqueKeyExpression := fmt.Sprintf("for key '(%s\\.)?%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.NameInGhostTable)
783+
matched, _ := regexp.MatchString(migrationUniqueKeyExpression, message)
784+
if strings.Contains(message, "Duplicate entry") && matched {
785+
continue
786+
}
787+
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
788+
}
789+
this.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings
790+
}
791+
756792
if err := tx.Commit(); err != nil {
757793
return nil, err
758794
}

Diff for: go/logic/applier_test.go

+93
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,99 @@ func (suite *ApplierTestSuite) TestCreateGhostTable() {
504504
suite.Require().Equal("CREATE TABLE `_testing_gho` (\n `id` int DEFAULT NULL,\n `item_id` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci", createDDL)
505505
}
506506

507+
func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySucceedsWithUniqueKeyWarningInsertedByDMLEvent() {
508+
ctx := context.Background()
509+
510+
var err error
511+
512+
_, err = suite.db.ExecContext(ctx, "CREATE TABLE test.testing (id INT, item_id INT, UNIQUE KEY (item_id));")
513+
suite.Require().NoError(err)
514+
515+
_, err = suite.db.ExecContext(ctx, "CREATE TABLE test._testing_gho (id INT, item_id INT, UNIQUE KEY (item_id));")
516+
suite.Require().NoError(err)
517+
518+
connectionConfig, err := GetConnectionConfig(ctx, suite.mysqlContainer)
519+
suite.Require().NoError(err)
520+
521+
migrationContext := base.NewMigrationContext()
522+
migrationContext.ApplierConnectionConfig = connectionConfig
523+
migrationContext.DatabaseName = "test"
524+
migrationContext.SkipPortValidation = true
525+
migrationContext.OriginalTableName = "testing"
526+
migrationContext.SetConnectionConfig("innodb")
527+
528+
migrationContext.PanicOnWarnings = true
529+
530+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"})
531+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
532+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
533+
migrationContext.UniqueKey = &sql.UniqueKey{
534+
Name: "item_id",
535+
NameInGhostTable: "item_id",
536+
Columns: *sql.NewColumnList([]string{"item_id"}),
537+
}
538+
539+
applier := NewApplier(migrationContext)
540+
suite.Require().NoError(applier.prepareQueries())
541+
defer applier.Teardown()
542+
543+
err = applier.InitDBConnections()
544+
suite.Require().NoError(err)
545+
546+
_, err = suite.db.ExecContext(ctx, "INSERT INTO test.testing (id, item_id) VALUES (123456, 42);")
547+
suite.Require().NoError(err)
548+
549+
dmlEvents := []*binlog.BinlogDMLEvent{
550+
{
551+
DatabaseName: "test",
552+
TableName: "testing",
553+
DML: binlog.InsertDML,
554+
NewColumnValues: sql.ToColumnValues([]interface{}{123456, 42}),
555+
},
556+
}
557+
err = applier.ApplyDMLEventQueries(dmlEvents)
558+
suite.Require().NoError(err)
559+
560+
err = applier.CreateChangelogTable()
561+
suite.Require().NoError(err)
562+
err = applier.ReadMigrationRangeValues()
563+
suite.Require().NoError(err)
564+
565+
hasFurtherRange, expectedRangeSize, err := applier.CalculateNextIterationRangeEndValues()
566+
suite.Require().NoError(err)
567+
suite.Require().True(hasFurtherRange)
568+
suite.Require().Equal(int64(1), expectedRangeSize)
569+
570+
_, rowsAffected, _, err := applier.ApplyIterationInsertQuery()
571+
suite.Require().NoError(err)
572+
suite.Require().Equal(int64(0), rowsAffected)
573+
574+
// Ensure Duplicate entry '42' for key '_testing_gho.item_id' is ignored correctly
575+
suite.Require().Empty(applier.migrationContext.MigrationLastInsertSQLWarnings)
576+
577+
// Check that the row was inserted
578+
rows, err := suite.db.Query("SELECT * FROM test._testing_gho")
579+
suite.Require().NoError(err)
580+
defer rows.Close()
581+
582+
var count, id, item_id int
583+
for rows.Next() {
584+
err = rows.Scan(&id, &item_id)
585+
suite.Require().NoError(err)
586+
count += 1
587+
}
588+
suite.Require().NoError(rows.Err())
589+
590+
suite.Require().Equal(1, count)
591+
suite.Require().Equal(123456, id)
592+
suite.Require().Equal(42, item_id)
593+
594+
suite.Require().
595+
Equal(int64(1), migrationContext.TotalDMLEventsApplied)
596+
suite.Require().
597+
Equal(int64(0), migrationContext.RowsDeltaEstimate)
598+
}
599+
507600
func TestApplier(t *testing.T) {
508601
suite.Run(t, new(ApplierTestSuite))
509602
}

Diff for: go/logic/inspect.go

+3
Original file line numberDiff line numberDiff line change
@@ -846,6 +846,9 @@ func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [
846846
for _, originalUniqueKey := range originalUniqueKeys {
847847
for _, ghostUniqueKey := range ghostUniqueKeys {
848848
if originalUniqueKey.Columns.IsSubsetOf(&ghostUniqueKey.Columns) {
849+
// In case the unique key gets renamed in -alter, PanicOnWarnings needs to rely on the new name
850+
// to check SQL warnings on the ghost table, so return new name here.
851+
originalUniqueKey.NameInGhostTable = ghostUniqueKey.Name
849852
uniqueKeys = append(uniqueKeys, originalUniqueKey)
850853
break
851854
}

Diff for: go/logic/migrator.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -1238,8 +1238,9 @@ func (this *Migrator) iterateChunks() error {
12381238
// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever
12391239

12401240
hasFurtherRange := false
1241+
expectedRangeSize := int64(0)
12411242
if err := this.retryOperation(func() (e error) {
1242-
hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues()
1243+
hasFurtherRange, expectedRangeSize, e = this.applier.CalculateNextIterationRangeEndValues()
12431244
return e
12441245
}); err != nil {
12451246
return terminateRowIteration(err)
@@ -1265,6 +1266,19 @@ func (this *Migrator) iterateChunks() error {
12651266
if err != nil {
12661267
return err // wrapping call will retry
12671268
}
1269+
1270+
if this.migrationContext.PanicOnWarnings {
1271+
if len(this.migrationContext.MigrationLastInsertSQLWarnings) > 0 {
1272+
for _, warning := range this.migrationContext.MigrationLastInsertSQLWarnings {
1273+
this.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning)
1274+
}
1275+
if expectedRangeSize != rowsAffected {
1276+
joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ")
1277+
terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))
1278+
}
1279+
}
1280+
}
1281+
12681282
atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected)
12691283
atomic.AddInt64(&this.migrationContext.Iteration, 1)
12701284
return nil

Diff for: go/sql/builder.go

+53-16
Original file line numberDiff line numberDiff line change
@@ -275,36 +275,54 @@ func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string
275275

276276
uniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names())
277277
uniqueKeyColumnAscending := make([]string, len(uniqueKeyColumnNames))
278-
uniqueKeyColumnDescending := make([]string, len(uniqueKeyColumnNames))
279278
for i, column := range uniqueKeyColumns.Columns() {
280279
uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i])
281280
if column.Type == EnumColumnType {
282281
uniqueKeyColumnAscending[i] = fmt.Sprintf("concat(%s) asc", uniqueKeyColumnNames[i])
283-
uniqueKeyColumnDescending[i] = fmt.Sprintf("concat(%s) desc", uniqueKeyColumnNames[i])
284282
} else {
285283
uniqueKeyColumnAscending[i] = fmt.Sprintf("%s asc", uniqueKeyColumnNames[i])
286-
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i])
287284
}
288285
}
286+
joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
289287
result = fmt.Sprintf(`
290288
select /* gh-ost %s.%s %s */
291-
%s
292-
from
293-
%s.%s
294-
where
295-
%s and %s
289+
%s,
290+
(select count(*) from (
291+
select
292+
%s
293+
from
294+
%s.%s
295+
where
296+
%s and %s
297+
limit
298+
%d
299+
) select_osc_chunk)
300+
from (
301+
select
302+
%s
303+
from
304+
%s.%s
305+
where
306+
%s and %s
307+
limit
308+
%d
309+
) select_osc_chunk
296310
order by
297311
%s
298312
limit 1
299313
offset %d`,
300314
databaseName, tableName, hint,
301-
strings.Join(uniqueKeyColumnNames, ", "),
315+
joinedColumnNames, joinedColumnNames,
302316
databaseName, tableName,
303-
rangeStartComparison, rangeEndComparison,
317+
rangeStartComparison, rangeEndComparison, chunkSize,
318+
joinedColumnNames,
319+
databaseName, tableName,
320+
rangeStartComparison, rangeEndComparison, chunkSize,
304321
strings.Join(uniqueKeyColumnAscending, ", "),
305322
(chunkSize - 1),
306323
)
307-
return result, explodedArgs, nil
324+
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
325+
return result, append(explodedArgs, explodedArgs...), nil
308326
}
309327

310328
func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
@@ -342,8 +360,22 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
342360
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i])
343361
}
344362
}
363+
364+
joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
345365
result = fmt.Sprintf(`
346-
select /* gh-ost %s.%s %s */ %s
366+
select /* gh-ost %s.%s %s */
367+
%s,
368+
(select count(*) from (
369+
select
370+
%s
371+
from
372+
%s.%s
373+
where
374+
%s and %s
375+
order by
376+
%s
377+
limit %d
378+
) select_osc_chunk)
347379
from (
348380
select
349381
%s
@@ -353,17 +385,22 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
353385
%s and %s
354386
order by
355387
%s
356-
limit %d) select_osc_chunk
388+
limit %d
389+
) select_osc_chunk
357390
order by
358391
%s
359392
limit 1`,
360-
databaseName, tableName, hint, strings.Join(uniqueKeyColumnNames, ", "),
361-
strings.Join(uniqueKeyColumnNames, ", "), databaseName, tableName,
393+
databaseName, tableName, hint, joinedColumnNames,
394+
joinedColumnNames, databaseName, tableName,
395+
rangeStartComparison, rangeEndComparison,
396+
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
397+
joinedColumnNames, databaseName, tableName,
362398
rangeStartComparison, rangeEndComparison,
363399
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
364400
strings.Join(uniqueKeyColumnDescending, ", "),
365401
)
366-
return result, explodedArgs, nil
402+
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
403+
return result, append(explodedArgs, explodedArgs...), nil
367404
}
368405

369406
func BuildUniqueKeyMinValuesPreparedQuery(databaseName, tableName string, uniqueKey *UniqueKey) (string, error) {

0 commit comments

Comments
 (0)