store: upgrade the CheckTxnStatus API #13123

Merged 18 commits on Nov 7, 2019
10 changes: 5 additions & 5 deletions executor/analyze_test.go
@@ -455,24 +455,24 @@ func (s *testFastAnalyze) TestFastAnalyzeRetryRowCount(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key)")
tk.MustExec("create table retry_row_count(a int primary key)")
c.Assert(s.dom.StatsHandle().Update(s.dom.InfoSchema()), IsNil)
tk.MustExec("set @@session.tidb_enable_fast_analyze=1")
tk.MustExec("set @@session.tidb_build_stats_concurrency=1")
tblInfo, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
tblInfo, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("retry_row_count"))
c.Assert(err, IsNil)
tid := tblInfo.Meta().ID
// construct 6 regions split by {6, 12, 18, 24, 30}
splitKeys := generateTableSplitKeyForInt(tid, []int{6, 12, 18, 24, 30})
regionIDs := manipulateCluster(s.cluster, splitKeys)
for i := 0; i < 30; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d)", i))
tk.MustExec(fmt.Sprintf("insert into retry_row_count values (%d)", i))
}
s.cli.setFailRegion(regionIDs[4])
tk.MustExec("analyze table t")
tk.MustExec("analyze table retry_row_count")
// 4 regions will be sampled, and it will retry the last failed region.
c.Assert(s.cli.mu.count, Equals, int64(5))
row := tk.MustQuery(`show stats_meta where db_name = "test" and table_name = "t"`).Rows()[0]
row := tk.MustQuery(`show stats_meta where db_name = "test" and table_name = "retry_row_count"`).Rows()[0]
c.Assert(row[5], Equals, "30")
}

2 changes: 1 addition & 1 deletion go.mod
@@ -37,7 +37,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191031081038-bfb0c3adf567
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
4 changes: 2 additions & 2 deletions go.sum
@@ -158,8 +158,8 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466 h1:C5nV9osqA+R/R2fxYxVfqAUlCi3Oo5yJ/JSKDeHSAOk=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1 h1:J5oimSv+0emw5e/D1ZX/zh2WcMv0pOVT9QKruXfvJbg=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
3 changes: 3 additions & 0 deletions session/session_test.go
@@ -2443,6 +2443,8 @@ func (s *testSessionSuite) TestKVVars(c *C) {
}
wg.Done()
}()

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockSleepBetween2PC", "return"), IsNil)
go func() {
for {
tk.MustExec("update kvvars set b = b + 1 where a = 1")
@@ -2453,6 +2455,7 @@ func (s *testSessionSuite) TestKVVars(c *C) {
wg.Done()
}()
wg.Wait()
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockSleepBetween2PC"), IsNil)
for {
tk2.MustQuery("select * from kvvars")
if atomic.LoadInt32(backOffWeightVal) != 0 {
9 changes: 9 additions & 0 deletions store/mockstore/mocktikv/errors.go
@@ -98,3 +98,12 @@ type ErrCommitTSExpired struct {
func (e *ErrCommitTSExpired) Error() string {
return "commit ts expired"
}

// ErrTxnNotFound is returned when the primary lock of the txn is not found.
type ErrTxnNotFound struct {
kvrpcpb.TxnNotFound
}

func (e *ErrTxnNotFound) Error() string {
return "txn not found"
}
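Because ErrTxnNotFound embeds the kvproto message, the offending transaction's start ts and primary key travel with the error value. A minimal self-contained sketch (the type is duplicated here only so the snippet compiles on its own; the literals are arbitrary):

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// ErrTxnNotFound mirrors the type added above; it is repeated here only to keep the
// example self-contained.
type ErrTxnNotFound struct {
	kvrpcpb.TxnNotFound
}

func (e *ErrTxnNotFound) Error() string {
	return "txn not found"
}

func main() {
	err := &ErrTxnNotFound{kvrpcpb.TxnNotFound{StartTs: 5, PrimaryKey: []byte("pk")}}
	// The embedded message exposes StartTs and PrimaryKey directly on the error,
	// which is what the CheckTxnStatus test below asserts on.
	fmt.Println(err.Error(), err.StartTs, string(err.PrimaryKey))
}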
31 changes: 27 additions & 4 deletions store/mockstore/mocktikv/mock_tikv_test.go
@@ -663,31 +663,54 @@ func (s *testMVCCLevelDB) TestErrors(c *C) {
func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666)

ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666)
ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
Contributor: I think you need to cover the case where the last parameter is true.

Contributor Author: Done.

c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(666))
c.Assert(commitTS, Equals, uint64(0))

s.mustCommitOK(c, [][]byte{[]byte("pk")}, 5, 30)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666)
ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(30))

s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", 5, 666)
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, 5)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666)
ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))

// Cover the TxnNotFound case.
_, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false)
c.Assert(err, NotNil)
notFound, ok := errors.Cause(err).(*ErrTxnNotFound)
c.Assert(ok, IsTrue)
c.Assert(notFound.StartTs, Equals, uint64(5))
c.Assert(string(notFound.PrimaryKey), Equals, "txnNotFound")

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))

// Check that the rollback tombstone blocks this prewrite, which comes with a smaller startTS.
req := &kvrpcpb.PrewriteRequest{
Mutations: putMutations("txnNotFound", "val"),
PrimaryLock: []byte("txnNotFound"),
StartVersion: 4,
MinCommitTs: 6,
}
errs := s.store.Prewrite(req)
c.Assert(errs, NotNil)
}

func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) {
s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)
// Push the minCommitTS
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100)
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false)
c.Assert(err, IsNil)
err = s.store.Commit([][]byte{[]byte("x")}, 5, 10)
e, ok := errors.Cause(err).(*ErrCommitTSExpired)
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
@@ -268,7 +268,7 @@ type MVCCStore interface {
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
GC(startKey, endKey []byte, safePoint uint64) error
DeleteRange(startKey, endKey []byte) error
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64) (ttl, commitTS uint64, err error)
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (ttl, commitTS uint64, err error)
Close() error
}

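To see what the extra flag changes from a caller's point of view, here is a hedged sketch against mocktikv; it assumes NewMVCCLevelDB("") builds an in-memory store the way the package's own tests do, and the key and timestamps are arbitrary:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/store/mockstore/mocktikv"
)

func main() {
	// Assumed to create an in-memory MVCC store, as in the mocktikv test setup.
	store, err := mocktikv.NewMVCCLevelDB("")
	if err != nil {
		panic(err)
	}

	// No transaction ever touched "pk". With rollbackIfNotFound=false the store now
	// reports a TxnNotFound error instead of silently returning (0, 0).
	ttl, commitTS, err := store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
	fmt.Println(ttl, commitTS, err) // 0 0 txn not found

	// With rollbackIfNotFound=true the store writes a rollback tombstone for ts 5
	// and reports the transaction as rolled back (zero TTL, zero commit ts, no error).
	ttl, commitTS, err = store.CheckTxnStatus([]byte("pk"), 5, 0, 666, true)
	fmt.Println(ttl, commitTS, err) // 0 0 <nil>
}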
38 changes: 28 additions & 10 deletions store/mockstore/mocktikv/mvcc_leveldb.go
@@ -882,7 +882,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
}
// If current transaction's lock exist.
if ok && dec.lock.startTS == startTS {
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
if err = rollbackLock(batch, key, startTS); err != nil {
return errors.Trace(err)
}
return nil
@@ -919,7 +919,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
return nil
}

func rollbackLock(batch *leveldb.Batch, lock mvccLock, key []byte, startTS uint64) error {
Contributor Author: This parameter is not used in the function, so I removed it.

func rollbackLock(batch *leveldb.Batch, key []byte, startTS uint64) error {
tomb := mvccValue{
valueType: typeRollback,
startTS: startTS,
@@ -980,7 +980,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
if ok && dec.lock.startTS == startTS {
// If the lock has already outdated, clean up it.
if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
if err = rollbackLock(batch, key, startTS); err != nil {
return err
}
return mvcc.db.Write(batch, nil)
@@ -1032,7 +1032,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
// primaryKey + lockTS together could locate the primary lock.
// callerStartTS is the start ts of reader transaction.
// currentTS is the current ts, but it may be inaccurate. Just use it to check TTL.
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64) (uint64, uint64, error) {
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (uint64, uint64, error) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

@@ -1057,7 +1057,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS

// If the lock has already outdated, clean up it.
if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, lock, primaryKey, lockTS); err != nil {
if err = rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
}
if err = mvcc.db.Write(batch, nil); err != nil {
@@ -1112,9 +1112,27 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
}

// If the current transaction is not prewritten before, it may be a pessimistic lock.
// When pessimistic lock rollback, it may not leave a 'rollbacked' tombstone.
logutil.BgLogger().Debug("CheckTxnStatus can't find the primary lock, pessimistic rollback?")
return 0, 0, nil
// When a pessimistic transaction rolls back a statement, it may not leave a 'rollback' tombstone.

// Or it may be caused by a concurrent prewrite operation.
// Especially in the non-blocking read case, the secondary lock is likely to be
// written before the primary lock.

if rollbackIfNotExist {
batch := &leveldb.Batch{}
if err := rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
}
if err := mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
}
return 0, 0, nil
}

return 0, 0, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
StartTs: lockTS,
PrimaryKey: primaryKey,
}}
}

// TxnHeartBeat implements the MVCCStore interface.
@@ -1220,7 +1238,7 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
if commitTS > 0 {
err = commitLock(batch, dec.lock, currKey, startTS, commitTS)
} else {
err = rollbackLock(batch, dec.lock, currKey, startTS)
err = rollbackLock(batch, currKey, startTS)
}
if err != nil {
return errors.Trace(err)
@@ -1260,7 +1278,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[
if commitTS > 0 {
err = commitLock(batch, dec.lock, currKey, dec.lock.startTS, commitTS)
} else {
err = rollbackLock(batch, dec.lock, currKey, dec.lock.startTS)
err = rollbackLock(batch, currKey, dec.lock.startTS)
}
if err != nil {
return errors.Trace(err)
7 changes: 6 additions & 1 deletion store/mockstore/mocktikv/rpc.go
@@ -98,6 +98,11 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
CommitTsExpired: &expired.CommitTsExpired,
}
}
if tmp, ok := errors.Cause(err).(*ErrTxnNotFound); ok {
return &kvrpcpb.KeyError{
TxnNotFound: &tmp.TxnNotFound,
}
}
return &kvrpcpb.KeyError{
Abort: err.Error(),
}
@@ -382,7 +387,7 @@ func (h *rpcHandler) handleKvCheckTxnStatus(req *kvrpcpb.CheckTxnStatusRequest)
panic("KvCheckTxnStatus: key not in region")
}
var resp kvrpcpb.CheckTxnStatusResponse
ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs())
ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist())
if err != nil {
resp.Error = convertToKeyError(err)
} else {
6 changes: 6 additions & 0 deletions store/tikv/2pc.go
@@ -24,6 +24,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/kv"
@@ -1098,6 +1099,11 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
return err
}

failpoint.Inject("mockSleepBetween2PC", func() error {
time.Sleep(100 * time.Millisecond)
return nil
})

start = time.Now()
commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)
err = c.commitKeys(commitBo, c.keys)
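The injected sleep is inert unless a test turns the failpoint on, which is what TestKVVars does above with failpoint.Enable/Disable. An illustrative helper wrapping that pattern (the helper itself is not part of this PR; only the Enable/Disable calls and the failpoint path come from the diff):

package tikv_test

import (
	. "github.com/pingcap/check"
	"github.com/pingcap/failpoint"
)

// withMockSleepBetween2PC enables the mockSleepBetween2PC failpoint for the duration
// of fn and disables it afterwards.
func withMockSleepBetween2PC(c *C, fn func()) {
	fp := "github.com/pingcap/tidb/store/tikv/mockSleepBetween2PC"
	c.Assert(failpoint.Enable(fp, "return"), IsNil)
	defer func() {
		c.Assert(failpoint.Disable(fp), IsNil)
	}()
	fn()
}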
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
@@ -597,7 +597,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {

lr := newLockResolver(s.store)
bo := NewBackoffer(context.Background(), getMaxBackoff)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true)
c.Assert(err, IsNil)
c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl)

7 changes: 6 additions & 1 deletion store/tikv/backoff.go
@@ -142,6 +142,7 @@ const (
BoRegionMiss
BoUpdateLeader
boServerBusy
boTxnNotFound
)

func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int {
@@ -160,6 +161,8 @@ func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int
case BoRegionMiss:
// change base time to 2ms, because it may recover soon.
return NewBackoffFn(2, 500, NoJitter)
case boTxnNotFound:
return NewBackoffFn(2, 500, NoJitter)
case BoUpdateLeader:
return NewBackoffFn(1, 10, NoJitter)
case boServerBusy:
@@ -184,6 +187,8 @@ func (t backoffType) String() string {
return "updateLeader"
case boServerBusy:
return "serverBusy"
case boTxnNotFound:
return "txnNotFound"
}
return ""
}
@@ -192,7 +197,7 @@ func (t backoffType) TError() error {
switch t {
case boTiKVRPC:
return ErrTiKVServerTimeout
case BoTxnLock, boTxnLockFast:
case BoTxnLock, boTxnLockFast, boTxnNotFound:
return ErrResolveLockTimeout
case BoPDRPC:
return ErrPDServerTimeout
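The lock-resolver changes that consume this backoff type are not among the hunks shown above, so the following is only a sketch of how code inside package tikv might combine the new TxnNotFound error with boTxnNotFound; the txnNotFoundErr type and the retry helper are hypothetical, while the getTxnStatus signature follows the 2pc_test.go call site earlier in the diff.

// txnNotFoundErr is a hypothetical stand-in for however the resolver represents the
// TxnNotFound key error returned by CheckTxnStatus.
type txnNotFoundErr struct {
	startTS uint64
	primary []byte
}

func (e txnNotFoundErr) Error() string { return "txn not found" }

// checkTxnStatusWithRetry is illustrative only. A missing primary lock does not have
// to mean the transaction is gone: a concurrent prewrite may have written the
// secondary lock first, so the caller backs off with boTxnNotFound and retries before
// treating the error as fatal.
func (lr *LockResolver) checkTxnStatusWithRetry(bo *Backoffer, txnID uint64, primary []byte, currentTS uint64, rollbackIfNotFound bool) (TxnStatus, error) {
	for {
		status, err := lr.getTxnStatus(bo, txnID, primary, 0, currentTS, rollbackIfNotFound)
		if err == nil {
			return status, nil
		}
		if _, ok := errors.Cause(err).(txnNotFoundErr); !ok {
			return status, err
		}
		// The lock may simply not be written yet; wait with boTxnNotFound and retry.
		// If the backoff budget is exhausted, boTxnNotFound maps to
		// ErrResolveLockTimeout via TError() above.
		if err := bo.Backoff(boTxnNotFound, err); err != nil {
			return status, errors.Trace(err)
		}
	}
}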