@@ -80,7 +80,8 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
80
80
81
81
func (this * Applier ) InitDBConnections () (err error ) {
82
82
applierUri := this .connectionConfig .GetDBUri (this .migrationContext .DatabaseName )
83
- if this .db , _ , err = mysql .GetDB (this .migrationContext .Uuid , applierUri ); err != nil {
83
+ uriWithMulti := fmt .Sprintf ("%s&multiStatements=true" , applierUri )
84
+ if this .db , _ , err = mysql .GetDB (this .migrationContext .Uuid , uriWithMulti ); err != nil {
84
85
return err
85
86
}
86
87
singletonApplierUri := fmt .Sprintf ("%s&timeout=0" , applierUri )
@@ -1210,7 +1211,7 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB
1210
1211
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
1211
1212
func (this * Applier ) ApplyDMLEventQueries (dmlEvents [](* binlog.BinlogDMLEvent )) error {
1212
1213
var totalDelta int64
1213
- ctx := context .TODO ()
1214
+ ctx := context .Background ()
1214
1215
1215
1216
err := func () error {
1216
1217
conn , err := this .db .Conn (ctx )
@@ -1236,31 +1237,23 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
1236
1237
}
1237
1238
1238
1239
buildResults := make ([]* dmlBuildResult , 0 , len (dmlEvents ))
1240
+ nArgs := 0
1239
1241
for _ , dmlEvent := range dmlEvents {
1240
1242
for _ , buildResult := range this .buildDMLEventQuery (dmlEvent ) {
1241
1243
if buildResult .err != nil {
1242
1244
return rollback (buildResult .err )
1243
1245
}
1244
-
1246
+ nArgs += len ( buildResult . args )
1245
1247
buildResults = append (buildResults , buildResult )
1246
1248
}
1247
1249
}
1248
1250
1249
1251
execErr := conn .Raw (func (driverConn any ) error {
1250
- ex , ok := driverConn .(driver.ExecerContext )
1251
- if ! ok {
1252
- return fmt .Errorf ("could not cast driverConn to ExecerContext" )
1253
- }
1254
-
1255
- nvc , ok := driverConn .(driver.NamedValueChecker )
1256
- if ! ok {
1257
- return fmt .Errorf ("could not cast driverConn to NamedValueChecker" )
1258
- }
1252
+ ex := driverConn .(driver.ExecerContext )
1253
+ nvc := driverConn .(driver.NamedValueChecker )
1259
1254
1260
- var multiArgs []driver.NamedValue
1255
+ multiArgs := make ( []driver.NamedValue , 0 , nArgs )
1261
1256
multiQueryBuilder := strings.Builder {}
1262
- var rowDeltas []int64
1263
-
1264
1257
for _ , buildResult := range buildResults {
1265
1258
for _ , arg := range buildResult .args {
1266
1259
nv := driver.NamedValue {Value : driver .Value (arg )}
@@ -1270,29 +1263,21 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
1270
1263
1271
1264
multiQueryBuilder .WriteString (buildResult .query )
1272
1265
multiQueryBuilder .WriteString (";\n " )
1273
-
1274
- rowDeltas = append (rowDeltas , buildResult .rowsDelta )
1275
1266
}
1276
1267
1277
- // this.migrationContext.Log.Infof("Executing query: %s, args: %+v", multiQueryBuilder.String(), multiArgs)
1278
1268
res , err := ex .ExecContext (ctx , multiQueryBuilder .String (), multiArgs )
1279
1269
if err != nil {
1280
1270
err = fmt .Errorf ("%w; query=%s; args=%+v" , err , multiQueryBuilder .String (), multiArgs )
1281
- this .migrationContext .Log .Errorf ("Error exec: %+v" , err )
1282
1271
return err
1283
1272
}
1284
1273
1285
- mysqlRes , ok := res .(drivermysql.Result )
1286
- if ! ok {
1287
- return fmt .Errorf ("Could not cast %+v to mysql.Result" , res )
1288
- }
1274
+ mysqlRes := res .(drivermysql.Result )
1289
1275
1290
1276
// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
1291
1277
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
1292
1278
for i , rowsAffected := range mysqlRes .AllRowsAffected () {
1293
- totalDelta += rowDeltas [i ] * rowsAffected
1279
+ totalDelta += buildResults [i ]. rowsDelta * rowsAffected
1294
1280
}
1295
-
1296
1281
return nil
1297
1282
})
1298
1283
0 commit comments