store: implement non-block reading for Get and BatchGet under the large transaction protocol (pingcap#13599)
tiancaiamao authored and XiaTianliang committed Dec 21, 2019
1 parent e383432 commit 0e75382
Showing 10 changed files with 88 additions and 26 deletions.
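
What the change does: reads that run under the large-transaction protocol now carry a list of already-resolved lock owners (the start timestamps whose minCommitTS the reader has pushed) all the way down to Get and BatchGet, which treat locks owned by those transactions as invisible instead of blocking on them. A minimal sketch of that visibility check in Go, assuming a simplified lock record (the real mocktikv lock type carries more fields):

package main

import "fmt"

// lockRec is a simplified stand-in for a lock record the reader encounters.
type lockRec struct {
	startTS uint64
	key     string
}

// canSkip reports whether the read may ignore the lock because its owning
// transaction is already in the caller's resolved-lock list.
func canSkip(l lockRec, resolvedLocks []uint64) bool {
	for _, ts := range resolvedLocks {
		if ts == l.startTS {
			return true
		}
	}
	return false
}

func main() {
	resolved := []uint64{4000}            // large txn whose minCommitTS was pushed
	l := lockRec{startTS: 4000, key: "x"} // lock left behind by that txn
	fmt.Println(canSkip(l, resolved))     // true: Get returns the committed version instead of blocking
}
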
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/executor.go
@@ -180,7 +180,7 @@ func (e *tableScanExec) Next(ctx context.Context) (value [][]byte, err error) {
}

func (e *tableScanExec) getRowFromPoint(ran kv.KeyRange) ([][]byte, error) {
- val, err := e.mvccStore.Get(ran.StartKey, e.startTS, e.isolationLevel)
+ val, err := e.mvccStore.Get(ran.StartKey, e.startTS, e.isolationLevel, e.resolvedLocks)
if err != nil {
return nil, errors.Trace(err)
}
@@ -361,7 +361,7 @@ func (e *indexScanExec) Next(ctx context.Context) (value [][]byte, err error) {

// getRowFromPoint is only used for unique key.
func (e *indexScanExec) getRowFromPoint(ran kv.KeyRange) ([][]byte, error) {
- val, err := e.mvccStore.Get(ran.StartKey, e.startTS, e.isolationLevel)
+ val, err := e.mvccStore.Get(ran.StartKey, e.startTS, e.isolationLevel, e.resolvedLocks)
if err != nil {
return nil, errors.Trace(err)
}
8 changes: 8 additions & 0 deletions store/mockstore/mocktikv/executor_test.go
@@ -93,6 +93,14 @@ func (s *testExecutorSuite) TestResolvedLargeTxnLocks(c *C) {
// After that, the query should read the previous version data.
tk.MustQuery("select * from t").Check(testkit.Rows("1 1"))

// Cover BatchGet.
tk.MustQuery("select * from t where id in (1)").Check(testkit.Rows("1 1"))

// Cover PointGet.
tk.MustExec("begin")
tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 1"))
tk.MustExec("rollback")

// And check the large txn is still alive.
pairs = s.mvccStore.Scan([]byte("primary"), nil, 1, tso, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(pairs, HasLen, 1)
10 changes: 5 additions & 5 deletions store/mockstore/mocktikv/mock_tikv_test.go
@@ -75,25 +75,25 @@ func lock(key, primary string, ts uint64) *kvrpcpb.LockInfo {
}

func (s *testMockTiKVSuite) mustGetNone(c *C, key string, ts uint64) {
- val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI)
+ val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(err, IsNil)
c.Assert(val, IsNil)
}

func (s *testMockTiKVSuite) mustGetErr(c *C, key string, ts uint64) {
- val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI)
+ val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(err, NotNil)
c.Assert(val, IsNil)
}

func (s *testMockTiKVSuite) mustGetOK(c *C, key string, ts uint64, expect string) {
- val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI)
+ val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI, nil)
c.Assert(err, IsNil)
c.Assert(string(val), Equals, expect)
}

func (s *testMockTiKVSuite) mustGetRC(c *C, key string, ts uint64, expect string) {
- val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_RC)
+ val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_RC, nil)
c.Assert(err, IsNil)
c.Assert(string(val), Equals, expect)
}
@@ -411,7 +411,7 @@ func (s *testMockTiKVSuite) TestBatchGet(c *C) {
s.mustPutOK(c, "k2", "v2", 3, 4)
s.mustPutOK(c, "k3", "v3", 1, 2)
batchKeys := [][]byte{[]byte("k1"), []byte("k2"), []byte("k3")}
- pairs := s.store.BatchGet(batchKeys, 5, kvrpcpb.IsolationLevel_SI)
+ pairs := s.store.BatchGet(batchKeys, 5, kvrpcpb.IsolationLevel_SI, nil)
for _, pair := range pairs {
c.Assert(pair.Err, IsNil)
}
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/mvcc.go
@@ -251,10 +251,10 @@ func (e *rawEntry) Less(than btree.Item) bool {

// MVCCStore is a mvcc key-value storage.
type MVCCStore interface {
- Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, error)
+ Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) ([]byte, error)
Scan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair
ReverseScan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair
- BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
+ BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair
PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS,
forUpdateTS uint64, ttl uint64, lockWaitTime int64) []error
PessimisticRollback(keys [][]byte, startTS, forUpdateTS uint64) []error
9 changes: 4 additions & 5 deletions store/mockstore/mocktikv/mvcc_leveldb.go
@@ -264,12 +264,11 @@ func (dec *skipDecoder) Decode(iter *Iterator) (bool, error) {

// Get implements the MVCCStore interface.
// key cannot be nil or []byte{}
- func (mvcc *MVCCLevelDB) Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, error) {
+ func (mvcc *MVCCLevelDB) Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) ([]byte, error) {
mvcc.mu.RLock()
defer mvcc.mu.RUnlock()

- // TODO: Update the nil here to support point-get for non-block reading on the large transaction.
- return mvcc.getValue(key, startTS, isoLevel, nil)
+ return mvcc.getValue(key, startTS, isoLevel, resolvedLocks)
}

func (mvcc *MVCCLevelDB) getValue(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) ([]byte, error) {
@@ -317,13 +316,13 @@ func getValue(iter *Iterator, key []byte, startTS uint64, isoLevel kvrpcpb.Isola
}

// BatchGet implements the MVCCStore interface.
- func (mvcc *MVCCLevelDB) BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair {
+ func (mvcc *MVCCLevelDB) BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair {
mvcc.mu.RLock()
defer mvcc.mu.RUnlock()

pairs := make([]Pair, 0, len(ks))
for _, k := range ks {
- v, err := mvcc.getValue(k, startTS, isoLevel, nil)
+ v, err := mvcc.getValue(k, startTS, isoLevel, resolvedLocks)
if v == nil && err == nil {
continue
}
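
With the widened Get/BatchGet signatures above, callers of the mock store pass the resolved-lock list explicitly, and passing nil preserves the old blocking behaviour. A minimal usage sketch against the MVCCStore interface from this diff (the wrapper function and values are illustrative, not part of the change):

package example

import (
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/store/mockstore/mocktikv"
)

// readSkippingLargeTxn reads keys at startTS while telling the mock store to
// treat locks owned by the transactions listed in resolved as already resolved.
func readSkippingLargeTxn(store mocktikv.MVCCStore, keys [][]byte, startTS uint64, resolved []uint64) []mocktikv.Pair {
	return store.BatchGet(keys, startTS, kvrpcpb.IsolationLevel_SI, resolved)
}
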
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/rpc.go
@@ -258,7 +258,7 @@ func (h *rpcHandler) handleKvGet(req *kvrpcpb.GetRequest) *kvrpcpb.GetResponse {
panic("KvGet: key not in region")
}

- val, err := h.mvccStore.Get(req.Key, req.GetVersion(), h.isolationLevel)
+ val, err := h.mvccStore.Get(req.Key, req.GetVersion(), h.isolationLevel, req.Context.GetResolvedLocks())
if err != nil {
return &kvrpcpb.GetResponse{
Error: convertToKeyError(err),
@@ -417,7 +417,7 @@ func (h *rpcHandler) handleKvBatchGet(req *kvrpcpb.BatchGetRequest) *kvrpcpb.Bat
panic("KvBatchGet: key not in region")
}
}
- pairs := h.mvccStore.BatchGet(req.Keys, req.GetVersion(), h.isolationLevel)
+ pairs := h.mvccStore.BatchGet(req.Keys, req.GetVersion(), h.isolationLevel, req.Context.GetResolvedLocks())
return &kvrpcpb.BatchGetResponse{
Pairs: convertToPbPairs(pairs),
}
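
On the wire, the resolved-lock list rides in kvrpcpb.Context, which the handlers above read back with GetResolvedLocks(). A hedged sketch of the request a client builds; only the field names come from the diff, the wrapper and values are illustrative:

package example

import "github.com/pingcap/kvproto/pkg/kvrpcpb"

// buildGetRequest attaches the start timestamps of already-resolved large
// transactions so the server can ignore their locks for this read.
func buildGetRequest(key []byte, readTS uint64, resolvedLocks []uint64) *kvrpcpb.GetRequest {
	return &kvrpcpb.GetRequest{
		Context: &kvrpcpb.Context{ResolvedLocks: resolvedLocks},
		Key:     key,
		Version: readTS,
	}
}
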
6 changes: 3 additions & 3 deletions store/tikv/coprocessor.go
@@ -699,7 +699,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
ScanDetail: true,
})
startTime := time.Now()
- resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, task.storeType)
+ resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, ReadTimeoutMedium, task.storeType)
if err != nil {
return nil, errors.Trace(err)
}
@@ -774,10 +774,10 @@ func (ch *clientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks
}

// SendReqCtx wraps the SendReqCtx function and uses the resolved lock result in the kvrpcpb.Context.
- func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, sType kv.StoreType) (*tikvrpc.Response, *RPCContext, string, error) {
+ func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration, sType kv.StoreType) (*tikvrpc.Response, *RPCContext, string, error) {
sender := NewRegionRequestSender(ch.RegionCache, ch.Client)
req.Context.ResolvedLocks = ch.minCommitTSPushed.Get()
- resp, ctx, err := sender.SendReqCtx(bo, req, regionID, ReadTimeoutMedium, sType)
+ resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType)
return resp, ctx, sender.storeAddr, err
}

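
The clientHelper stamps ch.minCommitTSPushed.Get() onto every request's Context.ResolvedLocks before sending, so results from earlier lock resolutions are reused by later reads on the same snapshot. The container itself is not part of this diff; below is a sketch that matches how it is used here (a data map of start timestamps plus a Get() snapshot), with the caveat that the real type in store/tikv may look different:

package example

import "sync"

// minCommitTSPushedSketch records large transactions whose minCommitTS this
// reader has already pushed, so their locks can be skipped on later reads.
type minCommitTSPushedSketch struct {
	mu   sync.RWMutex
	data map[uint64]struct{}
}

// Update remembers transactions that were just resolved (pushed) for this reader.
func (m *minCommitTSPushedSketch) Update(startTSList []uint64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, ts := range startTSList {
		m.data[ts] = struct{}{}
	}
}

// Get returns the recorded start timestamps in the shape expected by
// kvrpcpb.Context.ResolvedLocks.
func (m *minCommitTSPushedSketch) Get() []uint64 {
	m.mu.RLock()
	defer m.mu.RUnlock()
	out := make([]uint64, 0, len(m.data))
	for ts := range m.data {
		out = append(out, ts)
	}
	return out
}
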
34 changes: 27 additions & 7 deletions store/tikv/snapshot.go
@@ -61,6 +61,7 @@ type tikvSnapshot struct {
vars *kv.Variables
replicaRead kv.ReplicaReadType
replicaReadSeed uint32
minCommitTSPushed

// Cache the result of BatchGet.
// The invariance is that calling BatchGet multiple times using the same start ts,
@@ -80,13 +81,18 @@ func newTiKVSnapshot(store *tikvStore, ver kv.Version, replicaReadSeed uint32) *
priority: pb.CommandPri_Normal,
vars: kv.DefaultVars,
replicaReadSeed: replicaReadSeed,
minCommitTSPushed: minCommitTSPushed{
data: make(map[uint64]struct{}, 5),
},
}
}

func (s *tikvSnapshot) setSnapshotTS(ts uint64) {
// Invalidate cache if the snapshotTS changes!
s.version.Ver = ts
s.cached = nil
// And also the minCommitTS pushed information.
s.minCommitTSPushed.data = make(map[uint64]struct{}, 5)
}

// BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs.
@@ -191,7 +197,12 @@ func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, colle
}

func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error {
- sender := NewRegionRequestSender(s.store.regionCache, s.store.client)
+ cli := clientHelper{
+ LockResolver: s.store.lockResolver,
+ RegionCache: s.store.regionCache,
+ minCommitTSPushed: &s.minCommitTSPushed,
+ Client: s.store.client,
+ }

pending := batch.keys
for {
@@ -202,7 +213,9 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
Priority: s.priority,
NotFillCache: s.notFillCache,
})
- resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutMedium)

+ resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, ReadTimeoutMedium, kv.TiKV)

if err != nil {
return errors.Trace(err)
}
@@ -240,7 +253,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
locks = append(locks, lock)
}
if len(lockedKeys) > 0 {
- msBeforeExpired, _, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, locks)
+ msBeforeExpired, err := cli.ResolveLocks(bo, s.version.Ver, locks)
if err != nil {
return errors.Trace(err)
}
@@ -285,10 +298,17 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
}

failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) {
- panic("cache miss")
+ if bo.ctx.Value("TestSnapshotCache") != nil {
+ panic("cache miss")
+ }
})

- sender := NewRegionRequestSender(s.store.regionCache, s.store.client)
+ cli := clientHelper{
+ LockResolver: s.store.lockResolver,
+ RegionCache: s.store.regionCache,
+ minCommitTSPushed: &s.minCommitTSPushed,
+ Client: s.store.client,
+ }

req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet,
&pb.GetRequest{
@@ -303,7 +323,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
if err != nil {
return nil, errors.Trace(err)
}
- resp, err := sender.SendReq(bo, req, loc.Region, readTimeoutShort)
+ resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, readTimeoutShort, kv.TiKV)
if err != nil {
return nil, errors.Trace(err)
}
@@ -328,7 +348,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
if err != nil {
return nil, errors.Trace(err)
}
- msBeforeExpired, _, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, []*Lock{lock})
+ msBeforeExpired, err := cli.ResolveLocks(bo, s.version.Ver, []*Lock{lock})
if err != nil {
return nil, errors.Trace(err)
}
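
Both get and batchGetSingleRegion now share the same shape: send through the clientHelper (which attaches the pushed-minCommitTS list), and when a lock error comes back, call cli.ResolveLocks and back off only while the lock is still within its TTL. A condensed sketch of that loop; sendFn, resolveFn, and backoffFn are placeholder callbacks, not the real API:

package example

// lockErr is a stand-in for a key-is-locked error carrying the lock details.
type lockErr struct{ startTS uint64 }

func (e *lockErr) Error() string { return "key is locked" }

// getWithLockSkipping retries a point read until it returns a value or fails.
// Locks belonging to already-resolved large transactions never surface here,
// because the request carries their start TS in Context.ResolvedLocks.
func getWithLockSkipping(
	sendFn func() (val []byte, lock *lockErr, err error),
	resolveFn func(*lockErr) (msBeforeExpired int64, err error),
	backoffFn func(ms int64) error,
) ([]byte, error) {
	for {
		val, lock, err := sendFn()
		if err != nil {
			return nil, err
		}
		if lock == nil {
			return val, nil // no lock in the way; done
		}
		ms, err := resolveFn(lock)
		if err != nil {
			return nil, err
		}
		if ms > 0 {
			// The lock is still alive: wait before retrying the read.
			if err := backoffFn(ms); err != nil {
				return nil, err
			}
		}
	}
}
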
31 changes: 31 additions & 0 deletions store/tikv/snapshot_test.go
@@ -19,6 +19,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
@@ -121,6 +122,7 @@ func (s *testSnapshotSuite) TestBatchGet(c *C) {
func (s *testSnapshotSuite) TestSnapshotCache(c *C) {
txn := s.beginTxn(c)
c.Assert(txn.Set(kv.Key("x"), []byte("x")), IsNil)
c.Assert(txn.Delete(kv.Key("y")), IsNil) // store data is affected by other tests.
c.Assert(txn.Commit(context.Background()), IsNil)

txn = s.beginTxn(c)
@@ -205,3 +207,32 @@ func (s *testSnapshotSuite) TestLockNotFoundPrint(c *C) {
key := prettyLockNotFoundKey(msg)
c.Assert(key, Equals, "{tableID=12937, indexID=1, indexValues={C19092900000048625523, }}")
}

func (s *testSnapshotSuite) TestSkipLargeTxnLock(c *C) {
txn := s.beginTxn(c)
c.Assert(txn.Set(kv.Key("x"), []byte("x")), IsNil)
c.Assert(txn.Set(kv.Key("y"), []byte("y")), IsNil)
ctx := context.Background()
bo := NewBackoffer(ctx, PrewriteMaxBackoff)
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
committer.lockTTL = txnLockTTL(txn.startTime, 10<<20)
c.Assert(committer.prewriteKeys(bo, committer.keys), IsNil)

txn1 := s.beginTxn(c)
// txn1 is not blocked by txn in the large txn protocol.
_, err = txn1.Get(ctx, kv.Key("x"))
c.Assert(kv.IsErrNotFound(errors.Trace(err)), IsTrue)

res, err := txn1.BatchGet(ctx, []kv.Key{kv.Key("x"), kv.Key("y"), kv.Key("z")})
c.Assert(err, IsNil)
c.Assert(res, HasLen, 0)

// Commit txn, check the final commit ts is pushed.
committer.commitTS = txn.StartTS() + 1
c.Assert(committer.commitKeys(bo, committer.keys), IsNil)
status, err := s.store.lockResolver.GetTxnStatus(txn.StartTS(), 0, []byte("x"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsTrue)
c.Assert(status.CommitTS(), Greater, txn1.StartTS())
}
4 changes: 4 additions & 0 deletions store/tikv/tikvrpc/tikvrpc.go
@@ -653,6 +653,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
p = &kvrpcpb.TxnHeartBeatResponse{
RegionError: e,
}
case CmdCheckTxnStatus:
p = &kvrpcpb.CheckTxnStatusResponse{
RegionError: e,
}
default:
return nil, fmt.Errorf("invalid request type %v", req.Type)
}
