Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: implement non-block read when coprocessor meets the lock of a large transaction #11986

Merged
merged 33 commits into from Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
eab9b2a
store: implement non-block read when coprocessor meets the lock of a …
tiancaiamao Sep 2, 2019
1bcb0f4
make golint happy
tiancaiamao Sep 2, 2019
d86c0bc
Merge branch 'master' into non-block-read
tiancaiamao Sep 10, 2019
294cfc3
Merge branch 'master' into non-block-read
tiancaiamao Sep 12, 2019
5e57d47
Merge branch 'master' into non-block-read
tiancaiamao Oct 14, 2019
d4e967f
address comment
tiancaiamao Oct 14, 2019
ac063c5
address comment
tiancaiamao Oct 16, 2019
4c5e6ac
address comment
tiancaiamao Oct 16, 2019
a457c9f
go fmt
tiancaiamao Oct 16, 2019
af8acac
address comment
tiancaiamao Oct 17, 2019
8dc46b2
Merge branch 'master' into non-block-read
tiancaiamao Nov 7, 2019
e813646
Merge branch 'master' into non-block-read
tiancaiamao Nov 7, 2019
6bcd36d
address comment
tiancaiamao Nov 7, 2019
57e6669
set minCommitTS in the prewrite request
tiancaiamao Nov 12, 2019
552d679
address comment
tiancaiamao Nov 12, 2019
0e75536
Merge branch 'master' into non-block-read
tiancaiamao Nov 12, 2019
ef41338
address comment
tiancaiamao Nov 12, 2019
2d0f7e0
address comment
tiancaiamao Nov 12, 2019
34b829e
Merge branch 'master' into non-block-read
tiancaiamao Nov 12, 2019
2514021
fix CI
tiancaiamao Nov 12, 2019
d0cd1f7
fix CI
tiancaiamao Nov 13, 2019
6f7566b
address comment
tiancaiamao Nov 13, 2019
414af18
address comment
tiancaiamao Nov 13, 2019
217dd9e
Merge branch 'master' into non-block-read
tiancaiamao Nov 13, 2019
a4465cc
address comment
tiancaiamao Nov 13, 2019
4413774
Merge branch 'master' into non-block-read
tiancaiamao Nov 15, 2019
a83d9dc
Merge branch 'master' into non-block-read
tiancaiamao Nov 18, 2019
8b94a33
do not push minCommitTS = 0
tiancaiamao Nov 18, 2019
95525ce
make golint happy
tiancaiamao Nov 18, 2019
40c850c
Merge branch 'master' into non-block-read
tiancaiamao Nov 18, 2019
1db31b6
address comment
tiancaiamao Nov 19, 2019
72eef5b
Merge branch 'master' into non-block-read
MyonKeminta Nov 19, 2019
aaaea2c
Merge branch 'master' into non-block-read
sre-bot Nov 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/cluster.go
Expand Up @@ -444,7 +444,7 @@ func (c *Cluster) splitRange(mvccStore MVCCStore, start, end MvccKey, count int)
func (c *Cluster) getEntriesGroupByRegions(mvccStore MVCCStore, start, end MvccKey, count int) [][]Pair {
startTS := uint64(math.MaxUint64)
limit := int(math.MaxInt32)
pairs := mvccStore.Scan(start.Raw(), end.Raw(), limit, startTS, kvrpcpb.IsolationLevel_SI)
pairs := mvccStore.Scan(start.Raw(), end.Raw(), limit, startTS, kvrpcpb.IsolationLevel_SI, nil)
regionEntriesSlice := make([][]Pair, 0, count)
quotient := len(pairs) / count
remainder := len(pairs) % count
Expand Down
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/cluster_test.go
Expand Up @@ -88,7 +88,7 @@ func (s *testClusterSuite) TestClusterSplit(c *C) {
if !bytes.HasPrefix(startKey, recordPrefix) {
continue
}
pairs := mvccStore.Scan(startKey, endKey, math.MaxInt64, math.MaxUint64, kvrpcpb.IsolationLevel_SI)
pairs := mvccStore.Scan(startKey, endKey, math.MaxInt64, math.MaxUint64, kvrpcpb.IsolationLevel_SI, nil)
if len(pairs) > 0 {
c.Assert(pairs, HasLen, 100)
}
Expand All @@ -109,7 +109,7 @@ func (s *testClusterSuite) TestClusterSplit(c *C) {
if !bytes.HasPrefix(startKey, indexPrefix) {
continue
}
pairs := mvccStore.Scan(startKey, endKey, math.MaxInt64, math.MaxUint64, kvrpcpb.IsolationLevel_SI)
pairs := mvccStore.Scan(startKey, endKey, math.MaxInt64, math.MaxUint64, kvrpcpb.IsolationLevel_SI, nil)
if len(pairs) > 0 {
c.Assert(pairs, HasLen, 100)
}
Expand Down
2 changes: 2 additions & 0 deletions store/mockstore/mocktikv/cop_handler_dag.go
Expand Up @@ -200,6 +200,7 @@ func (h *rpcHandler) buildTableScan(ctx *dagContext, executor *tipb.Executor) (*
colIDs: ctx.evalCtx.colIDs,
startTS: ctx.dagReq.GetStartTs(),
isolationLevel: h.isolationLevel,
resolvedLocks: h.resolvedLocks,
mvccStore: h.mvccStore,
execDetail: new(execDetail),
}
Expand Down Expand Up @@ -238,6 +239,7 @@ func (h *rpcHandler) buildIndexScan(ctx *dagContext, executor *tipb.Executor) (*
colsLen: len(columns),
startTS: ctx.dagReq.GetStartTs(),
isolationLevel: h.isolationLevel,
resolvedLocks: h.resolvedLocks,
mvccStore: h.mvccStore,
pkStatus: pkStatus,
execDetail: new(execDetail),
Expand Down
10 changes: 6 additions & 4 deletions store/mockstore/mocktikv/executor.go
Expand Up @@ -74,6 +74,7 @@ type tableScanExec struct {
kvRanges []kv.KeyRange
startTS uint64
isolationLevel kvrpcpb.IsolationLevel
resolvedLocks []uint64
mvccStore MVCCStore
cursor int
seekKey []byte
Expand Down Expand Up @@ -208,9 +209,9 @@ func (e *tableScanExec) getRowFromRange(ran kv.KeyRange) ([][]byte, error) {
var pairs []Pair
var pair Pair
if e.Desc {
pairs = e.mvccStore.ReverseScan(ran.StartKey, e.seekKey, 1, e.startTS, e.isolationLevel)
pairs = e.mvccStore.ReverseScan(ran.StartKey, e.seekKey, 1, e.startTS, e.isolationLevel, e.resolvedLocks)
} else {
pairs = e.mvccStore.Scan(e.seekKey, ran.EndKey, 1, e.startTS, e.isolationLevel)
pairs = e.mvccStore.Scan(e.seekKey, ran.EndKey, 1, e.startTS, e.isolationLevel, e.resolvedLocks)
}
if len(pairs) > 0 {
pair = pairs[0]
Expand Down Expand Up @@ -251,6 +252,7 @@ type indexScanExec struct {
kvRanges []kv.KeyRange
startTS uint64
isolationLevel kvrpcpb.IsolationLevel
resolvedLocks []uint64
mvccStore MVCCStore
cursor int
seekKey []byte
Expand Down Expand Up @@ -380,9 +382,9 @@ func (e *indexScanExec) getRowFromRange(ran kv.KeyRange) ([][]byte, error) {
var pairs []Pair
var pair Pair
if e.Desc {
pairs = e.mvccStore.ReverseScan(ran.StartKey, e.seekKey, 1, e.startTS, e.isolationLevel)
pairs = e.mvccStore.ReverseScan(ran.StartKey, e.seekKey, 1, e.startTS, e.isolationLevel, e.resolvedLocks)
} else {
pairs = e.mvccStore.Scan(e.seekKey, ran.EndKey, 1, e.startTS, e.isolationLevel)
pairs = e.mvccStore.Scan(e.seekKey, ran.EndKey, 1, e.startTS, e.isolationLevel, e.resolvedLocks)
}
if len(pairs) > 0 {
pair = pairs[0]
Expand Down
101 changes: 101 additions & 0 deletions store/mockstore/mocktikv/executor_test.go
@@ -0,0 +1,101 @@
// Copyright 2019-present, PingCAP, Inc.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this -present.

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mocktikv_test

import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/testkit"
)

var _ = Suite(&testExecutorSuite{})

type testExecutorSuite struct {
cluster *mocktikv.Cluster
store kv.Storage
mvccStore mocktikv.MVCCStore
dom *domain.Domain
}

func (s *testExecutorSuite) SetUpSuite(c *C) {
s.cluster = mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(s.cluster)
s.mvccStore = mocktikv.MustNewMVCCStore()
store, err := mockstore.NewMockTikvStore(
mockstore.WithCluster(s.cluster),
mockstore.WithMVCCStore(s.mvccStore),
)
c.Assert(err, IsNil)
s.store = store
session.SetSchemaLease(0)
session.DisableStats4Test()
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

func (s *testExecutorSuite) TearDownSuite(c *C) {
s.dom.Close()
s.store.Close()
}

func (s *testExecutorSuite) TestResolvedLargeTxnLocks(c *C) {
// This test checks the resolve lock functionality.
// When a txn meets the lock of a large transaction, it should not block by the
// lock.
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t (id int primary key, val int)")
dom := domain.GetDomain(tk.Se)
schema := dom.InfoSchema()
tbl, err := schema.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)

tk.MustExec("insert into t values (1, 1)")

oracle := s.store.GetOracle()
tso, err := oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)

key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, 1)
pairs := s.mvccStore.Scan(key, nil, 1, tso, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(pairs, HasLen, 1)
c.Assert(pairs[0].Err, IsNil)

// Simulate a large txn (holding a pk lock with large TTL).
// Secondary lock 200ms, primary lock 100s
mocktikv.MustPrewriteOK(c, s.mvccStore, mocktikv.PutMutations("primary", "value"), "primary", tso, 100000)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
mocktikv.MustPrewriteOK(c, s.mvccStore, mocktikv.PutMutations(string(key), "value"), "primary", tso, 200)

// Simulate the action of reading meet the lock of a large txn.
// The lock of the large transaction should not block read.
// The first time, this query should meet a lock on the secondary key, then resolve lock.
// After that, the query should read the previous version data.
tk.MustQuery("select * from t").Check(testkit.Rows("1 1"))

// And check the large txn is still alive.
pairs = s.mvccStore.Scan([]byte("primary"), nil, 1, tso, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(pairs, HasLen, 1)
_, ok := errors.Cause(pairs[0].Err).(*mocktikv.ErrLocked)
c.Assert(ok, IsTrue)
}
30 changes: 28 additions & 2 deletions store/mockstore/mocktikv/mock_tikv_test.go
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

Expand Down Expand Up @@ -50,6 +51,9 @@ func (s *testMockTiKVSuite) SetUpTest(c *C) {
c.Assert(err, IsNil)
}

// PutMutations is exported for testing.
var PutMutations func(kvpairs ...string) []*kvrpcpb.Mutation = putMutations

func putMutations(kvpairs ...string) []*kvrpcpb.Mutation {
var mutations []*kvrpcpb.Mutation
for i := 0; i < len(kvpairs); i += 2 {
Expand Down Expand Up @@ -133,7 +137,7 @@ func (s *testMockTiKVSuite) mustScanOK(c *C, start string, limit int, ts uint64,
}

func (s *testMockTiKVSuite) mustRangeScanOK(c *C, start, end string, limit int, ts uint64, expect ...string) {
pairs := s.store.Scan([]byte(start), []byte(end), limit, ts, kvrpcpb.IsolationLevel_SI)
pairs := s.store.Scan([]byte(start), []byte(end), limit, ts, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(len(pairs)*2, Equals, len(expect))
for i := 0; i < len(pairs); i++ {
c.Assert(pairs[i].Err, IsNil)
Expand All @@ -147,7 +151,7 @@ func (s *testMockTiKVSuite) mustReverseScanOK(c *C, end string, limit int, ts ui
}

func (s *testMockTiKVSuite) mustRangeReverseScanOK(c *C, start, end string, limit int, ts uint64, expect ...string) {
pairs := s.store.ReverseScan([]byte(start), []byte(end), limit, ts, kvrpcpb.IsolationLevel_SI)
pairs := s.store.ReverseScan([]byte(start), []byte(end), limit, ts, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(len(pairs)*2, Equals, len(expect))
for i := 0; i < len(pairs); i++ {
c.Assert(pairs[i].Err, IsNil)
Expand All @@ -156,6 +160,11 @@ func (s *testMockTiKVSuite) mustRangeReverseScanOK(c *C, start, end string, limi
}
}

func MustPrewriteOK(c *C, store MVCCStore, mutations []*kvrpcpb.Mutation, primary string, startTS uint64, ttl uint64) {
s := testMockTiKVSuite{store}
s.mustPrewriteWithTTLOK(c, mutations, primary, startTS, ttl)
}

func (s *testMockTiKVSuite) mustPrewriteOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64) {
s.mustPrewriteWithTTLOK(c, mutations, primary, startTS, 0)
}
Expand Down Expand Up @@ -431,6 +440,23 @@ func (s *testMockTiKVSuite) TestScanLock(c *C) {
})
}

func (s *testMockTiKVSuite) TestScanWithResolvedLock(c *C) {
s.mustPrewriteOK(c, putMutations("p1", "v5", "s1", "v5"), "p1", 5)
s.mustPrewriteOK(c, putMutations("p2", "v10", "s2", "v10"), "p1", 5)

pairs := s.store.Scan([]byte("p1"), nil, 3, 10, kvrpcpb.IsolationLevel_SI, nil)
lock, ok := errors.Cause(pairs[0].Err).(*ErrLocked)
c.Assert(ok, IsTrue)
_, ok = errors.Cause(pairs[1].Err).(*ErrLocked)
c.Assert(ok, IsTrue)

// Mock the request after resolving lock.
pairs = s.store.Scan([]byte("p1"), nil, 3, 10, kvrpcpb.IsolationLevel_SI, []uint64{lock.StartTS})
for _, pair := range pairs {
c.Assert(pair.Err, IsNil)
}
}

func (s *testMockTiKVSuite) TestCommitConflict(c *C) {
// txn A want set x to A
// txn B want set x to B
Expand Down
16 changes: 11 additions & 5 deletions store/mockstore/mocktikv/mvcc.go
Expand Up @@ -206,7 +206,7 @@ func (l *mvccLock) lockErr(key []byte) error {
}
}

func (l *mvccLock) check(ts uint64, key []byte) (uint64, error) {
func (l *mvccLock) check(ts uint64, key []byte, resolvedLocks []uint64) (uint64, error) {
// ignore when ts is older than lock or lock's type is Lock.
// Pessimistic lock doesn't block read.
if l.startTS > ts || l.op == kvrpcpb.Op_Lock || l.op == kvrpcpb.Op_PessimisticLock {
Expand All @@ -216,17 +216,23 @@ func (l *mvccLock) check(ts uint64, key []byte) (uint64, error) {
if ts == math.MaxUint64 && bytes.Equal(l.primary, key) {
return l.startTS - 1, nil
}
// Skip lock if the lock is resolved.
for _, resolved := range resolvedLocks {
if l.startTS == resolved {
return l.startTS, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should return the original TS because the lock.startTS may be smaller than the latest commit TS and smaller than read ts.

We return the ts only for MaxUint64.

}
}
return 0, l.lockErr(key)
}

func (e *mvccEntry) Less(than btree.Item) bool {
return bytes.Compare(e.key, than.(*mvccEntry).key) < 0
}

func (e *mvccEntry) Get(ts uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, error) {
func (e *mvccEntry) Get(ts uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) ([]byte, error) {
if isoLevel == kvrpcpb.IsolationLevel_SI && e.lock != nil {
var err error
ts, err = e.lock.check(ts, e.key.Raw())
ts, err = e.lock.check(ts, e.key.Raw(), resolvedLocks)
if err != nil {
return nil, err
}
Expand All @@ -251,8 +257,8 @@ func (e *rawEntry) Less(than btree.Item) bool {
// MVCCStore is a mvcc key-value storage.
type MVCCStore interface {
Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, error)
Scan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
ReverseScan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
Scan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair
ReverseScan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair
BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS, forUpdateTS uint64, ttl uint64) []error
PessimisticRollback(keys [][]byte, startTS, forUpdateTS uint64) []error
Expand Down