From 0e75382d8d2bfdc32ce5e797a056e1c4df264e63 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 27 Nov 2019 18:54:58 +0800 Subject: [PATCH] store: implement non-block reading for Get and BatchGet under the large transaction protocol (#13599) --- store/mockstore/mocktikv/executor.go | 4 +-- store/mockstore/mocktikv/executor_test.go | 8 +++++ store/mockstore/mocktikv/mock_tikv_test.go | 10 +++---- store/mockstore/mocktikv/mvcc.go | 4 +-- store/mockstore/mocktikv/mvcc_leveldb.go | 9 +++--- store/mockstore/mocktikv/rpc.go | 4 +-- store/tikv/coprocessor.go | 6 ++-- store/tikv/snapshot.go | 34 +++++++++++++++++----- store/tikv/snapshot_test.go | 31 ++++++++++++++++++++ store/tikv/tikvrpc/tikvrpc.go | 4 +++ 10 files changed, 88 insertions(+), 26 deletions(-) diff --git a/store/mockstore/mocktikv/executor.go b/store/mockstore/mocktikv/executor.go index 1792ce0c428dc..214af6bf081be 100644 --- a/store/mockstore/mocktikv/executor.go +++ b/store/mockstore/mocktikv/executor.go @@ -180,7 +180,7 @@ func (e *tableScanExec) Next(ctx context.Context) (value [][]byte, err error) { } func (e *tableScanExec) getRowFromPoint(ran kv.KeyRange) ([][]byte, error) { - val, err := e.mvccStore.Get(ran.StartKey, e.startTS, e.isolationLevel) + val, err := e.mvccStore.Get(ran.StartKey, e.startTS, e.isolationLevel, e.resolvedLocks) if err != nil { return nil, errors.Trace(err) } @@ -361,7 +361,7 @@ func (e *indexScanExec) Next(ctx context.Context) (value [][]byte, err error) { // getRowFromPoint is only used for unique key. func (e *indexScanExec) getRowFromPoint(ran kv.KeyRange) ([][]byte, error) { - val, err := e.mvccStore.Get(ran.StartKey, e.startTS, e.isolationLevel) + val, err := e.mvccStore.Get(ran.StartKey, e.startTS, e.isolationLevel, e.resolvedLocks) if err != nil { return nil, errors.Trace(err) } diff --git a/store/mockstore/mocktikv/executor_test.go b/store/mockstore/mocktikv/executor_test.go index a53337b694234..0e9d55b8d0a38 100644 --- a/store/mockstore/mocktikv/executor_test.go +++ b/store/mockstore/mocktikv/executor_test.go @@ -93,6 +93,14 @@ func (s *testExecutorSuite) TestResolvedLargeTxnLocks(c *C) { // After that, the query should read the previous version data. tk.MustQuery("select * from t").Check(testkit.Rows("1 1")) + // Cover BatchGet. + tk.MustQuery("select * from t where id in (1)").Check(testkit.Rows("1 1")) + + // Cover PointGet. + tk.MustExec("begin") + tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 1")) + tk.MustExec("rollback") + // And check the large txn is still alive. pairs = s.mvccStore.Scan([]byte("primary"), nil, 1, tso, kvrpcpb.IsolationLevel_SI, nil) c.Assert(pairs, HasLen, 1) diff --git a/store/mockstore/mocktikv/mock_tikv_test.go b/store/mockstore/mocktikv/mock_tikv_test.go index 737ad8ffd9b84..49ef737e99dd9 100644 --- a/store/mockstore/mocktikv/mock_tikv_test.go +++ b/store/mockstore/mocktikv/mock_tikv_test.go @@ -75,25 +75,25 @@ func lock(key, primary string, ts uint64) *kvrpcpb.LockInfo { } func (s *testMockTiKVSuite) mustGetNone(c *C, key string, ts uint64) { - val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI) + val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI, nil) c.Assert(err, IsNil) c.Assert(val, IsNil) } func (s *testMockTiKVSuite) mustGetErr(c *C, key string, ts uint64) { - val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI) + val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI, nil) c.Assert(err, NotNil) c.Assert(val, IsNil) } func (s *testMockTiKVSuite) mustGetOK(c *C, key string, ts uint64, expect string) { - val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI) + val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI, nil) c.Assert(err, IsNil) c.Assert(string(val), Equals, expect) } func (s *testMockTiKVSuite) mustGetRC(c *C, key string, ts uint64, expect string) { - val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_RC) + val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_RC, nil) c.Assert(err, IsNil) c.Assert(string(val), Equals, expect) } @@ -411,7 +411,7 @@ func (s *testMockTiKVSuite) TestBatchGet(c *C) { s.mustPutOK(c, "k2", "v2", 3, 4) s.mustPutOK(c, "k3", "v3", 1, 2) batchKeys := [][]byte{[]byte("k1"), []byte("k2"), []byte("k3")} - pairs := s.store.BatchGet(batchKeys, 5, kvrpcpb.IsolationLevel_SI) + pairs := s.store.BatchGet(batchKeys, 5, kvrpcpb.IsolationLevel_SI, nil) for _, pair := range pairs { c.Assert(pair.Err, IsNil) } diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index ee6f2dfb1b726..7842c04377fab 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -251,10 +251,10 @@ func (e *rawEntry) Less(than btree.Item) bool { // MVCCStore is a mvcc key-value storage. type MVCCStore interface { - Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, error) + Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) ([]byte, error) Scan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair ReverseScan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair - BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair + BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS, forUpdateTS uint64, ttl uint64, lockWaitTime int64) []error PessimisticRollback(keys [][]byte, startTS, forUpdateTS uint64) []error diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 742826fff4982..8e91689378a71 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -264,12 +264,11 @@ func (dec *skipDecoder) Decode(iter *Iterator) (bool, error) { // Get implements the MVCCStore interface. // key cannot be nil or []byte{} -func (mvcc *MVCCLevelDB) Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, error) { +func (mvcc *MVCCLevelDB) Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) ([]byte, error) { mvcc.mu.RLock() defer mvcc.mu.RUnlock() - // TODO: Update the nil here to support point-get for non-block reading on the large transaction. - return mvcc.getValue(key, startTS, isoLevel, nil) + return mvcc.getValue(key, startTS, isoLevel, resolvedLocks) } func (mvcc *MVCCLevelDB) getValue(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) ([]byte, error) { @@ -317,13 +316,13 @@ func getValue(iter *Iterator, key []byte, startTS uint64, isoLevel kvrpcpb.Isola } // BatchGet implements the MVCCStore interface. -func (mvcc *MVCCLevelDB) BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair { +func (mvcc *MVCCLevelDB) BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) []Pair { mvcc.mu.RLock() defer mvcc.mu.RUnlock() pairs := make([]Pair, 0, len(ks)) for _, k := range ks { - v, err := mvcc.getValue(k, startTS, isoLevel, nil) + v, err := mvcc.getValue(k, startTS, isoLevel, resolvedLocks) if v == nil && err == nil { continue } diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 68345b575af05..e9eba4f11db75 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -258,7 +258,7 @@ func (h *rpcHandler) handleKvGet(req *kvrpcpb.GetRequest) *kvrpcpb.GetResponse { panic("KvGet: key not in region") } - val, err := h.mvccStore.Get(req.Key, req.GetVersion(), h.isolationLevel) + val, err := h.mvccStore.Get(req.Key, req.GetVersion(), h.isolationLevel, req.Context.GetResolvedLocks()) if err != nil { return &kvrpcpb.GetResponse{ Error: convertToKeyError(err), @@ -417,7 +417,7 @@ func (h *rpcHandler) handleKvBatchGet(req *kvrpcpb.BatchGetRequest) *kvrpcpb.Bat panic("KvBatchGet: key not in region") } } - pairs := h.mvccStore.BatchGet(req.Keys, req.GetVersion(), h.isolationLevel) + pairs := h.mvccStore.BatchGet(req.Keys, req.GetVersion(), h.isolationLevel, req.Context.GetResolvedLocks()) return &kvrpcpb.BatchGetResponse{ Pairs: convertToPbPairs(pairs), } diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index f09f865abc368..2f60d280725c3 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -699,7 +699,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch ScanDetail: true, }) startTime := time.Now() - resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, task.storeType) + resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, ReadTimeoutMedium, task.storeType) if err != nil { return nil, errors.Trace(err) } @@ -774,10 +774,10 @@ func (ch *clientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks } // SendReqCtx wraps the SendReqCtx function and use the resolved lock result in the kvrpcpb.Context. -func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, sType kv.StoreType) (*tikvrpc.Response, *RPCContext, string, error) { +func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration, sType kv.StoreType) (*tikvrpc.Response, *RPCContext, string, error) { sender := NewRegionRequestSender(ch.RegionCache, ch.Client) req.Context.ResolvedLocks = ch.minCommitTSPushed.Get() - resp, ctx, err := sender.SendReqCtx(bo, req, regionID, ReadTimeoutMedium, sType) + resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType) return resp, ctx, sender.storeAddr, err } diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index ccdaabc21f685..0f47476f24f9e 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -61,6 +61,7 @@ type tikvSnapshot struct { vars *kv.Variables replicaRead kv.ReplicaReadType replicaReadSeed uint32 + minCommitTSPushed // Cache the result of BatchGet. // The invariance is that calling BatchGet multiple times using the same start ts, @@ -80,6 +81,9 @@ func newTiKVSnapshot(store *tikvStore, ver kv.Version, replicaReadSeed uint32) * priority: pb.CommandPri_Normal, vars: kv.DefaultVars, replicaReadSeed: replicaReadSeed, + minCommitTSPushed: minCommitTSPushed{ + data: make(map[uint64]struct{}, 5), + }, } } @@ -87,6 +91,8 @@ func (s *tikvSnapshot) setSnapshotTS(ts uint64) { // Invalidate cache if the snapshotTS change! s.version.Ver = ts s.cached = nil + // And also the minCommitTS pushed information. + s.minCommitTSPushed.data = make(map[uint64]struct{}, 5) } // BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs. @@ -191,7 +197,12 @@ func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, colle } func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error { - sender := NewRegionRequestSender(s.store.regionCache, s.store.client) + cli := clientHelper{ + LockResolver: s.store.lockResolver, + RegionCache: s.store.regionCache, + minCommitTSPushed: &s.minCommitTSPushed, + Client: s.store.client, + } pending := batch.keys for { @@ -202,7 +213,9 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll Priority: s.priority, NotFillCache: s.notFillCache, }) - resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutMedium) + + resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, ReadTimeoutMedium, kv.TiKV) + if err != nil { return errors.Trace(err) } @@ -240,7 +253,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll locks = append(locks, lock) } if len(lockedKeys) > 0 { - msBeforeExpired, _, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, locks) + msBeforeExpired, err := cli.ResolveLocks(bo, s.version.Ver, locks) if err != nil { return errors.Trace(err) } @@ -285,10 +298,17 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { } failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) { - panic("cache miss") + if bo.ctx.Value("TestSnapshotCache") != nil { + panic("cache miss") + } }) - sender := NewRegionRequestSender(s.store.regionCache, s.store.client) + cli := clientHelper{ + LockResolver: s.store.lockResolver, + RegionCache: s.store.regionCache, + minCommitTSPushed: &s.minCommitTSPushed, + Client: s.store.client, + } req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &pb.GetRequest{ @@ -303,7 +323,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { if err != nil { return nil, errors.Trace(err) } - resp, err := sender.SendReq(bo, req, loc.Region, readTimeoutShort) + resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, readTimeoutShort, kv.TiKV) if err != nil { return nil, errors.Trace(err) } @@ -328,7 +348,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { if err != nil { return nil, errors.Trace(err) } - msBeforeExpired, _, err := s.store.lockResolver.ResolveLocks(bo, s.version.Ver, []*Lock{lock}) + msBeforeExpired, err := cli.ResolveLocks(bo, s.version.Ver, []*Lock{lock}) if err != nil { return nil, errors.Trace(err) } diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 5207891e57c5b..fa7cac8de24b2 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -19,6 +19,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" @@ -121,6 +122,7 @@ func (s *testSnapshotSuite) TestBatchGet(c *C) { func (s *testSnapshotSuite) TestSnapshotCache(c *C) { txn := s.beginTxn(c) c.Assert(txn.Set(kv.Key("x"), []byte("x")), IsNil) + c.Assert(txn.Delete(kv.Key("y")), IsNil) // store data is affected by other tests. c.Assert(txn.Commit(context.Background()), IsNil) txn = s.beginTxn(c) @@ -205,3 +207,32 @@ func (s *testSnapshotSuite) TestLockNotFoundPrint(c *C) { key := prettyLockNotFoundKey(msg) c.Assert(key, Equals, "{tableID=12937, indexID=1, indexValues={C19092900000048625523, }}") } + +func (s *testSnapshotSuite) TestSkipLargeTxnLock(c *C) { + txn := s.beginTxn(c) + c.Assert(txn.Set(kv.Key("x"), []byte("x")), IsNil) + c.Assert(txn.Set(kv.Key("y"), []byte("y")), IsNil) + ctx := context.Background() + bo := NewBackoffer(ctx, PrewriteMaxBackoff) + committer, err := newTwoPhaseCommitterWithInit(txn, 0) + c.Assert(err, IsNil) + committer.lockTTL = txnLockTTL(txn.startTime, 10<<20) + c.Assert(committer.prewriteKeys(bo, committer.keys), IsNil) + + txn1 := s.beginTxn(c) + // txn1 is not blocked by txn in the large txn protocol. + _, err = txn1.Get(ctx, kv.Key("x")) + c.Assert(kv.IsErrNotFound(errors.Trace(err)), IsTrue) + + res, err := txn1.BatchGet(ctx, []kv.Key{kv.Key("x"), kv.Key("y"), kv.Key("z")}) + c.Assert(err, IsNil) + c.Assert(res, HasLen, 0) + + // Commit txn, check the final commit ts is pushed. + committer.commitTS = txn.StartTS() + 1 + c.Assert(committer.commitKeys(bo, committer.keys), IsNil) + status, err := s.store.lockResolver.GetTxnStatus(txn.StartTS(), 0, []byte("x")) + c.Assert(err, IsNil) + c.Assert(status.IsCommitted(), IsTrue) + c.Assert(status.CommitTS(), Greater, txn1.StartTS()) +} diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index bc6efc867621d..986566e539230 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -653,6 +653,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) { p = &kvrpcpb.TxnHeartBeatResponse{ RegionError: e, } + case CmdCheckTxnStatus: + p = &kvrpcpb.CheckTxnStatusResponse{ + RegionError: e, + } default: return nil, fmt.Errorf("invalid request type %v", req.Type) }