Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: refactor the ResolveLocks() function for large transaction's implementation #11999

Merged
merged 22 commits into from Oct 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions store/tikv/2pc.go
Expand Up @@ -565,7 +565,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
locks = append(locks, lock)
}
start := time.Now()
msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, locks)
msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, c.startTS, locks)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -722,7 +722,7 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc
}
locks = append(locks, lock)
}
_, err = c.store.lockResolver.ResolveLocks(bo, locks)
_, err = c.store.lockResolver.ResolveLocks(bo, c.startTS, locks)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
Expand Up @@ -542,7 +542,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {

lr := newLockResolver(s.store)
bo := NewBackoffer(context.Background(), getMaxBackoff)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, txn.startTS)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, lockInfo.LockTtl)

Expand Down
2 changes: 1 addition & 1 deletion store/tikv/coprocessor.go
Expand Up @@ -784,7 +784,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
logutil.BgLogger().Debug("coprocessor encounters",
zap.Stringer("lock", lockErr))
msBeforeExpired, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
msBeforeExpired, err1 := worker.store.lockResolver.ResolveLocks(bo, worker.req.StartTs, []*Lock{NewLock(lockErr)})
if err1 != nil {
return nil, errors.Trace(err1)
}
Expand Down
92 changes: 55 additions & 37 deletions store/tikv/lock_resolver.go
Expand Up @@ -17,6 +17,7 @@ import (
"container/list"
"context"
"fmt"
"math"
"sync"
"time"

Expand Down Expand Up @@ -199,17 +200,32 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
return false, nil
}

startTime := time.Now()
startTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx)
if err != nil {
return false, errors.Trace(err)
}

txnInfos := make(map[uint64]uint64)
startTime := time.Now()
for _, l := range expiredLocks {
if _, ok := txnInfos[l.TxnID]; ok {
continue
}

status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0)
currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
if err != nil {
return false, errors.Trace(err)
return false, err
}
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, startTS, currentTS)
if err != nil {
return false, err
}

if status.ttl > 0 {
// Do not clean lock that is not expired.
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function needs to resolve all the locks in `locks`. If some of the locks can't be resolved, this function shouldn't return true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then large txn would block GC...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To continue GC anyway, you should resolve all the locks before cleaning up the data. Otherwise not-committed secondaries may miss their primary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may remove these locks regardless of their TTL (of course, primary first), but you can never leave these locks there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to update the CmdGCRequest instead of simply leaving the lock here.
@coocood

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will GC's logic be? By the way, GC Request is not used by default now.

}

txnInfos[l.TxnID] = uint64(status.commitTS)
}
logutil.BgLogger().Info("BatchResolveLocks: lookup txn status",
Expand Down Expand Up @@ -267,7 +283,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
// commit status.
// 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
// the same transaction.
func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (int64, error) {
func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) {
var msBeforeTxnExpired txnExpireTime
if len(locks) == 0 {
return msBeforeTxnExpired.value(), nil
Expand All @@ -289,7 +305,7 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (int64, error
// TODO: Maybe put it in LockResolver and share by all txns.
cleanTxns := make(map[uint64]map[RegionVerID]struct{})
for _, l := range expiredLocks {
status, err := lr.getTxnStatusFromLock(bo, l)
status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS)
if err != nil {
msBeforeTxnExpired.update(0)
err = errors.Trace(err)
Expand Down Expand Up @@ -357,44 +373,47 @@ func (t *txnExpireTime) value() int64 {
// If the primary key is still locked, it will launch a Rollback to abort it.
// To avoid unnecessarily aborting too many txns, it is wiser to wait a few
// seconds before calling it after Prewrite.
func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, error) {
func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary []byte) (TxnStatus, error) {
var status TxnStatus
bo := NewBackoffer(context.Background(), cleanupMaxBackoff)
currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
if err != nil {
return status, err
}
return lr.getTxnStatus(bo, txnID, primary, currentTS)
return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS)
}

func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock) (TxnStatus, error) {
// NOTE: l.TTL = 0 is a special protocol!!!
// When the pessimistic txn prewrite meets locks of a txn, it should rollback that txn **unconditionally**.
// In this case, TiKV set the lock TTL = 0, and TiDB use currentTS = 0 to call
// getTxnStatus, and getTxnStatus with currentTS = 0 would rollback the transaction.
func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) {
var currentTS uint64
if l.TTL == 0 {
return lr.getTxnStatus(bo, l.TxnID, l.Primary, 0)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
}

currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
if err != nil {
return TxnStatus{}, err
// NOTE: l.TTL = 0 is a special protocol!!!
// When the pessimistic txn prewrite meets locks of a txn, it should resolve the lock **unconditionally**.
// In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock!
// Set currentTS to max uint64 to make the lock expired.
currentTS = math.MaxUint64
} else {
var err error
currentTS, err = lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
if err != nil {
return TxnStatus{}, err
}
}
return lr.getTxnStatus(bo, l.TxnID, l.Primary, currentTS)
return lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS)
}

func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, currentTS uint64) (TxnStatus, error) {
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}

tikvLockResolverCountWithQueryTxnStatus.Inc()

var status TxnStatus
req := tikvrpc.NewRequest(tikvrpc.CmdCleanup, &kvrpcpb.CleanupRequest{
Key: primary,
StartVersion: txnID,
CurrentTs: currentTS,
req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{
PrimaryKey: primary,
LockTs: txnID,
CallerStartTs: callerStartTS,
CurrentTs: currentTS,
})
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
Expand All @@ -419,25 +438,24 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
if resp.Resp == nil {
return status, errors.Trace(ErrBodyMissing)
}
cmdResp := resp.Resp.(*kvrpcpb.CleanupResponse)
cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse)
if keyErr := cmdResp.GetError(); keyErr != nil {
// If the TTL of the primary lock is not outdated, the proto returns a ErrLocked contains the TTL.
if lockInfo := keyErr.GetLocked(); lockInfo != nil {
status.ttl = lockInfo.LockTtl
status.commitTS = 0
return status, nil
}
err = errors.Errorf("unexpected cleanup err: %s, tid: %v", keyErr, txnID)
err = errors.Errorf("unexpected err: %s, tid: %v", keyErr, txnID)
logutil.BgLogger().Error("getTxnStatus error", zap.Error(err))
return status, err
}
if cmdResp.CommitVersion != 0 {
status = TxnStatus{0, cmdResp.GetCommitVersion()}
tikvLockResolverCountWithQueryTxnStatusCommitted.Inc()
if cmdResp.LockTtl != 0 {
status.ttl = cmdResp.LockTtl
} else {
tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc()
if cmdResp.CommitVersion == 0 {
tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc()
} else {
tikvLockResolverCountWithQueryTxnStatusCommitted.Inc()
}

status.commitTS = cmdResp.CommitVersion
lr.saveResolved(txnID, status)
}
lr.saveResolved(txnID, status)
return status, nil
}
}
Expand Down
64 changes: 57 additions & 7 deletions store/tikv/lock_test.go
Expand Up @@ -185,19 +185,19 @@ func (s *testLockSuite) TestCleanLock(c *C) {

func (s *testLockSuite) TestGetTxnStatus(c *C) {
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
status, err := s.store.lockResolver.GetTxnStatus(startTS, []byte("a"))
status, err := s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsTrue)
c.Assert(status.CommitTS(), Equals, commitTS)

startTS, commitTS = s.lockKey(c, []byte("a"), []byte("a"), []byte("a"), []byte("a"), true)
status, err = s.store.lockResolver.GetTxnStatus(startTS, []byte("a"))
status, err = s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsTrue)
c.Assert(status.CommitTS(), Equals, commitTS)

