Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: refactor the ResolveLocks() function for large transaction's implementation #11999

Merged
merged 22 commits into from Oct 14, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions store/tikv/2pc.go
Expand Up @@ -565,7 +565,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
locks = append(locks, lock)
}
start := time.Now()
msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, locks)
msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, c.startTS, locks)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -722,7 +722,7 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc
}
locks = append(locks, lock)
}
_, err = c.store.lockResolver.ResolveLocks(bo, locks)
_, err = c.store.lockResolver.ResolveLocks(bo, c.startTS, locks)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
Expand Up @@ -542,7 +542,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {

lr := newLockResolver(s.store)
bo := NewBackoffer(context.Background(), getMaxBackoff)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, txn.startTS)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, lockInfo.LockTtl)

Expand Down
2 changes: 1 addition & 1 deletion store/tikv/coprocessor.go
Expand Up @@ -784,7 +784,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
logutil.BgLogger().Debug("coprocessor encounters",
zap.Stringer("lock", lockErr))
msBeforeExpired, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
msBeforeExpired, err1 := worker.store.lockResolver.ResolveLocks(bo, worker.req.StartTs, []*Lock{NewLock(lockErr)})
if err1 != nil {
return nil, errors.Trace(err1)
}
Expand Down
61 changes: 34 additions & 27 deletions store/tikv/lock_resolver.go
Expand Up @@ -206,7 +206,8 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
continue
}

status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0)
// Use math.MaxUint64 as currentTS, so the txn is either committed or rollbacked.
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return false, errors.Trace(err)
}
Expand Down Expand Up @@ -267,7 +268,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
// commit status.
// 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
// the same transaction.
func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (int64, error) {
func (lr *LockResolver) ResolveLocks(bo *Backoffer, startTS uint64, locks []*Lock) (int64, error) {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
var msBeforeTxnExpired txnExpireTime
if len(locks) == 0 {
return msBeforeTxnExpired.value(), nil
Expand All @@ -289,7 +290,7 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (int64, error
// TODO: Maybe put it in LockResolver and share by all txns.
cleanTxns := make(map[uint64]map[RegionVerID]struct{})
for _, l := range expiredLocks {
status, err := lr.getTxnStatusFromLock(bo, l)
status, err := lr.getTxnStatusFromLock(bo, l, startTS, cleanTxns)
if err != nil {
msBeforeTxnExpired.update(0)
err = errors.Trace(err)
Expand Down Expand Up @@ -364,37 +365,44 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, e
if err != nil {
return status, err
}
return lr.getTxnStatus(bo, txnID, primary, currentTS)
return lr.getTxnStatus(bo, txnID, primary, 0, currentTS)
}

func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock) (TxnStatus, error) {
func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64, cleanTxns map[uint64]map[RegionVerID]struct{}) (TxnStatus, error) {
// NOTE: l.TTL = 0 is a special protocol!!!
// When the pessimistic txn prewrite meets locks of a txn, it should rollback that txn **unconditionally**.
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
// In this case, TiKV set the lock TTL = 0, and TiDB use currentTS = 0 to call
// getTxnStatus, and getTxnStatus with currentTS = 0 would rollback the transaction.
// In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock!
if l.TTL == 0 {
return lr.getTxnStatus(bo, l.TxnID, l.Primary, 0)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
var status TxnStatus
cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
cleanRegions = make(map[RegionVerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
}
err := lr.resolveLock(bo, l, status, cleanRegions)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
return status, err
}

currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
if err != nil {
return TxnStatus{}, err
}
return lr.getTxnStatus(bo, l.TxnID, l.Primary, currentTS)
return lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS)
}

func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, currentTS uint64) (TxnStatus, error) {
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}

tikvLockResolverCountWithQueryTxnStatus.Inc()

var status TxnStatus
req := tikvrpc.NewRequest(tikvrpc.CmdCleanup, &kvrpcpb.CleanupRequest{
Key: primary,
StartVersion: txnID,
CurrentTs: currentTS,
req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{
PrimaryKey: primary,
LockTs: txnID,
CallerStartTs: callerStartTS,
CurrentTs: currentTS,
})
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
Expand All @@ -419,25 +427,24 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
if resp.Resp == nil {
return status, errors.Trace(ErrBodyMissing)
}
cmdResp := resp.Resp.(*kvrpcpb.CleanupResponse)
cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse)
if keyErr := cmdResp.GetError(); keyErr != nil {
// If the TTL of the primary lock is not outdated, the proto returns a ErrLocked contains the TTL.
if lockInfo := keyErr.GetLocked(); lockInfo != nil {
status.ttl = lockInfo.LockTtl
status.commitTS = 0
return status, nil
}
err = errors.Errorf("unexpected cleanup err: %s, tid: %v", keyErr, txnID)
err = errors.Errorf("unexpected err: %s, tid: %v", keyErr, txnID)
logutil.BgLogger().Error("getTxnStatus error", zap.Error(err))
return status, err
}
if cmdResp.CommitVersion != 0 {
status = TxnStatus{0, cmdResp.GetCommitVersion()}
tikvLockResolverCountWithQueryTxnStatusCommitted.Inc()
if cmdResp.LockTtl != 0 {
status.ttl = cmdResp.LockTtl
} else {
tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc()
if cmdResp.CommitVersion == 0 {
tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc()
} else {
tikvLockResolverCountWithQueryTxnStatusCommitted.Inc()
}

status.commitTS = cmdResp.CommitVersion
lr.saveResolved(txnID, status)
}
lr.saveResolved(txnID, status)
return status, nil
}
}
Expand Down
34 changes: 34 additions & 0 deletions store/tikv/lock_test.go
Expand Up @@ -265,6 +265,40 @@ func (s *testLockSuite) TestTxnHeartBeat(c *C) {
c.Assert(newTTL, Equals, uint64(0))
}

func (s *testLockSuite) TestCheckTxnStatus(c *C) {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.Set(kv.Key("key"), []byte("value"))
s.prewriteTxn(c, txn.(*tikvTxn))

oracle := s.store.GetOracle()
currentTS, err := oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
bo := NewBackoffer(context.Background(), prewriteMaxBackoff)
status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS)
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
c.Assert(status.CommitTS(), Equals, uint64(0))

// It is confusing here, getTxnStatus with currentTS = MaxUint64 really means rollback!
status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, math.MaxUint64)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(err, IsNil)

currentTS, err = oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))

startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, commitTS)
}

func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/snapshot.go
Expand Up @@ -234,7 +234,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
locks = append(locks, lock)
}
if len(lockedKeys) > 0 {
msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, locks)
msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, locks)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -312,7 +312,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
if err != nil {
return nil, errors.Trace(err)
}
msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, []*Lock{lock})
msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, []*Lock{lock})
if err != nil {
return nil, errors.Trace(err)
}
Expand Down