Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: stop updating pessimistic transaction's lock TTL when the session is killed (#12959) #13046

Merged
merged 4 commits into from Nov 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 3 additions & 16 deletions config/config.go
Expand Up @@ -35,9 +35,9 @@ import (

// Config number limitations
const (
MaxLogFileSize = 4096 // MB
MinPessimisticTTL = time.Second * 15
MaxPessimisticTTL = time.Second * 120
MaxLogFileSize = 4096 // MB
// DefTxnTotalSizeLimit is the default value of TxnTxnTotalSizeLimit.
DefTxnTotalSizeLimit = 100 * 1024 * 1024
)

// Valid config maps
Expand Down Expand Up @@ -308,8 +308,6 @@ type PessimisticTxn struct {
Enable bool `toml:"enable" json:"enable"`
// The max count of retry for a single statement in a pessimistic transaction.
MaxRetryCount uint `toml:"max-retry-count" json:"max-retry-count"`
// The pessimistic lock ttl.
TTL string `toml:"ttl" json:"ttl"`
}

// StmtSummary is the config for statement summary.
Expand Down Expand Up @@ -411,7 +409,6 @@ var defaultConf = Config{
PessimisticTxn: PessimisticTxn{
Enable: true,
MaxRetryCount: 256,
TTL: "40s",
},
StmtSummary: StmtSummary{
MaxStmtCount: 100,
Expand Down Expand Up @@ -586,16 +583,6 @@ func (c *Config) Valid() error {
if c.TiKVClient.MaxTxnTimeUse == 0 {
return fmt.Errorf("max-txn-time-use should be greater than 0")
}
if c.PessimisticTxn.TTL != "" {
dur, err := time.ParseDuration(c.PessimisticTxn.TTL)
if err != nil {
return err
}
if dur < MinPessimisticTTL || dur > MaxPessimisticTTL {
return fmt.Errorf("pessimistic transaction ttl %s out of range [%s, %s]",
dur, MinPessimisticTTL, MaxPessimisticTTL)
}
}
return nil
}

Expand Down
4 changes: 0 additions & 4 deletions config/config.toml.example
Expand Up @@ -307,10 +307,6 @@ enable = true
# max retry count for a statement in a pessimistic transaction.
max-retry-count = 256

# default TTL in milliseconds for pessimistic lock.
# The value must between "15s" and "120s".
ttl = "40s"

[stmt-summary]
# max number of statements kept in memory.
max-stmt-count = 100
Expand Down
17 changes: 0 additions & 17 deletions config/config_test.go
Expand Up @@ -205,23 +205,6 @@ func (s *testConfigSuite) TestConfigDiff(c *C) {
c.Assert(diffs["Performance.FeedbackProbability"][1], Equals, float64(23.33))
}

func (s *testConfigSuite) TestValid(c *C) {
c1 := NewConfig()
tests := []struct {
ttl string
valid bool
}{
{"14s", false},
{"15s", true},
{"120s", true},
{"121s", false},
}
for _, tt := range tests {
c1.PessimisticTxn.TTL = tt.ttl
c.Assert(c1.Valid() == nil, Equals, tt.valid)
}
}

func (s *testConfigSuite) TestOOMActionValid(c *C) {
c1 := NewConfig()
tests := []struct {
Expand Down
18 changes: 18 additions & 0 deletions session/pessimistic_test.go
Expand Up @@ -421,6 +421,24 @@ func (s *testPessimisticSuite) TestWaitLockKill(c *C) {
tk.MustExec("rollback")
}

func (s *testPessimisticSuite) TestKillStopTTLManager(c *C) {
// Test killing an idle pessimistic session stop its ttlManager.
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists test_kill")
tk.MustExec("create table test_kill (id int primary key, c int)")
tk.MustExec("insert test_kill values (1, 1)")
tk.MustExec("begin pessimistic")
tk2.MustExec("begin pessimistic")
tk.MustQuery("select * from test_kill where id = 1 for update")
sessVars := tk.Se.GetSessionVars()
succ := atomic.CompareAndSwapUint32(&sessVars.Killed, 0, 1)
c.Assert(succ, IsTrue)

// This query should success rather than returning a ResolveLock error.
tk2.MustExec("update test_kill set c = c + 1 where id = 1")
}

func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists tk")
Expand Down
4 changes: 4 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Expand Up @@ -740,6 +740,10 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
return nil
}
// Overwrite the pessimistic lock.
if ttl < dec.lock.ttl {
// Maybe ttlManager has already set the lock TTL, don't decrease it.
ttl = dec.lock.ttl
}
} else {
if isPessimisticLock {
return ErrAbort("pessimistic lock not found")
Expand Down
22 changes: 15 additions & 7 deletions store/tikv/2pc.go
Expand Up @@ -72,7 +72,7 @@ var (

// Global variable set by config file.
var (
PessimisticLockTTL uint64 = 15000 // 15s ~ 40s
PessimisticLockTTL uint64 = 20000 // 20s
)

func (actionPrewrite) String() string {
Expand Down Expand Up @@ -651,15 +651,17 @@ const (
)

type ttlManager struct {
state ttlManagerState
ch chan struct{}
state ttlManagerState
ch chan struct{}
killed *uint32
}

func (tm *ttlManager) run(c *twoPhaseCommitter) {
func (tm *ttlManager) run(c *twoPhaseCommitter, killed *uint32) {
// Run only once.
if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateUninitialized), uint32(stateRunning)) {
return
}
tm.killed = killed
go tm.keepAlive(c)
}

Expand All @@ -671,14 +673,18 @@ func (tm *ttlManager) close() {
}

func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
// Ticker is set to 1/3 of the PessimisticLockTTL.
ticker := time.NewTicker(time.Duration(PessimisticLockTTL) * time.Millisecond / 3)
// Ticker is set to 1/2 of the PessimisticLockTTL.
ticker := time.NewTicker(time.Duration(PessimisticLockTTL) * time.Millisecond / 2)
defer ticker.Stop()
for {
select {
case <-tm.ch:
return
case <-ticker.C:
// If kill signal is received, the ttlManager should exit.
if tm.killed != nil && atomic.LoadUint32(tm.killed) != 0 {
return
}
bo := NewBackoffer(context.Background(), pessimisticLockMaxBackoff)
now, err := c.store.GetOracle().GetTimestamp(bo.ctx)
if err != nil {
Expand Down Expand Up @@ -730,14 +736,16 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
mutations[i] = mut
}

t0 := oracle.GetTimeFromTS(c.forUpdateTS)
elapsed := uint64(time.Since(t0) / time.Millisecond)
req := &tikvrpc.Request{
Type: tikvrpc.CmdPessimisticLock,
PessimisticLock: &pb.PessimisticLockRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
LockTtl: c.pessimisticTTL,
LockTtl: elapsed + PessimisticLockTTL,
IsFirstLock: c.isFirstLock,
WaitTimeout: action.lockWaitTime,
},
Expand Down
11 changes: 8 additions & 3 deletions store/tikv/2pc_test.go
Expand Up @@ -37,6 +37,11 @@ type testCommitterSuite struct {

var _ = Suite(&testCommitterSuite{})

func (s *testCommitterSuite) SetUpSuite(c *C) {
PessimisticLockTTL = 3000 // 3s
s.OneByOneSuite.SetUpSuite(c)
}

func (s *testCommitterSuite) SetUpTest(c *C) {
s.cluster = mocktikv.NewCluster()
mocktikv.BootstrapWithMultiRegions(s.cluster, []byte("a"), []byte("b"), []byte("c"))
Expand Down Expand Up @@ -538,14 +543,14 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {
err = txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key2)
c.Assert(err, IsNil)
lockInfo := s.getLockInfo(c, key)
elapsedTTL := lockInfo.LockTtl - PessimisticLockTTL
c.Assert(elapsedTTL, GreaterEqual, uint64(100))
msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl)
c.Assert(msBeforeLockExpired, GreaterEqual, int64(100))

lr := newLockResolver(s.store)
bo := NewBackoffer(context.Background(), getMaxBackoff)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, txn.startTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, lockInfo.LockTtl)
c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl)

// Check primary lock TTL is auto increasing while the pessimistic txn is ongoing.
for i := 0; i < 50; i++ {
Expand Down
7 changes: 1 addition & 6 deletions store/tikv/txn.go
Expand Up @@ -382,11 +382,6 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS ui
return err
}
}
if txn.committer.pessimisticTTL == 0 {
// add elapsed time to pessimistic TTL on the first LockKeys request.
elapsed := uint64(time.Since(txn.startTime) / time.Millisecond)
txn.committer.pessimisticTTL = PessimisticLockTTL + elapsed
}
var assignedPrimaryKey bool
if txn.committer.primaryKey == nil {
txn.committer.primaryKey = keys[0]
Expand Down Expand Up @@ -428,7 +423,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS ui
return err
}
if assignedPrimaryKey {
txn.committer.ttlManager.run(txn.committer)
txn.committer.ttlManager.run(txn.committer, killed)
}
}
txn.mu.Lock()
Expand Down
1 change: 0 additions & 1 deletion tidb-server/main.go
Expand Up @@ -526,7 +526,6 @@ func setGlobalVars() {
}

tikv.CommitMaxBackoff = int(parseDuration(cfg.TiKVClient.CommitTimeout).Seconds() * 1000)
tikv.PessimisticLockTTL = uint64(parseDuration(cfg.PessimisticTxn.TTL).Seconds() * 1000)
tikv.RegionCacheTTLSec = int64(cfg.TiKVClient.RegionCacheTTL)
}

Expand Down