Skip to content

Commit

Permalink
store/tikv: support single statement rollback for pessimistic transac…
Browse files Browse the repository at this point in the history
…tion (pingcap#10654)
  • Loading branch information
coocood committed Jun 5, 2019
1 parent e99ef9f commit 21c4182
Show file tree
Hide file tree
Showing 17 changed files with 379 additions and 84 deletions.
54 changes: 33 additions & 21 deletions executor/adapter.go
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -427,40 +428,51 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
if err == nil {
return nil, nil
}
if !terror.ErrorEqual(kv.ErrWriteConflict, err) {
txnCtx := a.Ctx.GetSessionVars().TxnCtx
var newForUpdateTS uint64
if deadlock, ok := errors.Cause(err).(*tikv.ErrDeadlock); ok {
if !deadlock.IsRetryable {
return nil, ErrDeadlock
}
logutil.Logger(ctx).Info("single statement deadlock, retry statement",
zap.Uint64("txn", txnCtx.StartTS),
zap.Uint64("lockTS", deadlock.LockTs),
zap.Binary("lockKey", deadlock.LockKey),
zap.Uint64("deadlockKeyHash", deadlock.DeadlockKeyHash))
} else if terror.ErrorEqual(kv.ErrWriteConflict, err) {
conflictCommitTS := extractConflictCommitTS(err.Error())
if conflictCommitTS == 0 {
logutil.Logger(ctx).Warn("failed to extract conflictCommitTS from a conflict error")
}
forUpdateTS := txnCtx.GetForUpdateTS()
logutil.Logger(ctx).Info("pessimistic write conflict, retry statement",
zap.Uint64("txn", txnCtx.StartTS),
zap.Uint64("forUpdateTS", forUpdateTS),
zap.Uint64("conflictCommitTS", conflictCommitTS))
if conflictCommitTS > forUpdateTS {
newForUpdateTS = conflictCommitTS
}
} else {
return nil, err
}
if a.retryCount >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount {
return nil, errors.New("pessimistic lock retry limit reached")
}
a.retryCount++
conflictCommitTS := extractConflictCommitTS(err.Error())
if conflictCommitTS == 0 {
logutil.Logger(ctx).Warn("failed to extract conflictCommitTS from a conflict error")
}
sctx := a.Ctx
txnCtx := sctx.GetSessionVars().TxnCtx
forUpdateTS := txnCtx.GetForUpdateTS()
logutil.Logger(ctx).Info("pessimistic write conflict, retry statement",
zap.Uint64("txn", txnCtx.StartTS),
zap.Uint64("forUpdateTS", forUpdateTS),
zap.Uint64("conflictCommitTS", conflictCommitTS))
if conflictCommitTS > txnCtx.GetForUpdateTS() {
txnCtx.SetForUpdateTS(conflictCommitTS)
} else {
ts, err1 := sctx.GetStore().GetOracle().GetTimestamp(ctx)
if err1 != nil {
return nil, err1
if newForUpdateTS == 0 {
newForUpdateTS, err = a.Ctx.GetStore().GetOracle().GetTimestamp(ctx)
if err != nil {
return nil, err
}
txnCtx.SetForUpdateTS(ts)
}
txnCtx.SetForUpdateTS(newForUpdateTS)
e, err := a.buildExecutor()
if err != nil {
return nil, err
}
// Rollback the statement change before retry it.
sctx.StmtRollback()
sctx.GetSessionVars().StmtCtx.ResetForRetry()
a.Ctx.StmtRollback()
a.Ctx.GetSessionVars().StmtCtx.ResetForRetry()

if err = e.Open(ctx); err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions executor/errors.go
Expand Up @@ -51,6 +51,7 @@ var (
ErrBadDB = terror.ClassExecutor.New(mysql.ErrBadDB, mysql.MySQLErrName[mysql.ErrBadDB])
ErrWrongObject = terror.ClassExecutor.New(mysql.ErrWrongObject, mysql.MySQLErrName[mysql.ErrWrongObject])
ErrRoleNotGranted = terror.ClassPrivilege.New(mysql.ErrRoleNotGranted, mysql.MySQLErrName[mysql.ErrRoleNotGranted])
ErrDeadlock = terror.ClassExecutor.New(mysql.ErrLockDeadlock, mysql.MySQLErrName[mysql.ErrLockDeadlock])
)

func init() {
Expand All @@ -67,6 +68,7 @@ func init() {
mysql.ErrTableaccessDenied: mysql.ErrTableaccessDenied,
mysql.ErrBadDB: mysql.ErrBadDB,
mysql.ErrWrongObject: mysql.ErrWrongObject,
mysql.ErrLockDeadlock: mysql.ErrLockDeadlock,
}
terror.ErrClassToMySQLCodes[terror.ClassExecutor] = tableMySQLErrCodes
}
32 changes: 14 additions & 18 deletions executor/executor.go
Expand Up @@ -637,6 +637,7 @@ type SelectLockExec struct {
baseExecutor

Lock ast.SelectLockType
keys []kv.Key
}

// Open implements the Executor Open interface.
Expand Down Expand Up @@ -670,29 +671,24 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.RecordBatch) error
if len(e.Schema().TblID2Handle) == 0 || e.Lock != ast.SelectLockForUpdate {
return nil
}
if req.NumRows() != 0 {
iter := chunk.NewIterator4Chunk(req.Chunk)
for id, cols := range e.Schema().TblID2Handle {
for _, col := range cols {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(id, row.GetInt64(col.Index)))
}
}
}
return nil
}
// Lock keys only once when finished fetching all results.
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}
keys := make([]kv.Key, 0, req.NumRows())
iter := chunk.NewIterator4Chunk(req.Chunk)
forUpdateTS := e.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
for id, cols := range e.Schema().TblID2Handle {
for _, col := range cols {
keys = keys[:0]
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
keys = append(keys, tablecodec.EncodeRowKeyWithHandle(id, row.GetInt64(col.Index)))
}
if len(keys) == 0 {
continue
}
err = txn.LockKeys(ctx, forUpdateTS, keys...)
if err != nil {
return err
}
}
}
return nil
return txn.LockKeys(ctx, forUpdateTS, e.keys...)
}