startTS, _ = s.lockKey(c, []byte("a"), []byte("a"), []byte("a"), []byte("a"), false)
status, err = s.store.lockResolver.GetTxnStatus(startTS, []byte("a"))
status, err = s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
Expand All @@ -209,10 +209,13 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
txn.Set(kv.Key("key"), []byte("value"))
s.prewriteTxn(c, txn.(*tikvTxn))

// Check the lock TTL of a transaction.
bo := NewBackoffer(context.Background(), prewriteMaxBackoff)
lr := newLockResolver(s.store)
status, err := lr.GetTxnStatus(txn.StartTS(), []byte("key"))
callerStartTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx)
c.Assert(err, IsNil)

// Check the lock TTL of a transaction.
status, err := lr.GetTxnStatus(txn.StartTS(), callerStartTS, []byte("key"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
Expand All @@ -226,14 +229,14 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
c.Assert(err, IsNil)

// Check its status is rollbacked.
status, err = lr.GetTxnStatus(txn.StartTS(), []byte("key"))
status, err = lr.GetTxnStatus(txn.StartTS(), callerStartTS, []byte("key"))
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))

// Check a committed txn.
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
status, err = lr.GetTxnStatus(startTS, []byte("a"))
status, err = lr.GetTxnStatus(startTS, callerStartTS, []byte("a"))
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, commitTS)
Expand Down Expand Up @@ -265,6 +268,53 @@ func (s *testLockSuite) TestTxnHeartBeat(c *C) {
c.Assert(newTTL, Equals, uint64(0))
}

func (s *testLockSuite) TestCheckTxnStatus(c *C) {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.Set(kv.Key("key"), []byte("value"))
txn.Set(kv.Key("second"), []byte("xxx"))
s.prewriteTxn(c, txn.(*tikvTxn))

oracle := s.store.GetOracle()
currentTS, err := oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
bo := NewBackoffer(context.Background(), prewriteMaxBackoff)
resolver := newLockResolver(s.store)
// Call getTxnStatus to check the lock status.
status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS)
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
c.Assert(status.CommitTS(), Equals, uint64(0))

// Test the ResolveLocks API
lock := s.mustGetLock(c, []byte("second"))
timeBeforeExpire, err := resolver.ResolveLocks(bo, currentTS, []*Lock{lock})
c.Assert(err, IsNil)
c.Assert(timeBeforeExpire >= int64(0), IsTrue)

// Force rollback the lock using lock.TTL = 0.
lock.TTL = uint64(0)
timeBeforeExpire, err = resolver.ResolveLocks(bo, currentTS, []*Lock{lock})
c.Assert(err, IsNil)
c.Assert(timeBeforeExpire, Equals, int64(0))

// Then call getTxnStatus again and check the lock status.
currentTS, err = oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))

// Call getTxnStatus on a committed transaction.
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, commitTS)
}

func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/snapshot.go
Expand Up @@ -234,7 +234,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
locks = append(locks, lock)
}
if len(lockedKeys) > 0 {
msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, locks)
msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, locks)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -312,7 +312,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
if err != nil {
return nil, errors.Trace(err)
}
msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, []*Lock{lock})
msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, []*Lock{lock})
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 2 additions & 0 deletions store/tikv/tikvrpc/tikvrpc.go
Expand Up @@ -442,6 +442,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) *Resp
return &Response{Resp: res.Empty}
case *tikvpb.BatchCommandsResponse_Response_TxnHeartBeat:
return &Response{Resp: res.TxnHeartBeat}
case *tikvpb.BatchCommandsResponse_Response_CheckTxnStatus:
return &Response{Resp: res.CheckTxnStatus}
}
return nil
}
Expand Down