Skip to content

Commit a7e4601

Browse files
committed
use multiStatements to apply DML
1 parent a834c00 commit a7e4601

File tree

2 files changed

+23
-15
lines changed

2 files changed

+23
-15
lines changed

Diff for: go/logic/applier.go

+22-15
Original file line numberDiff line numberDiff line change
@@ -1225,27 +1225,34 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
12251225
if _, err := tx.Exec(sessionQuery); err != nil {
12261226
return rollback(err)
12271227
}
1228+
multiArgs := []interface{}{}
1229+
var multiQueryBuilder strings.Builder
12281230
for _, dmlEvent := range dmlEvents {
12291231
for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
12301232
if buildResult.err != nil {
1231-
return rollback(buildResult.err)
1233+
return buildResult.err
12321234
}
1233-
result, err := tx.Exec(buildResult.query, buildResult.args...)
1234-
if err != nil {
1235-
err = fmt.Errorf("%w; query=%s; args=%+v", err, buildResult.query, buildResult.args)
1236-
return rollback(err)
1237-
}
1238-
1239-
rowsAffected, err := result.RowsAffected()
1240-
if err != nil {
1241-
log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err)
1242-
rowsAffected = 1
1243-
}
1244-
// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
1245-
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
1246-
totalDelta += buildResult.rowsDelta * rowsAffected
1235+
multiArgs = append(multiArgs, buildResult.args...)
1236+
multiQueryBuilder.WriteString(buildResult.query)
1237+
multiQueryBuilder.WriteString(";\n")
12471238
}
12481239
}
1240+
// TODO: get rows affected from each query in multi statement
1241+
log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err)
1242+
_, err = tx.Exec(multiQueryBuilder.String(), multiArgs...)
1243+
if err != nil {
1244+
err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs)
1245+
return rollback(err)
1246+
}
1247+
// rowsAffected, err := result.RowsAffected()
1248+
// if err != nil {
1249+
// log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err)
1250+
// rowsAffected = 1
1251+
// }
1252+
// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
1253+
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
1254+
// totalDelta += buildResult.rowsDelta * rowsAffected
1255+
12491256
if err := tx.Commit(); err != nil {
12501257
return err
12511258
}

Diff for: go/mysql/connection.go

+1
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string {
132132
connectionParams := []string{
133133
"autocommit=true",
134134
"interpolateParams=true",
135+
"multiStatements=true",
135136
fmt.Sprintf("charset=%s", this.Charset),
136137
fmt.Sprintf("tls=%s", tlsOption),
137138
fmt.Sprintf("transaction_isolation=%q", this.TransactionIsolation),

0 commit comments

Comments
 (0)