-
Notifications
You must be signed in to change notification settings - Fork 267
/
mysql_delete_undo_log_builder.go
122 lines (100 loc) · 3.64 KB
/
mysql_delete_undo_log_builder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package builder
import (
"context"
"database/sql/driver"
"fmt"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/datasource/sql/parser"
"github.com/arana-db/parser/ast"
"github.com/arana-db/parser/format"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/util/bytes"
"github.com/seata/seata-go/pkg/util/log"
)
// MySQLDeleteUndoLogBuilder builds undo-log images for DELETE statements.
// It embeds BasicUndoLogBuilder, so the embedded type's methods are
// available on this builder as well.
type MySQLDeleteUndoLogBuilder struct {
	BasicUndoLogBuilder
}
// GetMySQLDeleteUndoLogBuilder returns a freshly allocated, zero-valued
// MySQLDeleteUndoLogBuilder as an undo.UndoLogBuilder.
func GetMySQLDeleteUndoLogBuilder() undo.UndoLogBuilder {
	return new(MySQLDeleteUndoLogBuilder)
}
// BeforeImage queries the rows that the DELETE statement in execCtx is about
// to remove and returns them as a single record image.
//
// It rewrites execCtx.Query into a SELECT ... FOR UPDATE (see
// buildBeforeImageSQL), runs it on execCtx.Conn, converts the result into a
// record image using the table metadata stored in execCtx.MetaDataMap, and
// records the lock key built for those rows in execCtx.TxCtx.LockKeys.
//
// An error is returned when the SELECT cannot be built, prepared or executed,
// when the parsed DELETE does not reference a plain table, or when the record
// image cannot be built.
func (u *MySQLDeleteUndoLogBuilder) BeforeImage(ctx context.Context, execCtx *types.ExecContext) ([]*types.RecordImage, error) {
	vals := execCtx.Values
	if vals == nil && len(execCtx.NamedValues) > 0 {
		// Indexing a nil slice panics, so allocate the destination before
		// copying the named values into it position by position.
		vals = make([]driver.Value, len(execCtx.NamedValues))
		for n, param := range execCtx.NamedValues {
			vals[n] = param.Value
		}
	}

	selectSQL, selectArgs, err := u.buildBeforeImageSQL(execCtx.Query, vals)
	if err != nil {
		return nil, err
	}

	stmt, err := execCtx.Conn.Prepare(selectSQL)
	if err != nil {
		log.Errorf("build prepare stmt: %+v", err)
		return nil, err
	}
	// Release the prepared statement on every return path.
	defer func() {
		if cerr := stmt.Close(); cerr != nil {
			log.Errorf("close stmt: %+v", cerr)
		}
	}()

	rows, err := stmt.Query(selectArgs)
	if err != nil {
		log.Errorf("stmt query: %+v", err)
		return nil, err
	}
	// Deferred calls run in reverse order, so the rows are closed before the
	// statement that produced them.
	defer func() {
		if cerr := rows.Close(); cerr != nil {
			log.Errorf("close rows: %+v", cerr)
		}
	}()

	// Use checked assertions: a DELETE whose table reference is not a plain
	// table must surface as an error instead of a panic.
	tableSource, ok := execCtx.ParseContext.DeleteStmt.TableRefs.TableRefs.Left.(*ast.TableSource)
	if !ok {
		return nil, fmt.Errorf("invalid delete stmt: unsupported table reference")
	}
	tableNode, ok := tableSource.Source.(*ast.TableName)
	if !ok {
		return nil, fmt.Errorf("invalid delete stmt: unsupported table source")
	}
	tableName := tableNode.Name.O
	metaData := execCtx.MetaDataMap[tableName]

	image, err := u.buildRecordImages(rows, &metaData)
	if err != nil {
		return nil, err
	}

	lockKey := u.buildLockKey(rows, metaData)
	execCtx.TxCtx.LockKeys[lockKey] = struct{}{}

	return []*types.RecordImage{image}, nil
}
// AfterImage always returns a nil image slice and a nil error; ctx, execCtx
// and beforImages are not used.
// NOTE(review): presumably a DELETE leaves no row state to capture afterwards
// — confirm against the undo.UndoLogBuilder contract.
func (u *MySQLDeleteUndoLogBuilder) AfterImage(ctx context.Context, execCtx *types.ExecContext, beforImages []*types.RecordImage) ([]*types.RecordImage, error) {
	return nil, nil
}
// buildBeforeImageSQL builds the SELECT ... FOR UPDATE statement that reads
// the rows a DELETE statement would remove.
//
// query is the original DELETE statement and args are its bound values. The
// generated SELECT reuses the DELETE's table references, WHERE, ORDER BY,
// LIMIT and table hints, and selects every column through a wildcard field.
//
// It returns the SELECT text together with the arguments produced by
// buildSelectArgs. An error is returned when query cannot be parsed, when it
// is not a DELETE statement, or when the SELECT cannot be rendered to text.
func (u *MySQLDeleteUndoLogBuilder) buildBeforeImageSQL(query string, args []driver.Value) (string, []driver.Value, error) {
	p, err := parser.DoParser(query)
	if err != nil {
		return "", nil, err
	}

	if p.DeleteStmt == nil {
		log.Errorf("invalid delete stmt")
		return "", nil, fmt.Errorf("invalid delete stmt")
	}

	selStmt := ast.SelectStmt{
		SelectStmtOpts: &ast.SelectStmtOpts{},
		From:           p.DeleteStmt.TableRefs,
		Where:          p.DeleteStmt.Where,
		Fields:         &ast.FieldList{Fields: []*ast.SelectField{{WildCard: &ast.WildCardField{}}}},
		OrderBy:        p.DeleteStmt.Order,
		Limit:          p.DeleteStmt.Limit,
		TableHints:     p.DeleteStmt.TableHints,
		// Lock the selected rows so they cannot change before the DELETE runs.
		LockInfo: &ast.SelectLockInfo{
			LockType: ast.SelectLockForUpdate,
		},
	}

	b := bytes.NewByteBuffer([]byte{})
	// A failed restore would leave empty or partial SQL in the buffer, so
	// report it rather than handing a broken statement to the caller.
	if err := selStmt.Restore(format.NewRestoreCtx(format.RestoreKeyWordUppercase, b)); err != nil {
		return "", nil, fmt.Errorf("restore select stmt: %w", err)
	}

	selectSQL := string(b.Bytes())
	log.Infof("build select sql by delete sourceQuery, sql {%s}", selectSQL)

	return selectSQL, u.buildSelectArgs(&selStmt, args), nil
}
// GetExecutorType reports the executor type this builder is for, which is
// always types.DeleteExecutor.
func (u *MySQLDeleteUndoLogBuilder) GetExecutorType() types.ExecutorType {
	return types.DeleteExecutor
}