// LimitExec represents limit executor
Expand Down
35 changes: 35 additions & 0 deletions session/pessimistic_test.go
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
)
Expand Down Expand Up @@ -194,3 +196,36 @@ func (s *testPessimisticSuite) TestDeadlock(c *C) {
c.Assert(int(e.Code()), Equals, mysql.ErrLockDeadlock)
syncCh <- struct{}{}
}

func (s *testPessimisticSuite) TestSingleStatementRollback(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("drop table if exists pessimistic")
tk.MustExec("create table single_statement (id int primary key, v int)")
tk.MustExec("insert into single_statement values (1, 1), (2, 1), (3, 1), (4, 1)")
tblID := tk.GetTableID("single_statement")
s.cluster.SplitTable(s.mvccStore, tblID, 2)
region1Key := codec.EncodeBytes(nil, tablecodec.EncodeRowKeyWithHandle(tblID, 1))
region1, _ := s.cluster.GetRegionByKey(region1Key)
region1ID := region1.Id
region2Key := codec.EncodeBytes(nil, tablecodec.EncodeRowKeyWithHandle(tblID, 3))
region2, _ := s.cluster.GetRegionByKey(region2Key)
region2ID := region2.Id

syncCh := make(chan bool)
go func() {
tk2.MustExec("begin pessimistic")
<-syncCh
s.cluster.ScheduleDelay(tk2.Se.GetSessionVars().TxnCtx.StartTS, region2ID, time.Millisecond*3)
tk2.MustExec("update single_statement set v = v + 1")
tk2.MustExec("commit")
<-syncCh
}()
tk.MustExec("begin pessimistic")
syncCh <- true
s.cluster.ScheduleDelay(tk.Se.GetSessionVars().TxnCtx.StartTS, region1ID, time.Millisecond*3)
tk.MustExec("update single_statement set v = v + 1")
tk.MustExec("commit")
syncCh <- true
}
6 changes: 1 addition & 5 deletions session/tidb.go
Expand Up @@ -163,9 +163,8 @@ func finishStmt(ctx context.Context, sctx sessionctx.Context, se *session, sessV
if !sessVars.InTxn() {
logutil.Logger(context.Background()).Info("rollbackTxn for ddl/autocommit error.")
se.RollbackTxn(ctx)
} else if se.txn.Valid() && se.txn.IsPessimistic() && strings.Contains(meetsErr.Error(), "deadlock") {
} else if se.txn.Valid() && se.txn.IsPessimistic() && executor.ErrDeadlock.Equal(meetsErr) {
logutil.Logger(context.Background()).Info("rollbackTxn for deadlock error", zap.Uint64("txn", se.txn.StartTS()))
meetsErr = errDeadlock
se.RollbackTxn(ctx)
}
return meetsErr
Expand Down Expand Up @@ -328,18 +327,15 @@ func IsQuery(sql string) bool {
var (
errForUpdateCantRetry = terror.ClassSession.New(codeForUpdateCantRetry,
mysql.MySQLErrName[mysql.ErrForUpdateCantRetry])
errDeadlock = terror.ClassSession.New(codeDeadlock, mysql.MySQLErrName[mysql.ErrLockDeadlock])
)

const (
codeForUpdateCantRetry terror.ErrCode = mysql.ErrForUpdateCantRetry
codeDeadlock terror.ErrCode = mysql.ErrLockDeadlock
)

func init() {
sessionMySQLErrCodes := map[terror.ErrCode]uint16{
codeForUpdateCantRetry: mysql.ErrForUpdateCantRetry,
codeDeadlock: mysql.ErrLockDeadlock,
}
terror.ErrClassToMySQLCodes[terror.ClassSession] = sessionMySQLErrCodes
}
35 changes: 33 additions & 2 deletions store/mockstore/mocktikv/cluster.go
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"math"
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
Expand All @@ -41,14 +42,24 @@ type Cluster struct {
id uint64
stores map[uint64]*Store
regions map[uint64]*Region

// delayEvents is used to control the execution sequence of rpc requests for test.
delayEvents map[delayKey]time.Duration
delayMu sync.Mutex
}

type delayKey struct {
startTS uint64
regionID uint64
}

// NewCluster creates an empty cluster. It needs to be bootstrapped before
// providing service.
func NewCluster() *Cluster {
return &Cluster{
stores: make(map[uint64]*Store),
regions: make(map[uint64]*Region),
stores: make(map[uint64]*Store),
regions: make(map[uint64]*Region),
delayEvents: make(map[delayKey]time.Duration),
}
}

Expand Down Expand Up @@ -347,6 +358,26 @@ func (c *Cluster) SplitKeys(mvccStore MVCCStore, start, end kv.Key, count int) {
c.splitRange(mvccStore, NewMvccKey(start), NewMvccKey(end), count)
}

// ScheduleDelay schedules a delay event for a transaction on a region.
func (c *Cluster) ScheduleDelay(startTS, regionID uint64, dur time.Duration) {
c.delayMu.Lock()
c.delayEvents[delayKey{startTS: startTS, regionID: regionID}] = dur
c.delayMu.Unlock()
}

func (c *Cluster) handleDelay(startTS, regionID uint64) {
key := delayKey{startTS: startTS, regionID: regionID}
c.delayMu.Lock()
dur, ok := c.delayEvents[key]
if ok {
delete(c.delayEvents, key)
}
c.delayMu.Unlock()
if ok {
time.Sleep(dur)
}
}

func (c *Cluster) splitRange(mvccStore MVCCStore, start, end MvccKey, count int) {
c.Lock()
defer c.Unlock()
Expand Down
13 changes: 12 additions & 1 deletion store/mockstore/mocktikv/errors.go
Expand Up @@ -61,7 +61,7 @@ func (e ErrAlreadyCommitted) Error() string {
return "txn already committed"
}

// ErrConflict is turned when the commitTS of key in the DB is greater than startTS.
// ErrConflict is returned when the commitTS of key in the DB is greater than startTS.
type ErrConflict struct {
StartTS uint64
ConflictTS uint64
Expand All @@ -71,3 +71,14 @@ type ErrConflict struct {
func (e *ErrConflict) Error() string {
return "write conflict"
}

// ErrDeadlock is returned when deadlock error is detected.
type ErrDeadlock struct {
LockTS uint64
LockKey []byte
DealockKeyHash uint64
}

func (e *ErrDeadlock) Error() string {
return "deadlock"
}
16 changes: 10 additions & 6 deletions store/mockstore/mocktikv/mvcc.go
Expand Up @@ -42,11 +42,12 @@ type mvccValue struct {
}

type mvccLock struct {
startTS uint64
primary []byte
value []byte
op kvrpcpb.Op
ttl uint64
startTS uint64
primary []byte
value []byte
op kvrpcpb.Op
ttl uint64
forUpdateTS uint64
}

type mvccEntry struct {
Expand All @@ -66,6 +67,7 @@ func (l *mvccLock) MarshalBinary() ([]byte, error) {
mh.WriteSlice(&buf, l.value)
mh.WriteNumber(&buf, l.op)
mh.WriteNumber(&buf, l.ttl)
mh.WriteNumber(&buf, l.forUpdateTS)
return buf.Bytes(), errors.Trace(mh.err)
}

Expand All @@ -78,6 +80,7 @@ func (l *mvccLock) UnmarshalBinary(data []byte) error {
mh.ReadSlice(buf, &l.value)
mh.ReadNumber(buf, &l.op)
mh.ReadNumber(buf, &l.ttl)
mh.ReadNumber(buf, &l.forUpdateTS)
return errors.Trace(mh.err)
}

Expand Down Expand Up @@ -429,7 +432,8 @@ type MVCCStore interface {
ReverseScan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS, forUpdateTS uint64, ttl uint64) []error
Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, startTS uint64, ttl uint64) []error
PessimisticRollback(keys [][]byte, startTS, forUpdateTS uint64) []error
Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, startTS, ttl uint64) []error
Commit(keys [][]byte, startTS, commitTS uint64) error
Rollback(keys [][]byte, startTS uint64) error
Cleanup(key []byte, startTS uint64) error
Expand Down

0 comments on commit 21c4182

Please sign in to comment.