store: add rateLimiter into txn commit batch actions #11817

Merged
merged 10 commits on Aug 28, 2019
201 changes: 133 additions & 68 deletions store/tikv/2pc.go
@@ -115,6 +115,18 @@ type twoPhaseCommitter struct {
regionTxnSize map[uint64]int
}

// batchExecutor is a txn controller that provides rate-control utilities
type batchExecutor struct {
rateLim int // max number of concurrent workers
rateLimiter *rateLimit // rate limiter for concurrency control; more strategies may follow
committer *twoPhaseCommitter // more committer types may be supported in the future
action twoPhaseCommitAction // the work action type
procFn procOneBatchFn // injected proc batch function
backoffer *Backoffer // backoffer used for retries
}

type procOneBatchFn func(bo *Backoffer, batch batchKeys) error
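
The rateLimiter field refers to the rateLimit utility already present in this package, used below via newRateLimit, getToken and putToken. As a rough, hypothetical sketch of the behaviour batchExecutor relies on — not the package's actual implementation — a channel-based token limiter could look like this:

type rateLimitSketch struct {
	token chan struct{}
}

func newRateLimitSketch(n int) *rateLimitSketch {
	return &rateLimitSketch{token: make(chan struct{}, n)}
}

// getToken blocks until a worker slot is free; it returns true if the caller
// should exit because exit was closed before a token became available.
func (r *rateLimitSketch) getToken(exit <-chan struct{}) (exited bool) {
	select {
	case <-exit:
		return true
	case r.token <- struct{}{}:
		return false
	}
}

// putToken releases a previously acquired worker slot.
func (r *rateLimitSketch) putToken() {
	<-r.token
}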

type mutationEx struct {
pb.Mutation
asserted bool
@@ -377,10 +389,18 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA

// doActionOnBatches does action to batches in parallel.
func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseCommitAction, batches []batchKeys) error {
if len(batches) == 0 {
return nil
singleBatchActionFunc, err := c.getProcFuncByType(action)
if err != nil {
return err
}
var singleBatchActionFunc func(bo *Backoffer, batch batchKeys) error
rateLim := len(batches) // this will be used for LargeTxn, set rateLim here
batchExecutor := newBatchExecutor(rateLim, c, action, singleBatchActionFunc, bo)
err = batchExecutor.process(batches)
return errors.Trace(err)
}
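
The rateLim comment above hints that concurrency may later be capped for large transactions rather than always equalling len(batches). A purely hypothetical cap — the maxBatchWorkers tunable is invented for illustration and is not part of this PR — might look like:

rateLim := len(batches)
if rateLim > maxBatchWorkers { // maxBatchWorkers: assumed tunable, not in this PR
	rateLim = maxBatchWorkers
}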

func (c *twoPhaseCommitter) getProcFuncByType(action twoPhaseCommitAction) (procOneBatchFn, error) {
var singleBatchActionFunc procOneBatchFn
switch action {
case actionPrewrite:
singleBatchActionFunc = c.prewriteSingleBatch
@@ -392,72 +412,10 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
singleBatchActionFunc = c.pessimisticLockSingleBatch
case actionPessimisticRollback:
singleBatchActionFunc = c.pessimisticRollbackSingleBatch
default:
return nil, errors.Errorf("invalid action type=%v", action)
}
if len(batches) == 1 {
e := singleBatchActionFunc(bo, batches[0])
if e != nil {
logutil.BgLogger().Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e),
zap.Uint64("txnStartTS", c.startTS))
}
return errors.Trace(e)
}

// For prewrite, stop sending other requests after receiving first error.
backoffer := bo
var cancel context.CancelFunc
if action == actionPrewrite {
backoffer, cancel = bo.Fork()
defer cancel()
}

// Concurrently do the work for each batch.
ch := make(chan error, len(batches))
for _, batch1 := range batches {

batch := batch1
go func() {
if action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
// fork a child context and call cancel() while the foreground goroutine exits.
// Otherwise the background goroutines will be canceled exceptionally.
// Here we make a new clone of the original backoffer for this goroutine
// exclusively to avoid the data race when using the same backoffer
// in concurrent goroutines.
singleBatchBackoffer := backoffer.Clone()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
} else {
singleBatchBackoffer, singleBatchCancel := backoffer.Fork()
defer singleBatchCancel()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
}
}()
}
var err error
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
logutil.BgLogger().Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e),
zap.Uint64("txnStartTS", c.startTS))
// Cancel other requests and return the first error.
if cancel != nil {
logutil.BgLogger().Debug("2PC doActionOnBatches to cancel other actions",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Uint64("txnStartTS", c.startTS))
cancel()
}
if err == nil {
err = e
}
}
}
return errors.Trace(err)
return singleBatchActionFunc, nil
}

func (c *twoPhaseCommitter) keyValueSize(key []byte) int {
@@ -1050,3 +1008,110 @@ func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn
}
return b
}

// newBatchExecutor creates a processor to handle concurrent batch work (prewrite/commit, etc.)
func newBatchExecutor(rateLimit int, committer *twoPhaseCommitter,
action twoPhaseCommitAction, procFn procOneBatchFn, backoffer *Backoffer) *batchExecutor {
return &batchExecutor{rateLimit, nil, committer,
action, procFn, backoffer}
}
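
Since batchExecutor is initialized positionally here, a named-field form of the same constructor — behaviourally identical, shown only as a readability sketch — would be:

func newBatchExecutor(rateLimit int, committer *twoPhaseCommitter,
	action twoPhaseCommitAction, procFn procOneBatchFn, backoffer *Backoffer) *batchExecutor {
	return &batchExecutor{
		rateLim:   rateLimit,
		committer: committer,
		action:    action,
		procFn:    procFn,
		backoffer: backoffer,
		// rateLimiter stays nil here; it is created later by initUtils.
	}
}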

// initUtils initializes batchExecutor-related policies such as the rateLimit util
func (batchExe *batchExecutor) initUtils() error {
// init rateLimiter by injected rate limit number
batchExe.rateLimiter = newRateLimit(batchExe.rateLim)
return nil
}

// startWorker concurrently does the work for each batch, subject to the rate limit
func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error, batches []batchKeys) {
for idx, batch1 := range batches {
if exit := batchExe.rateLimiter.getToken(exitCh); !exit {
batch := batch1
go func() {
var procRes error
defer batchExe.rateLimiter.putToken()
if batchExe.action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
// fork a child context and call cancel() while the foreground goroutine exits.
// Otherwise the background goroutines will be canceled exceptionally.
// Here we make a new clone of the original backoffer for this goroutine
// exclusively to avoid the data race when using the same backoffer
// in concurrent goroutines.
singleBatchBackoffer := batchExe.backoffer.Clone()
procRes = batchExe.procFn(singleBatchBackoffer, batch)
} else {
singleBatchBackoffer, singleBatchCancel := batchExe.backoffer.Fork()
defer singleBatchCancel()
procRes = batchExe.procFn(singleBatchBackoffer, batch)
}
ch <- procRes
}()
} else {
logutil.Logger(batchExe.backoffer.ctx).Info("break startWorker",
zap.Stringer("action", batchExe.action), zap.Int("batch size", len(batches)),
zap.Int("index", idx))
break
}
}
}

// process starts the worker goroutine and collects the results
func (batchExe *batchExecutor) process(batches []batchKeys) error {
var err error
Review comment (Member): We can move all the logic in doActionOnBatches here.
if len(batches) == 0 {
return nil
}
if len(batches) == 1 {
e := batchExe.procFn(batchExe.backoffer, batches[0])
if e != nil {
logutil.BgLogger().Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", batchExe.committer.connID),
zap.Stringer("action type", batchExe.action),
zap.Error(e),
zap.Uint64("txnStartTS", batchExe.committer.startTS))
}
return errors.Trace(e)
}
err = batchExe.initUtils()
if err != nil {
logutil.Logger(batchExe.backoffer.ctx).Error("batchExecutor initUtils failed", zap.Error(err))
return err
}

// For prewrite, stop sending other requests after receiving first error.
backoffer := batchExe.backoffer
var cancel context.CancelFunc
if batchExe.action == actionPrewrite {
backoffer, cancel = batchExe.backoffer.Fork()
defer cancel()
}
// concurrently do the work for each batch.
ch := make(chan error, len(batches))
exitCh := make(chan struct{})
go batchExe.startWorker(exitCh, ch, batches)
// check results
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
logutil.Logger(backoffer.ctx).Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", batchExe.committer.connID),
zap.Stringer("action type", batchExe.action),
zap.Error(e),
zap.Uint64("txnStartTS", batchExe.committer.startTS))
// Cancel other requests and return the first error.
if cancel != nil {
logutil.Logger(backoffer.ctx).Debug("2PC doActionOnBatches to cancel other actions",
zap.Uint64("conn", batchExe.committer.connID),
zap.Stringer("action type", batchExe.action),
zap.Uint64("txnStartTS", batchExe.committer.startTS))
cancel()
}
if err == nil {
err = e
}
}
}
close(exitCh)
return err
}
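
Putting startWorker and process together, the overall pattern is: a producer goroutine hands each batch to a worker only after acquiring a token, each worker sends its result on ch and releases the token, and the consumer closes exitCh once every result has been collected so a blocked producer can stop. A self-contained, hypothetical illustration of that hand-shake — reusing the rateLimitSketch type from the earlier sketch; the helper name is invented — could be:

func runBatchesSketch(work []func() error, limit int) error {
	lim := newRateLimitSketch(limit)
	ch := make(chan error, len(work))
	exitCh := make(chan struct{})
	go func() { // producer: spawn one worker per item, gated by the limiter
		for _, w := range work {
			if exited := lim.getToken(exitCh); exited {
				return
			}
			w := w
			go func() {
				defer lim.putToken()
				ch <- w()
			}()
		}
	}()
	var firstErr error
	for i := 0; i < len(work); i++ { // consumer: collect every result, keep the first error
		if e := <-ch; e != nil && firstErr == nil {
			firstErr = e
		}
	}
	close(exitCh) // let a producer blocked in getToken exit
	return firstErr
}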