Skip to content

Commit

Permalink
store: upgrade the CheckTxnStatus API (pingcap#13123)
Browse files Browse the repository at this point in the history
CheckTxnStatus introduces a non-block read mode. In this mode, TiDB can ignore
the secondary lock TTL check and send the CheckTxnStatus request.
  • Loading branch information
tiancaiamao authored and XiaTianliang committed Dec 21, 2019
1 parent f692411 commit 0b265ea
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 50 deletions.
10 changes: 5 additions & 5 deletions executor/analyze_test.go
Expand Up @@ -455,24 +455,24 @@ func (s *testFastAnalyze) TestFastAnalyzeRetryRowCount(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key)")
tk.MustExec("create table retry_row_count(a int primary key)")
c.Assert(s.dom.StatsHandle().Update(s.dom.InfoSchema()), IsNil)
tk.MustExec("set @@session.tidb_enable_fast_analyze=1")
tk.MustExec("set @@session.tidb_build_stats_concurrency=1")
tblInfo, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
tblInfo, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("retry_row_count"))
c.Assert(err, IsNil)
tid := tblInfo.Meta().ID
// construct 6 regions split by {6, 12, 18, 24, 30}
splitKeys := generateTableSplitKeyForInt(tid, []int{6, 12, 18, 24, 30})
regionIDs := manipulateCluster(s.cluster, splitKeys)
for i := 0; i < 30; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d)", i))
tk.MustExec(fmt.Sprintf("insert into retry_row_count values (%d)", i))
}
s.cli.setFailRegion(regionIDs[4])
tk.MustExec("analyze table t")
tk.MustExec("analyze table retry_row_count")
// 4 regions will be sampled, and it will retry the last failed region.
c.Assert(s.cli.mu.count, Equals, int64(5))
row := tk.MustQuery(`show stats_meta where db_name = "test" and table_name = "t"`).Rows()[0]
row := tk.MustQuery(`show stats_meta where db_name = "test" and table_name = "retry_row_count"`).Rows()[0]
c.Assert(row[5], Equals, "30")
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -37,7 +37,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191031081038-bfb0c3adf567
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -158,8 +158,8 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466 h1:C5nV9osqA+R/R2fxYxVfqAUlCi3Oo5yJ/JSKDeHSAOk=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1 h1:J5oimSv+0emw5e/D1ZX/zh2WcMv0pOVT9QKruXfvJbg=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
3 changes: 3 additions & 0 deletions session/session_test.go
Expand Up @@ -2443,6 +2443,8 @@ func (s *testSessionSuite) TestKVVars(c *C) {
}
wg.Done()
}()

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockSleepBetween2PC", "return"), IsNil)
go func() {
for {
tk.MustExec("update kvvars set b = b + 1 where a = 1")
Expand All @@ -2453,6 +2455,7 @@ func (s *testSessionSuite) TestKVVars(c *C) {
wg.Done()
}()
wg.Wait()
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockSleepBetween2PC"), IsNil)
for {
tk2.MustQuery("select * from kvvars")
if atomic.LoadInt32(backOffWeightVal) != 0 {
Expand Down
9 changes: 9 additions & 0 deletions store/mockstore/mocktikv/errors.go
Expand Up @@ -98,3 +98,12 @@ type ErrCommitTSExpired struct {
func (e *ErrCommitTSExpired) Error() string {
return "commit ts expired"
}

// ErrTxnNotFound is returned when the primary lock of the txn is not found.
type ErrTxnNotFound struct {
kvrpcpb.TxnNotFound
}

func (e *ErrTxnNotFound) Error() string {
return "txn not found"
}
31 changes: 27 additions & 4 deletions store/mockstore/mocktikv/mock_tikv_test.go
Expand Up @@ -663,31 +663,54 @@ func (s *testMVCCLevelDB) TestErrors(c *C) {
func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666)

ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666)
ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(666))
c.Assert(commitTS, Equals, uint64(0))

s.mustCommitOK(c, [][]byte{[]byte("pk")}, 5, 30)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666)
ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(30))

s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", 5, 666)
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, 5)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666)
ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))

// Cover the TxnNotFound case.
_, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false)
c.Assert(err, NotNil)
notFound, ok := errors.Cause(err).(*ErrTxnNotFound)
c.Assert(ok, IsTrue)
c.Assert(notFound.StartTs, Equals, uint64(5))
c.Assert(string(notFound.PrimaryKey), Equals, "txnNotFound")

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))

// Check the rollback tombstone blocks this prewrite which comes with a smaller startTS.
req := &kvrpcpb.PrewriteRequest{
Mutations: putMutations("txnNotFound", "val"),
PrimaryLock: []byte("txnNotFound"),
StartVersion: 4,
MinCommitTs: 6,
}
errs := s.store.Prewrite(req)
c.Assert(errs, NotNil)
}

func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) {
s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)
// Push the minCommitTS
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100)
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false)
c.Assert(err, IsNil)
err = s.store.Commit([][]byte{[]byte("x")}, 5, 10)
e, ok := errors.Cause(err).(*ErrCommitTSExpired)
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
Expand Up @@ -268,7 +268,7 @@ type MVCCStore interface {
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
GC(startKey, endKey []byte, safePoint uint64) error
DeleteRange(startKey, endKey []byte) error
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64) (ttl, commitTS uint64, err error)
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (ttl, commitTS uint64, err error)
Close() error
}

Expand Down
38 changes: 28 additions & 10 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Expand Up @@ -882,7 +882,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
}
// If current transaction's lock exist.
if ok && dec.lock.startTS == startTS {
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
if err = rollbackLock(batch, key, startTS); err != nil {
return errors.Trace(err)
}
return nil
Expand Down Expand Up @@ -919,7 +919,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
return nil
}

