Skip to content

Commit a834c00

Browse files
Merge pull request #1459 from github/danieljoos-dml-query-builders
Improve query building routines of DML event queries, reducing time and allocations
2 parents 30f28c2 + 5c0829a commit a834c00

File tree

5 files changed

+233
-85
lines changed

5 files changed

+233
-85
lines changed

Diff for: go/logic/applier.go

+44-8
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ type Applier struct {
6060
migrationContext *base.MigrationContext
6161
finishedMigrating int64
6262
name string
63+
64+
dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder
65+
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
66+
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
6367
}
6468

6569
func NewApplier(migrationContext *base.MigrationContext) *Applier {
@@ -106,6 +110,37 @@ func (this *Applier) InitDBConnections() (err error) {
106110
return nil
107111
}
108112

113+
func (this *Applier) prepareQueries() (err error) {
114+
if this.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder(
115+
this.migrationContext.DatabaseName,
116+
this.migrationContext.GetGhostTableName(),
117+
this.migrationContext.OriginalTableColumns,
118+
&this.migrationContext.UniqueKey.Columns,
119+
); err != nil {
120+
return err
121+
}
122+
if this.dmlInsertQueryBuilder, err = sql.NewDMLInsertQueryBuilder(
123+
this.migrationContext.DatabaseName,
124+
this.migrationContext.GetGhostTableName(),
125+
this.migrationContext.OriginalTableColumns,
126+
this.migrationContext.SharedColumns,
127+
this.migrationContext.MappedSharedColumns,
128+
); err != nil {
129+
return err
130+
}
131+
if this.dmlUpdateQueryBuilder, err = sql.NewDMLUpdateQueryBuilder(
132+
this.migrationContext.DatabaseName,
133+
this.migrationContext.GetGhostTableName(),
134+
this.migrationContext.OriginalTableColumns,
135+
this.migrationContext.SharedColumns,
136+
this.migrationContext.MappedSharedColumns,
137+
&this.migrationContext.UniqueKey.Columns,
138+
); err != nil {
139+
return err
140+
}
141+
return nil
142+
}
143+
109144
// validateAndReadGlobalVariables potentially reads server global variables, such as the time_zone and wait_timeout.
110145
func (this *Applier) validateAndReadGlobalVariables() error {
111146
query := `select /* gh-ost */ @@global.time_zone, @@global.wait_timeout`
@@ -1137,35 +1172,36 @@ func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEv
11371172

11381173
// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
11391174
// event entry on the original table.
1140-
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (results [](*dmlBuildResult)) {
1175+
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlBuildResult {
11411176
switch dmlEvent.DML {
11421177
case binlog.DeleteDML:
11431178
{
1144-
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
1145-
return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, err))
1179+
query, uniqueKeyArgs, err := this.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues())
1180+
return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, -1, err)}
11461181
}
11471182
case binlog.InsertDML:
11481183
{
1149-
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
1150-
return append(results, newDmlBuildResult(query, sharedArgs, 1, err))
1184+
query, sharedArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
1185+
return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, 1, err)}
11511186
}
11521187
case binlog.UpdateDML:
11531188
{
11541189
if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
1190+
results := make([]*dmlBuildResult, 0, 2)
11551191
dmlEvent.DML = binlog.DeleteDML
11561192
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
11571193
dmlEvent.DML = binlog.InsertDML
11581194
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
11591195
return results
11601196
}
1161-
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
1197+
query, sharedArgs, uniqueKeyArgs, err := this.dmlUpdateQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
11621198
args := sqlutils.Args()
11631199
args = append(args, sharedArgs...)
11641200
args = append(args, uniqueKeyArgs...)
1165-
return append(results, newDmlBuildResult(query, args, 0, err))
1201+
return []*dmlBuildResult{newDmlBuildResult(query, args, 0, err)}
11661202
}
11671203
}
1168-
return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
1204+
return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))}
11691205
}
11701206

11711207
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table

Diff for: go/logic/applier_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
101101
columnValues := sql.ToColumnValues([]interface{}{123456, 42})
102102

103103
migrationContext := base.NewMigrationContext()
104+
migrationContext.DatabaseName = "test"
104105
migrationContext.OriginalTableName = "test"
105106
migrationContext.OriginalTableColumns = columns
106107
migrationContext.SharedColumns = columns
@@ -111,6 +112,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
111112
}
112113

113114
applier := NewApplier(migrationContext)
115+
applier.prepareQueries()
114116

