Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: implement non-block read when coprocessor meets the lock of a large transaction #11986

Merged
merged 33 commits into from Nov 19, 2019
Merged
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
eab9b2a
store: implement non-block read when coprocessor meets the lock of a …
tiancaiamao Sep 2, 2019
1bcb0f4
make golint happy
tiancaiamao Sep 2, 2019
d86c0bc
Merge branch 'master' into non-block-read
tiancaiamao Sep 10, 2019
294cfc3
Merge branch 'master' into non-block-read
tiancaiamao Sep 12, 2019
5e57d47
Merge branch 'master' into non-block-read
tiancaiamao Oct 14, 2019
d4e967f
address comment
tiancaiamao Oct 14, 2019
ac063c5
address comment
tiancaiamao Oct 16, 2019
4c5e6ac
address comment
tiancaiamao Oct 16, 2019
a457c9f
go fmt
tiancaiamao Oct 16, 2019
af8acac
address comment
tiancaiamao Oct 17, 2019
8dc46b2
Merge branch 'master' into non-block-read
tiancaiamao Nov 7, 2019
e813646
Merge branch 'master' into non-block-read
tiancaiamao Nov 7, 2019
6bcd36d
address comment
tiancaiamao Nov 7, 2019
57e6669
set minCommitTS in the prewrite request
tiancaiamao Nov 12, 2019
552d679
address comment
tiancaiamao Nov 12, 2019
0e75536
Merge branch 'master' into non-block-read
tiancaiamao Nov 12, 2019
ef41338
address comment
tiancaiamao Nov 12, 2019
2d0f7e0
address comment
tiancaiamao Nov 12, 2019
34b829e
Merge branch 'master' into non-block-read
tiancaiamao Nov 12, 2019
2514021
fix CI
tiancaiamao Nov 12, 2019
d0cd1f7
fix CI
tiancaiamao Nov 13, 2019
6f7566b
address comment
tiancaiamao Nov 13, 2019
414af18
address comment
tiancaiamao Nov 13, 2019
217dd9e
Merge branch 'master' into non-block-read
tiancaiamao Nov 13, 2019
a4465cc
address comment
tiancaiamao Nov 13, 2019
4413774
Merge branch 'master' into non-block-read
tiancaiamao Nov 15, 2019
a83d9dc
Merge branch 'master' into non-block-read
tiancaiamao Nov 18, 2019
8b94a33
do not push minCommitTS = 0
tiancaiamao Nov 18, 2019
95525ce
make golint happy
tiancaiamao Nov 18, 2019
40c850c
Merge branch 'master' into non-block-read
tiancaiamao Nov 18, 2019
1db31b6
address comment
tiancaiamao Nov 19, 2019
72eef5b
Merge branch 'master' into non-block-read
MyonKeminta Nov 19, 2019
aaaea2c
Merge branch 'master' into non-block-read
sre-bot Nov 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 23 additions & 12 deletions store/tikv/coprocessor.go
Expand Up @@ -466,7 +466,6 @@ func (it *copIterator) open(ctx context.Context) {
it.wg.Add(it.concurrency)
// Start it.concurrency number of workers to handle cop requests.
for i := 0; i < it.concurrency; i++ {
sender := NewRegionRequestSender(it.store.regionCache, it.store.client)
worker := &copIteratorWorker{
taskCh: taskCh,
wg: &it.wg,
Expand All @@ -476,8 +475,10 @@ func (it *copIterator) open(ctx context.Context) {
finishCh: it.finishCh,
vars: it.vars,
clientHelper: clientHelper{
LockResolver: it.store.lockResolver,
RegionRequestSender: sender,
LockResolver: it.store.lockResolver,
RegionCache: it.store.regionCache,
Client: it.store.client,
minCommitTSPushed: make(map[uint64]struct{}),
},

memTracker: it.memTracker,
Expand Down Expand Up @@ -662,12 +663,12 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
ScanDetail: true,
})
startTime := time.Now()
resp, rpcCtx, err := worker.SendReqCtx(bo, req, task.region, task.storeType)
resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, task.storeType)
if err != nil {
return nil, errors.Trace(err)
}
// Set task.storeAddr field so its task.String() method have the store address information.
task.storeAddr = worker.RegionRequestSender.storeAddr
task.storeAddr = storeAddr
costTime := time.Since(startTime)
if costTime > minLogCopTaskTime {
worker.logTimeCopTask(costTime, task, bo, resp)
Expand All @@ -691,9 +692,10 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
// meet the secondary lock again and run into a deadloop.
type clientHelper struct {
*LockResolver
*RegionRequestSender
*RegionCache
Client

minCommitTSPushed []uint64
minCommitTSPushed map[uint64]struct{}
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
}

// ResolveLocks wraps the ResolveLocks function and store the resolved result.
Expand All @@ -702,15 +704,24 @@ func (ch *clientHelper) ResolveLocks(bo *Backoffer, region RegionVerID, callerSt
if err != nil {
return msBeforeTxnExpired, err
}
// Should we deduplicate here?
ch.minCommitTSPushed = append(ch.minCommitTSPushed, minCommitTSPushed...)
for _, v := range minCommitTSPushed {
ch.minCommitTSPushed[v] = struct{}{}
}
return msBeforeTxnExpired, nil
}

// SendReqCtx wraps the SendReqCtx function and use the resolved lock result in the kvrpcpb.Context.
func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, sType kv.StoreType) (*tikvrpc.Response, *RPCContext, error) {
req.Context.ResolvedLocks = ch.minCommitTSPushed
return ch.RegionRequestSender.SendReqCtx(bo, req, regionID, ReadTimeoutMedium, sType)
func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, sType kv.StoreType) (*tikvrpc.Response, *RPCContext, string, error) {
sender := NewRegionRequestSender(ch.RegionCache, ch.Client)
if len(ch.minCommitTSPushed) > 0 {
resolvedLocks := make([]uint64, 0, len(ch.minCommitTSPushed))
for k, _ := range ch.minCommitTSPushed {
resolvedLocks = append(resolvedLocks, k)
}
req.Context.ResolvedLocks = resolvedLocks
}
resp, ctx, err := sender.SendReqCtx(bo, req, regionID, ReadTimeoutMedium, sType)
return resp, ctx, sender.storeAddr, err
}

const (
Expand Down