Skip to content

Commit

Permalink
store: implement non-block read when coprocessor meets the lock of a …
Browse files Browse the repository at this point in the history
…large transaction (#11986)
  • Loading branch information
tiancaiamao authored and coocood committed Nov 19, 2019
1 parent 087d2bd commit 359a667
Show file tree
Hide file tree
Showing 16 changed files with 333 additions and 64 deletions.
5 changes: 0 additions & 5 deletions go.sum
Expand Up @@ -180,11 +180,6 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191113075618-7ce83b774d70 h1:l9VcGUPRHvmM7mkFHo4JqxZeCvioRuL1/4tFUQcs6jQ=
github.com/pingcap/kvproto v0.0.0-20191113075618-7ce83b774d70/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191113115126-45e0702fff1e/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191118030148-ec389ef1b41f h1:CJ1IdT7bPbIvyq2Of9VhC/fhEGh6+0ItdT1dPBv7x7I=
github.com/pingcap/kvproto v0.0.0-20191118030148-ec389ef1b41f/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0 h1:CHOC95Ct4abJ9EdmWqzpUxV+bgjB4lOxd3AFxqgoyzQ=
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down
2 changes: 2 additions & 0 deletions session/session_test.go
Expand Up @@ -2420,6 +2420,7 @@ func (s *testSessionSuite) TestDBUserNameLength(c *C) {
}

func (s *testSessionSuite) TestKVVars(c *C) {
c.Skip("there is no backoff here in the large txn, so this test is stale")
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table kvvars (a int, b int)")
tk.MustExec("insert kvvars values (1, 1)")
Expand Down Expand Up @@ -2456,6 +2457,7 @@ func (s *testSessionSuite) TestKVVars(c *C) {
}()
wg.Wait()
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockSleepBetween2PC"), IsNil)

for {
tk2.MustQuery("select * from kvvars")
if atomic.LoadInt32(backOffWeightVal) != 0 {
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/cluster.go
Expand Up @@ -445,7 +445,7 @@ func (c *Cluster) splitRange(mvccStore MVCCStore, start, end MvccKey, count int)
func (c *Cluster) getEntriesGroupByRegions(mvccStore MVCCStore, start, end MvccKey, count int) [][]Pair {
startTS := uint64(math.MaxUint64)
limit := int(math.MaxInt32)
pairs := mvccStore.Scan(start.Raw(), end.Raw(), limit, startTS, kvrpcpb.IsolationLevel_SI)
pairs := mvccStore.Scan(start.Raw(), end.Raw(), limit, startTS, kvrpcpb.IsolationLevel_SI, nil)
regionEntriesSlice := make([][]Pair, 0, count)
quotient := len(pairs) / count
remainder := len(pairs) % count
Expand Down
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/cluster_test.go
Expand Up @@ -88,7 +88,7 @@ func (s *testClusterSuite) TestClusterSplit(c *C) {
if !bytes.HasPrefix(startKey, recordPrefix) {
continue
}
pairs := mvccStore.Scan(startKey, endKey, math.MaxInt64, math.MaxUint64, kvrpcpb.IsolationLevel_SI)
pairs := mvccStore.Scan(startKey, endKey, math.MaxInt64, math.MaxUint64, kvrpcpb.IsolationLevel_SI, nil)
if len(pairs) > 0 {
c.Assert(pairs, HasLen, 100)
}
Expand All @@ -109,7 +109,7 @@ func (s *testClusterSuite) TestClusterSplit(c *C) {
if !bytes.HasPrefix(startKey, indexPrefix) {
continue
}
pairs := mvccStore.Scan(startKey, endKey, math.MaxInt64, math.MaxUint64, kvrpcpb.IsolationLevel_SI)
pairs := mvccStore.Scan(startKey, endKey, math.MaxInt64, math.MaxUint64, kvrpcpb.IsolationLevel_SI, nil)
if len(pairs) > 0 {
c.Assert(pairs, HasLen, 100)
}
Expand Down
2 changes: 2 additions & 0 deletions store/mockstore/mocktikv/cop_handler_dag.go
Expand Up @@ -202,6 +202,7 @@ func (h *rpcHandler) buildTableScan(ctx *dagContext, executor *tipb.Executor) (*
colIDs: ctx.evalCtx.colIDs,
startTS: ctx.dagReq.GetStartTs(),
isolationLevel: h.isolationLevel,
resolvedLocks: h.resolvedLocks,
mvccStore: h.mvccStore,
execDetail: new(execDetail),
}
Expand Down Expand Up @@ -240,6 +241,7 @@ func (h *rpcHandler) buildIndexScan(ctx *dagContext, executor *tipb.Executor) (*
colsLen: len(columns),
startTS: ctx.dagReq.GetStartTs(),
isolationLevel: h.isolationLevel,
resolvedLocks: h.resolvedLocks,
mvccStore: h.mvccStore,
pkStatus: pkStatus,
execDetail: new(execDetail),
Expand Down
10 changes: 6 additions & 4 deletions store/mockstore/mocktikv/executor.go
Expand Up @@ -74,6 +74,7 @@ type tableScanExec struct {
kvRanges []kv.KeyRange
startTS uint64
isolationLevel kvrpcpb.IsolationLevel
resolvedLocks []uint64
mvccStore MVCCStore
cursor int
seekKey []byte
Expand Down Expand Up @@ -208,9 +209,9 @@ func (e *tableScanExec) getRowFromRange(ran kv.KeyRange) ([][]byte, error) {
var pairs []Pair
var pair Pair
if e.Desc {
pairs = e.mvccStore.ReverseScan(ran.StartKey, e.seekKey, 1, e.startTS, e.isolationLevel)
pairs = e.mvccStore.ReverseScan(ran.StartKey, e.seekKey, 1, e.startTS, e.isolationLevel, e.resolvedLocks)
} else {
pairs = e.mvccStore.Scan(e.seekKey, ran.EndKey, 1, e.startTS, e.isolationLevel)
pairs = e.mvccStore.Scan(e.seekKey, ran.EndKey, 1, e.startTS, e.isolationLevel, e.resolvedLocks)
}
if len(pairs) > 0 {
pair = pairs[0]
Expand Down Expand Up @@ -251,6 +252,7 @@ type indexScanExec struct {
kvRanges []kv.KeyRange
startTS uint64
isolationLevel kvrpcpb.IsolationLevel
resolvedLocks []uint64
mvccStore MVCCStore
cursor int
seekKey []byte
Expand Down Expand Up @@ -380,9 +382,9 @@ func (e *indexScanExec) getRowFromRange(ran kv.KeyRange) ([][]byte, error) {
var pairs []Pair
var pair Pair
if e.Desc {
pairs = e.mvccStore.ReverseScan(ran.StartKey, e.seekKey, 1, e.startTS, e.isolationLevel)
pairs = e.mvccStore.ReverseScan(ran.StartKey, e.seekKey, 1, e.startTS, e.isolationLevel, e.resolvedLocks)
} else {
pairs = e.mvccStore.Scan(e.seekKey, ran.EndKey, 1, e.startTS, e.isolationLevel)
pairs = e.mvccStore.Scan(e.seekKey, ran.EndKey, 1, e.startTS, e.isolationLevel, e.resolvedLocks)
}
if len(pairs) > 0 {
pair = pairs[0]
Expand Down
101 changes: 101 additions & 0 deletions store/mockstore/mocktikv/executor_test.go
@@ -0,0 +1,101 @@
// Copyright 2019-present, PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mocktikv_test

import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/testkit"
)

var _ = Suite(&testExecutorSuite{})

type testExecutorSuite struct {
cluster *mocktikv.Cluster
store kv.Storage
mvccStore mocktikv.MVCCStore
dom *domain.Domain
}

func (s *testExecutorSuite) SetUpSuite(c *C) {
s.cluster = mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(s.cluster)
s.mvccStore = mocktikv.MustNewMVCCStore()
store, err := mockstore.NewMockTikvStore(
mockstore.WithCluster(s.cluster),
mockstore.WithMVCCStore(s.mvccStore),
)
c.Assert(err, IsNil)
s.store = store
session.SetSchemaLease(0)
session.DisableStats4Test()
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

func (s *testExecutorSuite) TearDownSuite(c *C) {
s.dom.Close()
s.store.Close()
}

func (s *testExecutorSuite) TestResolvedLargeTxnLocks(c *C) {
// This test checks the resolve lock functionality.
// When a txn meets the lock of a large transaction, it should not block by the
// lock.
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t (id int primary key, val int)")
dom := domain.GetDomain(tk.Se)
schema := dom.InfoSchema()
tbl, err := schema.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)

tk.MustExec("insert into t values (1, 1)")

oracle := s.store.GetOracle()
tso, err := oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)

key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, 1)
pairs := s.mvccStore.Scan(key, nil, 1, tso, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(pairs, HasLen, 1)
c.Assert(pairs[0].Err, IsNil)

// Simulate a large txn (holding a pk lock with large TTL).
// Secondary lock 200ms, primary lock 100s
mocktikv.MustPrewriteOK(c, s.mvccStore, mocktikv.PutMutations("primary", "value"), "primary", tso, 100000)
mocktikv.MustPrewriteOK(c, s.mvccStore, mocktikv.PutMutations(string(key), "value"), "primary", tso, 200)

// Simulate the action of reading meet the lock of a large txn.
// The lock of the large transaction should not block read.
// The first time, this query should meet a lock on the secondary key, then resolve lock.
// After that, the query should read the previous version data.
tk.MustQuery("select * from t").Check(testkit.Rows("1 1"))

// And check the large txn is still alive.
pairs = s.mvccStore.Scan([]byte("primary"), nil, 1, tso, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(pairs, HasLen, 1)
_, ok := errors.Cause(pairs[0].Err).(*mocktikv.ErrLocked)
c.Assert(ok, IsTrue)
}
29 changes: 27 additions & 2 deletions store/mockstore/mocktikv/mock_tikv_test.go
Expand Up @@ -51,6 +51,9 @@ func (s *testMockTiKVSuite) SetUpTest(c *C) {
c.Assert(err, IsNil)
}

// PutMutations is exported for testing.
var PutMutations func(kvpairs ...string) []*kvrpcpb.Mutation = putMutations

func putMutations(kvpairs ...string) []*kvrpcpb.Mutation {
var mutations []*kvrpcpb.Mutation
for i := 0; i < len(kvpairs); i += 2 {
Expand Down Expand Up @@ -134,7 +137,7 @@ func (s *testMockTiKVSuite) mustScanOK(c *C, start string, limit int, ts uint64,
}

func (s *testMockTiKVSuite) mustRangeScanOK(c *C, start, end string, limit int, ts uint64, expect ...string) {
pairs := s.store.Scan([]byte(start), []byte(end), limit, ts, kvrpcpb.IsolationLevel_SI)
pairs := s.store.Scan([]byte(start), []byte(end), limit, ts, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(len(pairs)*2, Equals, len(expect))
for i := 0; i < len(pairs); i++ {
c.Assert(pairs[i].Err, IsNil)
Expand All @@ -148,7 +151,7 @@ func (s *testMockTiKVSuite) mustReverseScanOK(c *C, end string, limit int, ts ui
}

func (s *testMockTiKVSuite) mustRangeReverseScanOK(c *C, start, end string, limit int, ts uint64, expect ...string) {
pairs := s.store.ReverseScan([]byte(start), []byte(end), limit, ts, kvrpcpb.IsolationLevel_SI)
pairs := s.store.ReverseScan([]byte(start), []byte(end), limit, ts, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(len(pairs)*2, Equals, len(expect))
for i := 0; i < len(pairs); i++ {
c.Assert(pairs[i].Err, IsNil)
Expand All @@ -157,6 +160,11 @@ func (s *testMockTiKVSuite) mustRangeReverseScanOK(c *C, start, end string, limi
}
}

func MustPrewriteOK(c *C, store MVCCStore, mutations []*kvrpcpb.Mutation, primary string, startTS uint64, ttl uint64) {
s := testMockTiKVSuite{store}
s.mustPrewriteWithTTLOK(c, mutations, primary, startTS, ttl)
}

func (s *testMockTiKVSuite) mustPrewriteOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64) {
s.mustPrewriteWithTTLOK(c, mutations, primary, startTS, 0)
}
Expand Down Expand Up @@ -433,6 +441,23 @@ func (s *testMockTiKVSuite) TestScanLock(c *C) {
})
}

func (s *testMockTiKVSuite) TestScanWithResolvedLock(c *C) {
s.mustPrewriteOK(c, putMutations("p1", "v5", "s1", "v5"), "p1", 5)
s.mustPrewriteOK(c, putMutations("p2", "v10", "s2", "v10"), "p1", 5)

pairs := s.store.Scan([]byte("p1"), nil, 3, 10, kvrpcpb.IsolationLevel_SI, nil)
lock, ok := errors.Cause(pairs[0].Err).(*ErrLocked)
c.Assert(ok, IsTrue)
_, ok = errors.Cause(pairs[1].Err).(*ErrLocked)
c.Assert(ok, IsTrue)

// Mock the request after resolving lock.
pairs = s.store.Scan([]byte("p1"), nil, 3, 10, kvrpcpb.IsolationLevel_SI, []uint64{lock.StartTS})
for _, pair := range pairs {
c.Assert(pair.Err, IsNil)
}
}

func (s *testMockTiKVSuite) TestCommitConflict(c *C) {
// txn A want set x to A
// txn B want set x to B
Expand Down
16 changes: 11 additions & 5 deletions store/mockstore/mocktikv/mvcc.go
Expand Up @@ -207,7 +207,7 @@ func (l *mvccLock) lockErr(key []byte) error {
}
}

func (l *mvccLock) check(ts uint64, key []byte) (uint64, error) {
func (l *mvccLock) check(ts uint64, key []byte, resolvedLocks []uint64) (uint64, error) {
// ignore when ts is older than lock or lock's type is Lock.
// Pessimistic lock doesn't block read.
if l.startTS > ts || l.op == kvrpcpb.Op_Lock || l.op == kvrpcpb.Op_PessimisticLock {
Expand All @@ -217,17 +217,23 @@ func (l *mvccLock) check(ts uint64, key []byte) (uint64, error) {
if ts == math.MaxUint64 && bytes.Equal(l.primary, key) {
return l.startTS - 1, nil
}
// Skip lock if the lock is resolved.
for _, resolved := range resolvedLocks {
if l.startTS == resolved {
return ts, nil
}
}
return 0, l.lockErr(key)
}

func (e *mvccEntry) Less(than btree.Item) bool {
return bytes.Compare(e.key, than.(*mvccEntry).key) < 0
}

func (e *mvccEntry) Get(ts uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, error) {
func (e *mvccEntry) Get(ts uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) ([]byte, error) {
if isoLevel == kvrpcpb.IsolationLevel_SI && e.lock != nil {
var err error
ts, err = e.lock.check(ts, e.key.Raw())
ts, err = e.lock.check(ts, e.key.Raw(), resolvedLocks)
if err != nil {
return nil, err
}
Expand All @@ -252,8 +258,8 @@ func (e *rawEntry) Less(than btree.Item) bool {
// MVCCStore is a mvcc key-value storage.
type MVCCStore interface {
Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, error)
Scan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
ReverseScan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
Scan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair
ReverseScan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair
BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS,
forUpdateTS uint64, ttl uint64, lockWaitTime int64) []error
Expand Down

0 comments on commit 359a667

Please sign in to comment.