Skip to content

Commit

Permalink
executor: also collect unchanged unique keys for lock (pingcap#36498)
Browse files Browse the repository at this point in the history
  • Loading branch information
zyguan committed Jul 25, 2022
1 parent e2f1ba9 commit c80026e
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 13 deletions.
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
if err1 != nil {
return err1
}
keys = txnCtx.CollectUnchangedRowKeys(keys)
keys = txnCtx.CollectUnchangedLockKeys(keys)
if len(keys) == 0 {
return nil
}
Expand Down
16 changes: 15 additions & 1 deletion executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,21 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.IsPessimistic {
txnCtx.AddUnchangedRowKey(unchangedRowKey)
txnCtx.AddUnchangedLockKey(unchangedRowKey)
for _, idx := range t.Indices() {
if !idx.Meta().Unique {
continue
}
ukVals, err := idx.FetchValues(oldData, nil)
if err != nil {
return false, err
}
unchangedUniqueKey, _, err := idx.GenIndexKey(sc, ukVals, h, nil)
if err != nil {
return false, err
}
txnCtx.AddUnchangedLockKey(unchangedUniqueKey)
}
}
return false, nil
}
Expand Down
23 changes: 12 additions & 11 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ type TxnCtxNoNeedToRestore struct {
currentShard int64
shardRand *rand.Rand

// unchangedRowKeys is used to store the unchanged rows that needs to lock for pessimistic transaction.
unchangedRowKeys map[string]struct{}
// unchangedLockKeys is used to store the unchanged keys that needs to lock for pessimistic transaction, including
// row keys and unique keys.
unchangedLockKeys map[string]struct{}

PessimisticCacheHit int

Expand Down Expand Up @@ -239,20 +240,20 @@ func (tc *TransactionContext) updateShard() {
tc.currentShard = int64(murmur3.Sum32(buf[:]))
}

// AddUnchangedRowKey adds an unchanged row key in update statement for pessimistic lock.
func (tc *TransactionContext) AddUnchangedRowKey(key []byte) {
if tc.unchangedRowKeys == nil {
tc.unchangedRowKeys = map[string]struct{}{}
// AddUnchangedLockKey adds an unchanged key in update statement for pessimistic lock.
func (tc *TransactionContext) AddUnchangedLockKey(key []byte) {
if tc.unchangedLockKeys == nil {
tc.unchangedLockKeys = map[string]struct{}{}
}
tc.unchangedRowKeys[string(key)] = struct{}{}
tc.unchangedLockKeys[string(key)] = struct{}{}
}

// CollectUnchangedRowKeys collects unchanged row keys for pessimistic lock.
func (tc *TransactionContext) CollectUnchangedRowKeys(buf []kv.Key) []kv.Key {
for key := range tc.unchangedRowKeys {
// CollectUnchangedLockKeys collects unchanged keys for pessimistic lock.
func (tc *TransactionContext) CollectUnchangedLockKeys(buf []kv.Key) []kv.Key {
for key := range tc.unchangedLockKeys {
buf = append(buf, kv.Key(key))
}
tc.unchangedRowKeys = nil
tc.unchangedLockKeys = nil
return buf
}

Expand Down
32 changes: 32 additions & 0 deletions tests/realtikvtest/pessimistictest/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,38 @@ func TestLockUnchangedRowKey(t *testing.T) {
tk2.MustExec("rollback")
}

func TestLockUnchangedUniqueKey(t *testing.T) {
store, clean := realtikvtest.CreateMockStoreAndSetup(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2.MustExec("use test")

// ref https://github.com/pingcap/tidb/issues/36438
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (i varchar(10), unique key(i))")
tk.MustExec("insert into t values ('a')")
tk.MustExec("begin pessimistic")
tk.MustExec("update t set i = 'a'")

errCh := make(chan error, 1)
go func() {
_, err := tk2.Exec("insert into t values ('a')")
errCh <- err
}()

select {
case <-errCh:
require.Fail(t, "insert is not blocked by update")
case <-time.After(500 * time.Millisecond):
tk.MustExec("rollback")
}

require.Error(t, <-errCh)
}

func TestOptimisticConflicts(t *testing.T) {
store, clean := realtikvtest.CreateMockStoreAndSetup(t)
defer clean()
Expand Down

0 comments on commit c80026e

Please sign in to comment.