From 199e66d4ff1fd4e5d78c00e3e8ac9f98c7cf6f63 Mon Sep 17 00:00:00 2001 From: Yuecai Liu <38887641+luky116@users.noreply.github.com> Date: Sun, 28 May 2023 15:43:48 +0800 Subject: [PATCH] fix xa branch commit bug (#564) * fix xa branch commit bug --- goimports.sh | 2 +- pkg/client/client.go | 2 +- pkg/datasource/sql/conn_xa.go | 12 ++++----- pkg/datasource/sql/db.go | 10 ++++++- pkg/datasource/sql/driver.go | 1 + .../sql/exec/at/multi_delete_executor.go | 2 +- pkg/datasource/sql/xa/mysql_xa_connection.go | 27 +++++++++++++++++++ 7 files changed, 46 insertions(+), 10 deletions(-) diff --git a/goimports.sh b/goimports.sh index 383253eb8..5fb713be1 100755 --- a/goimports.sh +++ b/goimports.sh @@ -16,7 +16,7 @@ # # format go imports style -go install -v golang.org/x/tools/cmd/goimports +go install golang.org/x/tools/cmd/goimports goimports -local github.com/seata/seata-go -w . # format licence style diff --git a/pkg/client/client.go b/pkg/client/client.go index 2819a660f..c2ee1e30e 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -18,7 +18,6 @@ package client import ( - "github.com/seata/seata-go/pkg/remoting/getty" "sync" "github.com/seata/seata-go/pkg/datasource" @@ -26,6 +25,7 @@ import ( "github.com/seata/seata-go/pkg/datasource/sql/exec/config" "github.com/seata/seata-go/pkg/integration" remoteConfig "github.com/seata/seata-go/pkg/remoting/config" + "github.com/seata/seata-go/pkg/remoting/getty" "github.com/seata/seata-go/pkg/remoting/processor/client" "github.com/seata/seata-go/pkg/rm" "github.com/seata/seata-go/pkg/rm/tcc" diff --git a/pkg/datasource/sql/conn_xa.go b/pkg/datasource/sql/conn_xa.go index 82b2fb85d..7dce1e966 100644 --- a/pkg/datasource/sql/conn_xa.go +++ b/pkg/datasource/sql/conn_xa.go @@ -322,22 +322,22 @@ func (c *XAConn) Commit(ctx context.Context) error { now := time.Now() if c.end(ctx, xa.TMSuccess) != nil { - return c.commitErrorHandle() + return c.commitErrorHandle(ctx) } if c.checkTimeout(ctx, now) != nil { - return c.commitErrorHandle() + return c.commitErrorHandle(ctx) } if c.xaResource.XAPrepare(ctx, c.xaBranchXid.String()) != nil { - return c.commitErrorHandle() + return c.commitErrorHandle(ctx) } return nil } -func (c *XAConn) commitErrorHandle() error { +func (c *XAConn) commitErrorHandle(ctx context.Context) error { var err error - if err = c.tx.Rollback(); err != nil { + if err = c.XaRollback(ctx, c.xaBranchXid); err != nil { err = fmt.Errorf("failed to report XA branch commit-failure xid:%s, err:%w", c.txCtx.XID, err) } c.cleanXABranchContext() @@ -389,7 +389,7 @@ func (c *XAConn) XaRollbackByBranchId(ctx context.Context, xaXid XAXid) error { } func (c *XAConn) XaRollback(ctx context.Context, xaXid XAXid) error { - err := c.xaResource.Rollback(ctx, xaXid.GetGlobalXid()) + err := c.xaResource.Rollback(ctx, xaXid.String()) c.releaseIfNecessary() return err } diff --git a/pkg/datasource/sql/db.go b/pkg/datasource/sql/db.go index 027f8b10d..7056e6c5f 100644 --- a/pkg/datasource/sql/db.go +++ b/pkg/datasource/sql/db.go @@ -22,14 +22,15 @@ import ( "database/sql" "database/sql/driver" "fmt" - "github.com/seata/seata-go/pkg/util/log" "sync" "github.com/seata/seata-go/pkg/datasource/sql/datasource" "github.com/seata/seata-go/pkg/datasource/sql/types" "github.com/seata/seata-go/pkg/datasource/sql/undo" "github.com/seata/seata-go/pkg/datasource/sql/util" + "github.com/seata/seata-go/pkg/datasource/sql/xa" "github.com/seata/seata-go/pkg/protocol/branch" + "github.com/seata/seata-go/pkg/util/log" ) type dbOption func(db *DBResource) @@ -218,10 +219,17 @@ func (db *DBResource) ConnectionForXA(ctx context.Context, xaXid XAXid) (*XAConn if err != nil { return nil, fmt.Errorf("get xa new connection failure, xid:%s, err:%v", xaXid.String(), err) } + xaResource, err := xa.CreateXAResource(newDriverConn, types.DBTypeMySQL) + if err != nil { + return nil, fmt.Errorf("create xa resoruce err:%w", err) + } xaConn := &XAConn{ Conn: &Conn{ targetConn: newDriverConn, + res: db, }, + xaBranchXid: XaIdBuild(xaXid.GetGlobalXid(), xaXid.GetBranchId()), + xaResource: xaResource, } return xaConn, nil } diff --git a/pkg/datasource/sql/driver.go b/pkg/datasource/sql/driver.go index 1baf84f9a..165cead44 100644 --- a/pkg/datasource/sql/driver.go +++ b/pkg/datasource/sql/driver.go @@ -28,6 +28,7 @@ import ( "strings" "github.com/go-sql-driver/mysql" + "github.com/seata/seata-go/pkg/datasource/sql/datasource" mysql2 "github.com/seata/seata-go/pkg/datasource/sql/datasource/mysql" "github.com/seata/seata-go/pkg/datasource/sql/types" diff --git a/pkg/datasource/sql/exec/at/multi_delete_executor.go b/pkg/datasource/sql/exec/at/multi_delete_executor.go index 078a2a2dd..5e47eb630 100644 --- a/pkg/datasource/sql/exec/at/multi_delete_executor.go +++ b/pkg/datasource/sql/exec/at/multi_delete_executor.go @@ -70,7 +70,7 @@ type multiDelete struct { clear bool } -//NewMultiDeleteExecutor get multiDelete executor +// NewMultiDeleteExecutor get multiDelete executor func NewMultiDeleteExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) *multiDeleteExecutor { return &multiDeleteExecutor{parserCtx: parserCtx, execContext: execContent, baseExecutor: baseExecutor{hooks: hooks}} } diff --git a/pkg/datasource/sql/xa/mysql_xa_connection.go b/pkg/datasource/sql/xa/mysql_xa_connection.go index c3b6de49e..c2366aeae 100644 --- a/pkg/datasource/sql/xa/mysql_xa_connection.go +++ b/pkg/datasource/sql/xa/mysql_xa_connection.go @@ -25,6 +25,8 @@ import ( "io" "strings" "time" + + "github.com/seata/seata-go/pkg/util/log" ) type MysqlXAConn struct { @@ -36,6 +38,8 @@ func NewMysqlXaConn(conn driver.Conn) *MysqlXAConn { } func (c *MysqlXAConn) Commit(ctx context.Context, xid string, onePhase bool) error { + log.Infof("xa branch commit, xid %s", xid) + var sb strings.Builder sb.WriteString("XA COMMIT ") sb.WriteString("'") @@ -47,10 +51,15 @@ func (c *MysqlXAConn) Commit(ctx context.Context, xid string, onePhase bool) err conn, _ := c.Conn.(driver.ExecerContext) _, err := conn.ExecContext(ctx, sb.String(), nil) + if err != nil { + log.Errorf("xa branch commit failed, xid %s, err %v", xid, err) + } return err } func (c *MysqlXAConn) End(ctx context.Context, xid string, flags int) error { + log.Infof("xa branch end, xid %s", xid) + var sb strings.Builder sb.WriteString("XA END ") sb.WriteString("'") @@ -71,6 +80,9 @@ func (c *MysqlXAConn) End(ctx context.Context, xid string, flags int) error { conn, _ := c.Conn.(driver.ExecerContext) _, err := conn.ExecContext(ctx, sb.String(), nil) + if err != nil { + log.Errorf("xa branch end failed, xid %s, err %v", xid, err) + } return err } @@ -91,6 +103,8 @@ func (c *MysqlXAConn) IsSameRM(ctx context.Context, xares XAResource) bool { } func (c *MysqlXAConn) XAPrepare(ctx context.Context, xid string) error { + log.Infof("xa branch prepare, xid %s", xid) + var sb strings.Builder sb.WriteString("XA PREPARE ") sb.WriteString("'") @@ -99,6 +113,9 @@ func (c *MysqlXAConn) XAPrepare(ctx context.Context, xid string) error { conn, _ := c.Conn.(driver.ExecerContext) _, err := conn.ExecContext(ctx, sb.String(), nil) + if err != nil { + log.Errorf("xa branch prepare failed, xid %s, err %v", xid, err) + } return err } @@ -143,6 +160,8 @@ func (c *MysqlXAConn) Recover(ctx context.Context, flag int) (xids []string, err } func (c *MysqlXAConn) Rollback(ctx context.Context, xid string) error { + log.Infof("xa branch rollback, xid %s", xid) + var sb strings.Builder sb.WriteString("XA ROLLBACK ") sb.WriteString("'") @@ -151,6 +170,9 @@ func (c *MysqlXAConn) Rollback(ctx context.Context, xid string) error { conn, _ := c.Conn.(driver.ExecerContext) _, err := conn.ExecContext(ctx, sb.String(), nil) + if err != nil { + log.Errorf("xa branch rollback failed, xid %s, err %v", xid, err) + } return err } @@ -159,6 +181,8 @@ func (c *MysqlXAConn) SetTransactionTimeout(duration time.Duration) bool { } func (c *MysqlXAConn) Start(ctx context.Context, xid string, flags int) error { + log.Infof("xa branch start, xid %s", xid) + var sb strings.Builder sb.WriteString("XA START ") sb.WriteString("'") @@ -180,5 +204,8 @@ func (c *MysqlXAConn) Start(ctx context.Context, xid string, flags int) error { conn, _ := c.Conn.(driver.ExecerContext) _, err := conn.ExecContext(ctx, sb.String(), nil) + if err != nil { + log.Errorf("xa branch start failed, xid %s, err %v", xid, err) + } return err }