Skip to content

Commit

Permalink
fix: support release savepoint (#251)
Browse files Browse the repository at this point in the history
  • Loading branch information
dk-lockdown committed Aug 18, 2022
1 parent 4de4eb3 commit 63b19cb
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 11 deletions.
17 changes: 15 additions & 2 deletions pkg/executor/read_write_splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,26 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(
if !ok {
return nil, 0, errors.New("there is no transaction")
}
defer executor.localTransactionMap.Delete(connectionID)
if stmt.SavepointName == "" {
defer executor.localTransactionMap.Delete(connectionID)
}
tx = txi.(proto.Tx)
// TODO add metrics
if result, err = tx.Rollback(spanCtx, stmt); err != nil {
return nil, 0, err
}
return result, 0, err
case *ast.ReleaseSavepointStmt:
txi, ok := executor.localTransactionMap.Load(connectionID)
if !ok {
return nil, 0, errors.New("there is no transaction")
}
defer executor.localTransactionMap.Delete(connectionID)
tx = txi.(proto.Tx)
if result, err = tx.ReleaseSavepoint(spanCtx, stmt.Name); err != nil {
return nil, 0, err
}
return result, 0, err
case *ast.InsertStmt, *ast.DeleteStmt, *ast.UpdateStmt:
txi, ok := executor.localTransactionMap.Load(connectionID)
if ok {
Expand Down Expand Up @@ -334,5 +347,5 @@ func (executor *ReadWriteSplittingExecutor) doPostFilter(ctx context.Context, re
return err
}
}
return nil
return err
}
2 changes: 1 addition & 1 deletion pkg/executor/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,5 +291,5 @@ func (executor *ShardingExecutor) doPostFilter(ctx context.Context, result proto
return err
}
}
return nil
return err
}
17 changes: 15 additions & 2 deletions pkg/executor/single_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,26 @@ func (executor *SingleDBExecutor) ExecutorComQuery(
if !ok {
return nil, 0, errors.New("there is no transaction")
}
defer executor.localTransactionMap.Delete(connectionID)
if stmt.SavepointName == "" {
defer executor.localTransactionMap.Delete(connectionID)
}
tx = txi.(proto.Tx)
// TODO add metrics
if result, err = tx.Rollback(spanCtx, stmt); err != nil {
return nil, 0, err
}
return result, 0, err
case *ast.ReleaseSavepointStmt:
txi, ok := executor.localTransactionMap.Load(connectionID)
if !ok {
return nil, 0, errors.New("there is no transaction")
}
defer executor.localTransactionMap.Delete(connectionID)
tx = txi.(proto.Tx)
if result, err = tx.ReleaseSavepoint(spanCtx, stmt.Name); err != nil {
return nil, 0, err
}
return result, 0, err
default:
txi, ok := executor.localTransactionMap.Load(connectionID)
if ok {
Expand Down Expand Up @@ -285,5 +298,5 @@ func (executor *SingleDBExecutor) doPostFilter(ctx context.Context, result proto
return err
}
}
return nil
return err
}
1 change: 1 addition & 0 deletions pkg/proto/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ type (
ExecuteSqlDirectly(sql string, args ...interface{}) (Result, uint16, error)
Commit(ctx context.Context) (Result, error)
Rollback(ctx context.Context, stmt *ast.RollbackStmt) (Result, error)
ReleaseSavepoint(ctx context.Context, savepoint string) (result Result, err error)
}

DBManager interface {
Expand Down
18 changes: 17 additions & 1 deletion pkg/sql/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,25 @@ func (tx *Tx) Rollback(ctx context.Context, stmt *ast.RollbackStmt) (result prot
result, err = tx.conn.Execute(ctx, fmt.Sprintf("ROLLBACK TO %s", stmt.SavepointName), false)
} else {
result, err = tx.conn.Execute(ctx, "ROLLBACK", false)
tx.db.pool.Put(tx.conn)
tx.Close()
}
return
}

func (tx *Tx) ReleaseSavepoint(ctx context.Context, savepoint string) (result proto.Result, err error) {
_, span := tracing.GetTraceSpan(ctx, tracing.TxReleaseSavePoint)
span.SetAttributes(attribute.KeyValue{Key: "db", Value: attribute.StringValue(tx.db.name)})
defer span.End()

if tx.closed.Load() {
return nil, nil
}
if tx.db == nil || tx.db.IsClosed() {
return nil, err2.ErrInvalidConn
}
result, err = tx.conn.Execute(ctx, fmt.Sprintf("RELEASE SAVEPOINT %s", savepoint), false)
tx.db.pool.Put(tx.conn)
tx.Close()
return
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/tracing/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ const (
DBLocalTransactionBegin = "db_local_transaction_begin"

// tx
TxQuery = "tx_query"
TxExecSQL = "tx_exec_sql"
TxExecStmt = "tx_exec_stmt"
TxCommit = "db_local_transaction_commit"
TxRollback = "db_local_transaction_rollback"
TxQuery = "tx_query"
TxExecSQL = "tx_exec_sql"
TxExecStmt = "tx_exec_stmt"
TxCommit = "db_local_transaction_commit"
TxRollback = "db_local_transaction_rollback"
TxReleaseSavePoint = "db_local_transaction_release_savepoint"

// conn
ConnQuery = "conn_com_query"
Expand Down
15 changes: 15 additions & 0 deletions testdata/mock_tx.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 63b19cb

Please sign in to comment.