Skip to content

Commit

Permalink
do init limiter if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk committed Aug 23, 2019
1 parent 37f03cf commit 3ea5ba8
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions store/tikv/2pc.go
Expand Up @@ -117,7 +117,8 @@ type twoPhaseCommitter struct {

// batchExecutor is txn controller providing rate control like utils
type batchExecutor struct {
rateLimiter *rateLimit // rate limit for concurrency control, maybe more strategies
rateLim int // concurrent worker numbers
rateLimiter *rateLimit // rate limiter for concurrency control, maybe more strategies
committer *twoPhaseCommitter // here maybe more different type committer in the future
action twoPhaseCommitAction // the work action type
procFn procOneBatchFn // injected proc batch function
Expand Down Expand Up @@ -1011,10 +1012,17 @@ func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn
// newBatchExecutor create processor to handle concurrent batch works(prewrite/commit etc)
func newBatchExecutor(rateLimit int, committer *twoPhaseCommitter,
action twoPhaseCommitAction, procFn procOneBatchFn, backoffer *Backoffer) *batchExecutor {
return &batchExecutor{newRateLimit(rateLimit), committer,
return &batchExecutor{rateLimit, nil, committer,
action, procFn, backoffer}
}

// initUtils do initialize batchExecutor related policies like rateLimit util
func (batchExe *batchExecutor) initUtils() error {
// init rateLimiter by injected rate limit number
batchExe.rateLimiter = newRateLimit(batchExe.rateLim)
return nil
}

// startWork concurrently do the work for each batch considering rate limit
func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error, batches []batchKeys) {
for idx, batch1 := range batches {
Expand Down Expand Up @@ -1066,6 +1074,12 @@ func (batchExe *batchExecutor) process(batches []batchKeys) error {
}
return errors.Trace(e)
}
err = batchExe.initUtils()
if err != nil {
logutil.Logger(batchExe.backoffer.ctx).Error("batchExecutor initUtils failed", zap.Error(err))
return err
}

// For prewrite, stop sending other requests after receiving first error.
backoffer := batchExe.backoffer
var cancel context.CancelFunc
Expand Down

0 comments on commit 3ea5ba8

Please sign in to comment.