func rollbackLock(batch *leveldb.Batch, lock mvccLock, key []byte, startTS uint64) error {
func rollbackLock(batch *leveldb.Batch, key []byte, startTS uint64) error {
tomb := mvccValue{
valueType: typeRollback,
startTS: startTS,
Expand Down Expand Up @@ -980,7 +980,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
if ok && dec.lock.startTS == startTS {
// If the lock has already outdated, clean up it.
if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
if err = rollbackLock(batch, key, startTS); err != nil {
return err
}
return mvcc.db.Write(batch, nil)
Expand Down Expand Up @@ -1032,7 +1032,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
// primaryKey + lockTS together could locate the primary lock.
// callerStartTS is the start ts of reader transaction.
// currentTS is the current ts, but it may be inaccurate. Just use it to check TTL.
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64) (uint64, uint64, error) {
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (uint64, uint64, error) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

Expand All @@ -1057,7 +1057,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS

// If the lock has already outdated, clean up it.
if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, lock, primaryKey, lockTS); err != nil {
if err = rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
}
if err = mvcc.db.Write(batch, nil); err != nil {
Expand Down Expand Up @@ -1112,9 +1112,27 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
}

// If current transaction is not prewritted before, it may be pessimistic lock.
// When pessimistic lock rollback, it may not leave a 'rollbacked' tombstone.
logutil.BgLogger().Debug("CheckTxnStatus can't find the primary lock, pessimistic rollback?")
return 0, 0, nil
// When pessimistic txn rollback statement, it may not leave a 'rollbacked' tombstone.

// Or maybe caused by concurrent prewrite operation.
// Especially in the non-block reading case, the secondary lock is likely to be
// written before the primary lock.

if rollbackIfNotExist {
batch := &leveldb.Batch{}
if err := rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
}
if err := mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
}
return 0, 0, nil
}

return 0, 0, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
StartTs: lockTS,
PrimaryKey: primaryKey,
}}
}

// TxnHeartBeat implements the MVCCStore interface.
Expand Down Expand Up @@ -1220,7 +1238,7 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
if commitTS > 0 {
err = commitLock(batch, dec.lock, currKey, startTS, commitTS)
} else {
err = rollbackLock(batch, dec.lock, currKey, startTS)
err = rollbackLock(batch, currKey, startTS)
}
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1260,7 +1278,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[
if commitTS > 0 {
err = commitLock(batch, dec.lock, currKey, dec.lock.startTS, commitTS)
} else {
err = rollbackLock(batch, dec.lock, currKey, dec.lock.startTS)
err = rollbackLock(batch, currKey, dec.lock.startTS)
}
if err != nil {
return errors.Trace(err)
Expand Down
7 changes: 6 additions & 1 deletion store/mockstore/mocktikv/rpc.go
Expand Up @@ -98,6 +98,11 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
CommitTsExpired: &expired.CommitTsExpired,
}
}
if tmp, ok := errors.Cause(err).(*ErrTxnNotFound); ok {
return &kvrpcpb.KeyError{
TxnNotFound: &tmp.TxnNotFound,
}
}
return &kvrpcpb.KeyError{
Abort: err.Error(),
}
Expand Down Expand Up @@ -382,7 +387,7 @@ func (h *rpcHandler) handleKvCheckTxnStatus(req *kvrpcpb.CheckTxnStatusRequest)
panic("KvCheckTxnStatus: key not in region")
}
var resp kvrpcpb.CheckTxnStatusResponse
ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs())
ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist())
if err != nil {
resp.Error = convertToKeyError(err)
} else {
Expand Down
6 changes: 6 additions & 0 deletions store/tikv/2pc.go
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -1098,6 +1099,11 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
return err
}

failpoint.Inject("mockSleepBetween2PC", func() error {
time.Sleep(100 * time.Millisecond)
return nil
})

start = time.Now()
commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)
err = c.commitKeys(commitBo, c.keys)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
Expand Up @@ -597,7 +597,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {

lr := newLockResolver(s.store)
bo := NewBackoffer(context.Background(), getMaxBackoff)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true)
c.Assert(err, IsNil)
c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl)

Expand Down
7 changes: 6 additions & 1 deletion store/tikv/backoff.go
Expand Up @@ -142,6 +142,7 @@ const (
BoRegionMiss
BoUpdateLeader
boServerBusy
boTxnNotFound
)

func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int {
Expand All @@ -160,6 +161,8 @@ func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int
case BoRegionMiss:
// change base time to 2ms, because it may recover soon.
return NewBackoffFn(2, 500, NoJitter)
case boTxnNotFound:
return NewBackoffFn(2, 500, NoJitter)
case BoUpdateLeader:
return NewBackoffFn(1, 10, NoJitter)
case boServerBusy:
Expand All @@ -184,6 +187,8 @@ func (t backoffType) String() string {
return "updateLeader"
case boServerBusy:
return "serverBusy"
case boTxnNotFound:
return "txnNotFound"
}
return ""
}
Expand All @@ -192,7 +197,7 @@ func (t backoffType) TError() error {
switch t {
case boTiKVRPC:
return ErrTiKVServerTimeout
case BoTxnLock, boTxnLockFast:
case BoTxnLock, boTxnLockFast, boTxnNotFound:
return ErrResolveLockTimeout
case BoPDRPC:
return ErrPDServerTimeout
Expand Down

0 comments on commit 0b265ea

Please sign in to comment.