Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: refactor the ResolveLocks() function for large transaction's implementation #11999

Merged
merged 22 commits into from Oct 14, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions store/tikv/2pc.go
Expand Up @@ -564,7 +564,9 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
locks = append(locks, lock)
}
start := time.Now()
msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, locks)

// Prewrite pessimistic transaction should not meet lock?
msBeforeExpired, _, err := c.store.lockResolver.ResolveLocks(bo, c.startTS, locks)
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -647,7 +649,7 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc
}
locks = append(locks, lock)
}
_, err = c.store.lockResolver.ResolveLocks(bo, locks)
_, _, err = c.store.lockResolver.ResolveLocks(bo, c.startTS, locks)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/coprocessor.go
Expand Up @@ -782,7 +782,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
logutil.BgLogger().Debug("coprocessor encounters",
zap.Stringer("lock", lockErr))
msBeforeExpired, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
msBeforeExpired, _, err1 := worker.store.lockResolver.ResolveLocks(bo, worker.req.StartTs, []*Lock{NewLock(lockErr)})
if err1 != nil {
return nil, errors.Trace(err1)
}
Expand Down
97 changes: 82 additions & 15 deletions store/tikv/lock_resolver.go
Expand Up @@ -266,59 +266,126 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
// commit status.
// 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
// the same transaction.
func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnExpired int64, err error) {
func (lr *LockResolver) ResolveLocks(bo *Backoffer, startTS uint64, locks []*Lock) (msBeforeTxnExpired int64, resolved []uint64, err error) {
if len(locks) == 0 {
return
}

tikvLockResolverCountWithResolve.Inc()

var expiredLocks []*Lock
var expiredSecondaryLocks []*Lock
for _, l := range locks {
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL)
if msBeforeLockExpired <= 0 {
tikvLockResolverCountWithExpired.Inc()
expiredLocks = append(expiredLocks, l)
expiredSecondaryLocks = append(expiredSecondaryLocks, l)
} else {
if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired {
msBeforeTxnExpired = msBeforeLockExpired
}
tikvLockResolverCountWithNotExpired.Inc()
}
}
if len(expiredLocks) == 0 {
if len(expiredSecondaryLocks) == 0 {
if msBeforeTxnExpired > 0 {
tikvLockResolverCountWithWaitExpired.Inc()
}
return
}

currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
if err != nil {
return 0, nil, err
}

// TxnID -> []Region, record resolved Regions.
// TODO: Maybe put it in LockResolver and share by all txns.
cleanTxns := make(map[uint64]map[RegionVerID]struct{})
for _, l := range expiredLocks {
for _, l := range expiredSecondaryLocks {
var status TxnStatus
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary)
status, err = lr.checkTxnStatus(bo, l.TxnID, l.Primary, startTS, currentTS)
if err != nil {
msBeforeTxnExpired = 0
err = errors.Trace(err)
return
}

cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
cleanRegions = make(map[RegionVerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
if status.ttl == 0 {
// If the lock is committed or rollbacked, resolve lock.
cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
cleanRegions = make(map[RegionVerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
}

err = lr.resolveLock(bo, l, status, cleanRegions)
if err != nil {
msBeforeTxnExpired = 0
err = errors.Trace(err)
return
}
} else {
// If the lock is valid, the txn may be a large transaction.
resolved = append(resolved, l.TxnID)
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl)
if msBeforeLockExpired <= 0 {
// The txn is a large transaction, and it's primary lock will expire soon, but
// TxnHeartBeat could update the TTL, so we should not clean up the lock.
continue
}
if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired {
msBeforeTxnExpired = msBeforeLockExpired
}
}
}
return
}

err = lr.resolveLock(bo, l, status, cleanRegions)
func (lr *LockResolver) checkTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS uint64, currentTS uint64) (TxnStatus, error) {
var status TxnStatus
req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{
PrimaryKey: primary,
LockTs: txnID,
CallerStartTs: callerStartTS,
CurrentTs: currentTS,
})
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
if err != nil {
msBeforeTxnExpired = 0
err = errors.Trace(err)
return
return status, errors.Trace(err)
}
resp, err := lr.store.SendReq(bo, req, loc.Region, readTimeoutShort)
if err != nil {
return status, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return status, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return status, errors.Trace(err)
}
continue
}
if resp.Resp == nil {
return status, errors.Trace(ErrBodyMissing)
}
cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse)
if keyErr := cmdResp.GetError(); keyErr != nil {
err = errors.Errorf("unexpected err: %s, tid: %v", keyErr, txnID)
logutil.BgLogger().Error("checkTxnStatus error", zap.Error(err))
return status, err
}
if cmdResp.LockTtl != 0 {
status.ttl = cmdResp.LockTtl
} else {
status.commitTS = cmdResp.CommitVersion
}
lr.saveResolved(txnID, status)
return status, nil
}
return
}

// GetTxnStatus queries tikv-server for a txn's status (commit/rollback).
Expand Down
34 changes: 34 additions & 0 deletions store/tikv/lock_test.go
Expand Up @@ -228,6 +228,40 @@ func (s *testLockSuite) TestTxnHeartBeat(c *C) {
c.Assert(newTTL, Equals, uint64(0))
}

func (s *testLockSuite) TestCheckTxnStatus(c *C) {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.Set(kv.Key("key"), []byte("value"))
s.prewriteTxn(c, txn.(*tikvTxn))

oracle := s.store.GetOracle()
currentTS, err := oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
bo := NewBackoffer(context.Background(), prewriteMaxBackoff)
status, err := newLockResolver(s.store).checkTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS)
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
c.Assert(status.CommitTS(), Equals, uint64(0))

// The getTxnStatus API is confusing, it really means rollback!
status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"))
c.Assert(err, IsNil)

currentTS, err = oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
status, err = newLockResolver(s.store).checkTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))

startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
status, err = newLockResolver(s.store).checkTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, commitTS)
}

func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/snapshot.go
Expand Up @@ -234,7 +234,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
locks = append(locks, lock)
}
if len(lockedKeys) > 0 {
msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, locks)
msBeforeExpired, _, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, locks)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -312,7 +312,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
if err != nil {
return nil, errors.Trace(err)
}
msBeforeExpired, err := s.store.lockResolver.ResolveLocks(bo, []*Lock{lock})
msBeforeExpired, _, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, []*Lock{lock})
if err != nil {
return nil, errors.Trace(err)
}
Expand Down