Skip to content

Commit

Permalink
fix xa branch commit bug (apache#564)
Browse files Browse the repository at this point in the history
* fix xa branch commit bug
  • Loading branch information
luky116 authored and georgehao committed May 28, 2023
1 parent 848af5f commit 199e66d
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 10 deletions.
2 changes: 1 addition & 1 deletion goimports.sh
Expand Up @@ -16,7 +16,7 @@
#

# format go imports style
go install -v golang.org/x/tools/cmd/goimports
go install golang.org/x/tools/cmd/goimports
goimports -local github.com/seata/seata-go -w .

# format licence style
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/client.go
Expand Up @@ -18,14 +18,14 @@
package client

import (
"github.com/seata/seata-go/pkg/remoting/getty"
"sync"

"github.com/seata/seata-go/pkg/datasource"
at "github.com/seata/seata-go/pkg/datasource/sql"
"github.com/seata/seata-go/pkg/datasource/sql/exec/config"
"github.com/seata/seata-go/pkg/integration"
remoteConfig "github.com/seata/seata-go/pkg/remoting/config"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/remoting/processor/client"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rm/tcc"
Expand Down
12 changes: 6 additions & 6 deletions pkg/datasource/sql/conn_xa.go
Expand Up @@ -322,22 +322,22 @@ func (c *XAConn) Commit(ctx context.Context) error {

now := time.Now()
if c.end(ctx, xa.TMSuccess) != nil {
return c.commitErrorHandle()
return c.commitErrorHandle(ctx)
}

if c.checkTimeout(ctx, now) != nil {
return c.commitErrorHandle()
return c.commitErrorHandle(ctx)
}

if c.xaResource.XAPrepare(ctx, c.xaBranchXid.String()) != nil {
return c.commitErrorHandle()
return c.commitErrorHandle(ctx)
}
return nil
}

func (c *XAConn) commitErrorHandle() error {
func (c *XAConn) commitErrorHandle(ctx context.Context) error {
var err error
if err = c.tx.Rollback(); err != nil {
if err = c.XaRollback(ctx, c.xaBranchXid); err != nil {
err = fmt.Errorf("failed to report XA branch commit-failure xid:%s, err:%w", c.txCtx.XID, err)
}
c.cleanXABranchContext()
Expand Down Expand Up @@ -389,7 +389,7 @@ func (c *XAConn) XaRollbackByBranchId(ctx context.Context, xaXid XAXid) error {
}

func (c *XAConn) XaRollback(ctx context.Context, xaXid XAXid) error {
err := c.xaResource.Rollback(ctx, xaXid.GetGlobalXid())
err := c.xaResource.Rollback(ctx, xaXid.String())
c.releaseIfNecessary()
return err
}
10 changes: 9 additions & 1 deletion pkg/datasource/sql/db.go
Expand Up @@ -22,14 +22,15 @@ import (
"database/sql"
"database/sql/driver"
"fmt"
"github.com/seata/seata-go/pkg/util/log"
"sync"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/datasource/sql/util"
"github.com/seata/seata-go/pkg/datasource/sql/xa"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/util/log"
)

type dbOption func(db *DBResource)
Expand Down Expand Up @@ -218,10 +219,17 @@ func (db *DBResource) ConnectionForXA(ctx context.Context, xaXid XAXid) (*XAConn
if err != nil {
return nil, fmt.Errorf("get xa new connection failure, xid:%s, err:%v", xaXid.String(), err)
}
xaResource, err := xa.CreateXAResource(newDriverConn, types.DBTypeMySQL)
if err != nil {
return nil, fmt.Errorf("create xa resoruce err:%w", err)
}
xaConn := &XAConn{
Conn: &Conn{
targetConn: newDriverConn,
res: db,
},
xaBranchXid: XaIdBuild(xaXid.GetGlobalXid(), xaXid.GetBranchId()),
xaResource: xaResource,
}
return xaConn, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/datasource/sql/driver.go
Expand Up @@ -28,6 +28,7 @@ import (
"strings"

"github.com/go-sql-driver/mysql"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
mysql2 "github.com/seata/seata-go/pkg/datasource/sql/datasource/mysql"
"github.com/seata/seata-go/pkg/datasource/sql/types"
Expand Down
2 changes: 1 addition & 1 deletion pkg/datasource/sql/exec/at/multi_delete_executor.go
Expand Up @@ -70,7 +70,7 @@ type multiDelete struct {
clear bool
}

//NewMultiDeleteExecutor get multiDelete executor
// NewMultiDeleteExecutor get multiDelete executor
func NewMultiDeleteExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) *multiDeleteExecutor {
return &multiDeleteExecutor{parserCtx: parserCtx, execContext: execContent, baseExecutor: baseExecutor{hooks: hooks}}
}
Expand Down
27 changes: 27 additions & 0 deletions pkg/datasource/sql/xa/mysql_xa_connection.go
Expand Up @@ -25,6 +25,8 @@ import (
"io"
"strings"
"time"

"github.com/seata/seata-go/pkg/util/log"
)

type MysqlXAConn struct {
Expand All @@ -36,6 +38,8 @@ func NewMysqlXaConn(conn driver.Conn) *MysqlXAConn {
}

func (c *MysqlXAConn) Commit(ctx context.Context, xid string, onePhase bool) error {
log.Infof("xa branch commit, xid %s", xid)

var sb strings.Builder
sb.WriteString("XA COMMIT ")
sb.WriteString("'")
Expand All @@ -47,10 +51,15 @@ func (c *MysqlXAConn) Commit(ctx context.Context, xid string, onePhase bool) err

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch commit failed, xid %s, err %v", xid, err)
}
return err
}

func (c *MysqlXAConn) End(ctx context.Context, xid string, flags int) error {
log.Infof("xa branch end, xid %s", xid)

var sb strings.Builder
sb.WriteString("XA END ")
sb.WriteString("'")
Expand All @@ -71,6 +80,9 @@ func (c *MysqlXAConn) End(ctx context.Context, xid string, flags int) error {

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch end failed, xid %s, err %v", xid, err)
}
return err
}

Expand All @@ -91,6 +103,8 @@ func (c *MysqlXAConn) IsSameRM(ctx context.Context, xares XAResource) bool {
}

func (c *MysqlXAConn) XAPrepare(ctx context.Context, xid string) error {
log.Infof("xa branch prepare, xid %s", xid)

var sb strings.Builder
sb.WriteString("XA PREPARE ")
sb.WriteString("'")
Expand All @@ -99,6 +113,9 @@ func (c *MysqlXAConn) XAPrepare(ctx context.Context, xid string) error {

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch prepare failed, xid %s, err %v", xid, err)
}
return err
}

Expand Down Expand Up @@ -143,6 +160,8 @@ func (c *MysqlXAConn) Recover(ctx context.Context, flag int) (xids []string, err
}

func (c *MysqlXAConn) Rollback(ctx context.Context, xid string) error {
log.Infof("xa branch rollback, xid %s", xid)

var sb strings.Builder
sb.WriteString("XA ROLLBACK ")
sb.WriteString("'")
Expand All @@ -151,6 +170,9 @@ func (c *MysqlXAConn) Rollback(ctx context.Context, xid string) error {

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch rollback failed, xid %s, err %v", xid, err)
}
return err
}

Expand All @@ -159,6 +181,8 @@ func (c *MysqlXAConn) SetTransactionTimeout(duration time.Duration) bool {
}

func (c *MysqlXAConn) Start(ctx context.Context, xid string, flags int) error {
log.Infof("xa branch start, xid %s", xid)

var sb strings.Builder
sb.WriteString("XA START ")
sb.WriteString("'")
Expand All @@ -180,5 +204,8 @@ func (c *MysqlXAConn) Start(ctx context.Context, xid string, flags int) error {

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch start failed, xid %s, err %v", xid, err)
}
return err
}

0 comments on commit 199e66d

Please sign in to comment.