115117
t.Run("delete", func(t *testing.T) {
116118
binlogEvent := &binlog.BinlogDMLEvent{
@@ -307,8 +309,13 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
307309
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"})
308310
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
309311
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
312+
migrationContext.UniqueKey = &sql.UniqueKey{
313+
Name: "primary_key",
314+
Columns: *sql.NewColumnList([]string{"id"}),
315+
}
310316

311317
applier := NewApplier(migrationContext)
318+
suite.Require().NoError(applier.prepareQueries())
312319
defer applier.Teardown()
313320

314321
err = applier.InitDBConnections()

Diff for: go/logic/migrator.go

+4
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,10 @@ func (this *Migrator) Migrate() (err error) {
386386
if err := this.inspector.inspectOriginalAndGhostTables(); err != nil {
387387
return err
388388
}
389+
// We can prepare some of the queries on the applier
390+
if err := this.applier.prepareQueries(); err != nil {
391+
return err
392+
}
389393
// Validation complete! We're good to execute this migration
390394
if err := this.hooksExecutor.onValidated(); err != nil {
391395
return err

Diff for: go/sql/builder.go

+122-55
Original file line numberDiff line numberDiff line change
@@ -406,25 +406,29 @@ func buildUniqueKeyMinMaxValuesPreparedQuery(databaseName, tableName string, uni
406406
return query, nil
407407
}
408408

409-
func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKeyColumns *ColumnList, args []interface{}) (result string, uniqueKeyArgs []interface{}, err error) {
410-
if len(args) != tableColumns.Len() {
411-
return result, uniqueKeyArgs, fmt.Errorf("args count differs from table column count in BuildDMLDeleteQuery")
412-
}
409+
// DMLDeleteQueryBuilder can build DELETE queries for DML events.
410+
// It holds the prepared query statement so it doesn't need to be recreated every time.
411+
type DMLDeleteQueryBuilder struct {
412+
tableColumns, uniqueKeyColumns *ColumnList
413+
preparedStatement string
414+
}
415+
416+
// NewDMLDeleteQueryBuilder creates a new DMLDeleteQueryBuilder.
417+
// It prepares the DELETE query statement.
418+
// Returns an error if no unique key columns are given
419+
// or the prepared statement cannot be built.
420+
func NewDMLDeleteQueryBuilder(databaseName, tableName string, tableColumns, uniqueKeyColumns *ColumnList) (*DMLDeleteQueryBuilder, error) {
413421
if uniqueKeyColumns.Len() == 0 {
414-
return result, uniqueKeyArgs, fmt.Errorf("No unique key columns found in BuildDMLDeleteQuery")
415-
}
416-
for _, column := range uniqueKeyColumns.Columns() {
417-
tableOrdinal := tableColumns.Ordinals[column.Name]
418-
arg := column.convertArg(args[tableOrdinal], true)
419-
uniqueKeyArgs = append(uniqueKeyArgs, arg)
422+
return nil, fmt.Errorf("no unique key columns found in NewDMLDeleteQueryBuilder")
420423
}
421424
databaseName = EscapeName(databaseName)
422425
tableName = EscapeName(tableName)
423426
equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names())
424427
if err != nil {
425-
return result, uniqueKeyArgs, err
428+
return nil, err
426429
}
427-
result = fmt.Sprintf(`
430+
431+
stmt := fmt.Sprintf(`
428432
delete /* gh-ost %s.%s */
429433
from
430434
%s.%s
@@ -434,35 +438,58 @@ func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKey
434438
databaseName, tableName,
435439
equalsComparison,
436440
)
437-
return result, uniqueKeyArgs, nil
441+
442+
b := &DMLDeleteQueryBuilder{
443+
tableColumns: tableColumns,
444+
uniqueKeyColumns: uniqueKeyColumns,
445+
preparedStatement: stmt,
446+
}
447+
return b, nil
438448
}
439449

440-
func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList, args []interface{}) (result string, sharedArgs []interface{}, err error) {
441-
if len(args) != tableColumns.Len() {
442-
return result, args, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery")
450+
// BuildQuery builds the arguments array for a DML event DELETE query.
451+
// It returns the query string and the unique key arguments array.
452+
// Returns an error if the number of arguments is not equal to the number of table columns.
453+
func (b *DMLDeleteQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, error) {
454+
if len(args) != b.tableColumns.Len() {
455+
return "", nil, fmt.Errorf("args count differs from table column count in BuildDMLDeleteQuery")
443456
}
457+
uniqueKeyArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len())
458+
for _, column := range b.uniqueKeyColumns.Columns() {
459+
tableOrdinal := b.tableColumns.Ordinals[column.Name]
460+
arg := column.convertArg(args[tableOrdinal], true)
461+
uniqueKeyArgs = append(uniqueKeyArgs, arg)
462+
}
463+
return b.preparedStatement, uniqueKeyArgs, nil
464+
}
465+
466+
// DMLInsertQueryBuilder can build INSERT queries for DML events.
467+
// It holds the prepared query statement so it doesn't need to be recreated every time.
468+
type DMLInsertQueryBuilder struct {
469+
tableColumns, sharedColumns *ColumnList
470+
preparedStatement string
471+
}
472+
473+
// NewDMLInsertQueryBuilder creates a new DMLInsertQueryBuilder.
474+
// It prepares the INSERT query statement.
475+
// Returns an error if no shared columns are given, the shared columns are not a subset of the table columns,
476+
// or the prepared statement cannot be built.
477+
func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList) (*DMLInsertQueryBuilder, error) {
444478
if !sharedColumns.IsSubsetOf(tableColumns) {
445-
return result, args, fmt.Errorf("shared columns is not a subset of table columns in BuildDMLInsertQuery")
479+
return nil, fmt.Errorf("shared columns is not a subset of table columns in NewDMLInsertQueryBuilder")
446480
}
447481
if sharedColumns.Len() == 0 {
448-
return result, args, fmt.Errorf("No shared columns found in BuildDMLInsertQuery")
482+
return nil, fmt.Errorf("no shared columns found in NewDMLInsertQueryBuilder")
449483
}
450484
databaseName = EscapeName(databaseName)
451485
tableName = EscapeName(tableName)
452-
453-
for _, column := range sharedColumns.Columns() {
454-
tableOrdinal := tableColumns.Ordinals[column.Name]
455-
arg := column.convertArg(args[tableOrdinal], false)
456-
sharedArgs = append(sharedArgs, arg)
457-
}
458-
459486
mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names())
460487
for i := range mappedSharedColumnNames {
461488
mappedSharedColumnNames[i] = EscapeName(mappedSharedColumnNames[i])
462489
}
463490
preparedValues := buildColumnsPreparedValues(mappedSharedColumns)
464491

465-
result = fmt.Sprintf(`
492+
stmt := fmt.Sprintf(`
466493
replace /* gh-ost %s.%s */
467494
into
468495
%s.%s
@@ -474,53 +501,63 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol
474501
strings.Join(mappedSharedColumnNames, ", "),
475502
strings.Join(preparedValues, ", "),
476503
)
477-
return result, sharedArgs, nil
504+
505+
return &DMLInsertQueryBuilder{
506+
tableColumns: tableColumns,
507+
sharedColumns: sharedColumns,
508+
preparedStatement: stmt,
509+
}, nil
478510
}
479511

480-
func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns, uniqueKeyColumns *ColumnList, valueArgs, whereArgs []interface{}) (result string, sharedArgs, uniqueKeyArgs []interface{}, err error) {
481-
if len(valueArgs) != tableColumns.Len() {
482-
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("value args count differs from table column count in BuildDMLUpdateQuery")
512+
// BuildQuery builds the arguments array for a DML event INSERT query.
513+
// It returns the query string and the shared arguments array.
514+
// Returns an error if the number of arguments differs from the number of table columns.
515+
func (b *DMLInsertQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, error) {
516+
if len(args) != b.tableColumns.Len() {
517+
return "", nil, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery")
483518
}
484-
if len(whereArgs) != tableColumns.Len() {
485-
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("where args count differs from table column count in BuildDMLUpdateQuery")
519+
sharedArgs := make([]interface{}, 0, b.sharedColumns.Len())
520+
for _, column := range b.sharedColumns.Columns() {
521+
tableOrdinal := b.tableColumns.Ordinals[column.Name]
522+
arg := column.convertArg(args[tableOrdinal], false)
523+
sharedArgs = append(sharedArgs, arg)
486524
}
525+
return b.preparedStatement, sharedArgs, nil
526+
}
527+
528+
// DMLUpdateQueryBuilder can build UPDATE queries for DML events.
529+
// It holds the prepared query statement so it doesn't need to be recreated every time.
530+
type DMLUpdateQueryBuilder struct {
531+
tableColumns, sharedColumns, uniqueKeyColumns *ColumnList
532+
preparedStatement string
533+
}
534+
535+
// NewDMLUpdateQueryBuilder creates a new DMLUpdateQueryBuilder.
536+
// It prepares the UPDATE query statement.
537+
// Returns an error if no shared columns are given, the shared columns are not a subset of the table columns,
538+
// no unique key columns are given or the prepared statement cannot be built.
539+
func NewDMLUpdateQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns, uniqueKeyColumns *ColumnList) (*DMLUpdateQueryBuilder, error) {
487540
if !sharedColumns.IsSubsetOf(tableColumns) {
488-
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("shared columns is not a subset of table columns in BuildDMLUpdateQuery")
489-
}
490-
if !uniqueKeyColumns.IsSubsetOf(sharedColumns) {
491-
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("unique key columns is not a subset of shared columns in BuildDMLUpdateQuery")
541+
return nil, fmt.Errorf("shared columns is not a subset of table columns in NewDMLUpdateQueryBuilder")
492542
}
493543
if sharedColumns.Len() == 0 {
494-
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("No shared columns found in BuildDMLUpdateQuery")
544+
return nil, fmt.Errorf("no shared columns found in NewDMLUpdateQueryBuilder")
495545
}
496546
if uniqueKeyColumns.Len() == 0 {
497-
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("No unique key columns found in BuildDMLUpdateQuery")
547+
return nil, fmt.Errorf("no unique key columns found in NewDMLUpdateQueryBuilder")
498548
}
499549
databaseName = EscapeName(databaseName)
500550
tableName = EscapeName(tableName)
501-
502-
for _, column := range sharedColumns.Columns() {
503-
tableOrdinal := tableColumns.Ordinals[column.Name]
504-
arg := column.convertArg(valueArgs[tableOrdinal], false)
505-
sharedArgs = append(sharedArgs, arg)
506-
}
507-
508-
for _, column := range uniqueKeyColumns.Columns() {
509-
tableOrdinal := tableColumns.Ordinals[column.Name]
510-
arg := column.convertArg(whereArgs[tableOrdinal], true)
511-
uniqueKeyArgs = append(uniqueKeyArgs, arg)
512-
}
513-
514551
setClause, err := BuildSetPreparedClause(mappedSharedColumns)
515552
if err != nil {
516-
return "", sharedArgs, uniqueKeyArgs, err
553+
return nil, err
517554
}
518555

519556
equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names())
520557
if err != nil {
521-
return "", sharedArgs, uniqueKeyArgs, err
558+
return nil, err
522559
}
523-
result = fmt.Sprintf(`
560+
stmt := fmt.Sprintf(`
524561
update /* gh-ost %s.%s */
525562
%s.%s
526563
set
@@ -532,5 +569,35 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol
532569
setClause,
533570
equalsComparison,
534571
)
535-
return result, sharedArgs, uniqueKeyArgs, nil
572+
return &DMLUpdateQueryBuilder{
573+
tableColumns: tableColumns,
574+
sharedColumns: sharedColumns,
575+
uniqueKeyColumns: uniqueKeyColumns,
576+
preparedStatement: stmt,
577+
}, nil
578+
}
579+
580+
// BuildQuery builds the arguments array for a DML event UPDATE query.
581+
// It returns the query string, the shared arguments array, and the unique key arguments array.
582+
func (b *DMLUpdateQueryBuilder) BuildQuery(valueArgs, whereArgs []interface{}) (string, []interface{}, []interface{}, error) {
583+
// TODO: move this check back to `NewDMLUpdateQueryBuilder()`, needs fix on generated columns.
584+
if !b.uniqueKeyColumns.IsSubsetOf(b.sharedColumns) {
585+
return "", nil, nil, fmt.Errorf("unique key columns is not a subset of shared columns in DMLUpdateQueryBuilder")
586+
}
587+
588+
sharedArgs := make([]interface{}, 0, b.sharedColumns.Len())
589+
for _, column := range b.sharedColumns.Columns() {
590+
tableOrdinal := b.tableColumns.Ordinals[column.Name]
591+
arg := column.convertArg(valueArgs[tableOrdinal], false)
592+
sharedArgs = append(sharedArgs, arg)
593+
}
594+
595+
uniqueKeyArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len())
596+
for _, column := range b.uniqueKeyColumns.Columns() {
597+
tableOrdinal := b.tableColumns.Ordinals[column.Name]
598+
arg := column.convertArg(whereArgs[tableOrdinal], true)
599+
uniqueKeyArgs = append(uniqueKeyArgs, arg)
600+
}
601+
602+
return b.preparedStatement, sharedArgs, uniqueKeyArgs, nil
536603
}

0 commit comments

Comments
 (0)