@@ -1234,46 +1234,68 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
1234
1234
if _ , err := tx .Exec (sessionQuery ); err != nil {
1235
1235
return rollback (err )
1236
1236
}
1237
- rowDeltas := make ([]int64 , 0 , len (dmlEvents ))
1238
- multiArgs := []driver.NamedValue {}
1239
- var multiQueryBuilder strings.Builder
1237
+
1238
+ buildResults := make ([]* dmlBuildResult , 0 , len (dmlEvents ))
1240
1239
for _ , dmlEvent := range dmlEvents {
1241
1240
for _ , buildResult := range this .buildDMLEventQuery (dmlEvent ) {
1242
1241
if buildResult .err != nil {
1243
1242
return rollback (buildResult .err )
1244
1243
}
1245
- for _ , arg := range buildResult .args {
1246
- multiArgs = append (multiArgs , driver.NamedValue {Value : driver .Value (arg )})
1247
- }
1248
- rowDeltas = append (rowDeltas , buildResult .rowsDelta )
1249
- multiQueryBuilder .WriteString (buildResult .query )
1250
- multiQueryBuilder .WriteString (";\n " )
1244
+
1245
+ buildResults = append (buildResults , buildResult )
1251
1246
}
1252
1247
}
1253
1248
1254
- //this.migrationContext.Log.Infof("Executing query: %s, args: %+v", multiQueryBuilder.String(), multiArgs)
1255
1249
execErr := conn .Raw (func (driverConn any ) error {
1256
1250
ex , ok := driverConn .(driver.ExecerContext )
1257
1251
if ! ok {
1258
1252
return fmt .Errorf ("could not cast driverConn to ExecerContext" )
1259
1253
}
1254
+
1255
+ nvc , ok := driverConn .(driver.NamedValueChecker )
1256
+ if ! ok {
1257
+ return fmt .Errorf ("could not cast driverConn to NamedValueChecker" )
1258
+ }
1259
+
1260
+ var multiArgs []driver.NamedValue
1261
+ multiQueryBuilder := strings.Builder {}
1262
+ var rowDeltas []int64
1263
+
1264
+ for _ , buildResult := range buildResults {
1265
+ for _ , arg := range buildResult .args {
1266
+ nv := driver.NamedValue {Value : driver .Value (arg )}
1267
+ nvc .CheckNamedValue (& nv )
1268
+ multiArgs = append (multiArgs , nv )
1269
+ }
1270
+
1271
+ multiQueryBuilder .WriteString (buildResult .query )
1272
+ multiQueryBuilder .WriteString (";\n " )
1273
+
1274
+ rowDeltas = append (rowDeltas , buildResult .rowsDelta )
1275
+ }
1276
+
1277
+ // this.migrationContext.Log.Infof("Executing query: %s, args: %+v", multiQueryBuilder.String(), multiArgs)
1260
1278
res , err := ex .ExecContext (ctx , multiQueryBuilder .String (), multiArgs )
1261
1279
if err != nil {
1262
1280
err = fmt .Errorf ("%w; query=%s; args=%+v" , err , multiQueryBuilder .String (), multiArgs )
1263
1281
this .migrationContext .Log .Errorf ("Error exec: %+v" , err )
1264
1282
return err
1265
1283
}
1284
+
1266
1285
mysqlRes , ok := res .(drivermysql.Result )
1267
1286
if ! ok {
1268
1287
return fmt .Errorf ("Could not cast %+v to mysql.Result" , res )
1269
1288
}
1289
+
1270
1290
// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
1271
1291
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
1272
1292
for i , rowsAffected := range mysqlRes .AllRowsAffected () {
1273
1293
totalDelta += rowDeltas [i ] * rowsAffected
1274
1294
}
1295
+
1275
1296
return nil
1276
1297
})
1298
+
1277
1299
if execErr != nil {
1278
1300
return rollback (execErr )
1279
1301
}
0 commit comments