Skip to content

Commit

Permalink
roachpb: add lock durability info to Get/Scan/ReverseScan requests
Browse files Browse the repository at this point in the history
This patch adds lock durability information to Get, Scan, and
ReverseScan requests. This field is only ever meaningful in conjunction
with a locking strength that's not lock.None. It allows SQL to indicate
the durability with which locks, if acquired, should be held --
replicated or unreplicated.

To preserve the mixed-version story between 23.1 <-> 23.2 nodes, where
Get/Scan/ReverseScan requests can only acquire unreplicated locks, we
need switch the ordering in the lock.Durability protobuf. This is safe
for us to do -- the field is only sent over the wire when querying
`crdb_internal.cluster_locks` for observability. The only other place
it's used in, `roachpb.LockAcquisition`, is never sent over the wire.

Informs #109672

Release note: None
  • Loading branch information
arulajmani committed Sep 12, 2023
1 parent 6ef0a9a commit 65d846e
Show file tree
Hide file tree
Showing 23 changed files with 195 additions and 125 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func TestCanSendToFollower(t *testing.T) {
},
{
name: "stale locking read",
ba: batch(txn(stale), &kvpb.GetRequest{KeyLocking: lock.Exclusive}),
ba: batch(txn(stale), &kvpb.GetRequest{KeyLockingStrength: lock.Exclusive}),
exp: false,
},
{
Expand Down Expand Up @@ -345,7 +345,7 @@ func TestCanSendToFollower(t *testing.T) {
},
{
name: "stale locking read, global reads policy",
ba: batch(txn(stale), &kvpb.GetRequest{KeyLocking: lock.Exclusive}),
ba: batch(txn(stale), &kvpb.GetRequest{KeyLockingStrength: lock.Exclusive}),
ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
exp: false,
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestTxnHeartbeaterLoopStartedOnFirstLock(t *testing.T) {
if write {
ba.Add(&kvpb.PutRequest{RequestHeader: keyAHeader})
} else {
ba.Add(&kvpb.ScanRequest{RequestHeader: keyAHeader, KeyLocking: lock.Exclusive})
ba.Add(&kvpb.ScanRequest{RequestHeader: keyAHeader, KeyLockingStrength: lock.Exclusive})
}

br, pErr = th.SendLocked(ctx, ba)
Expand Down Expand Up @@ -344,7 +344,7 @@ func TestTxnHeartbeaterLoopStartsBeforeExpiry(t *testing.T) {
ba.Header = kvpb.Header{Txn: txn.Clone()}
keyA := roachpb.Key("a")
keyAHeader := kvpb.RequestHeader{Key: keyA}
ba.Add(&kvpb.GetRequest{RequestHeader: keyAHeader, KeyLocking: lock.Exclusive})
ba.Add(&kvpb.GetRequest{RequestHeader: keyAHeader, KeyLockingStrength: lock.Exclusive})

br, pErr := th.SendLocked(ctx, ba)
require.Nil(t, pErr)
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ func TestTxnPipeliner1PCTransaction(t *testing.T) {
ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: &txn}
scanArgs := kvpb.ScanRequest{
RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyB},
KeyLocking: lock.Exclusive,
RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyB},
KeyLockingStrength: lock.Exclusive,
}
ba.Add(&scanArgs)
putArgs := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}}
Expand Down Expand Up @@ -1353,7 +1353,7 @@ func TestTxnPipelinerRecordsLocksOnFailure(t *testing.T) {
ba.Header = kvpb.Header{Txn: &txn}
ba.Add(&kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}})
ba.Add(&kvpb.DeleteRangeRequest{RequestHeader: kvpb.RequestHeader{Key: keyB, EndKey: keyB.Next()}})
ba.Add(&kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyC, EndKey: keyC.Next()}, KeyLocking: lock.Exclusive})
ba.Add(&kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyC, EndKey: keyC.Next()}, KeyLockingStrength: lock.Exclusive})

mockPErr := kvpb.NewErrorf("boom")
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
Expand Down Expand Up @@ -1382,7 +1382,7 @@ func TestTxnPipelinerRecordsLocksOnFailure(t *testing.T) {
ba.Requests = nil
ba.Add(&kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyD}})
ba.Add(&kvpb.DeleteRangeRequest{RequestHeader: kvpb.RequestHeader{Key: keyE, EndKey: keyE.Next()}})
ba.Add(&kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyF, EndKey: keyF.Next()}, KeyLocking: lock.Exclusive})
ba.Add(&kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyF, EndKey: keyF.Next()}, KeyLockingStrength: lock.Exclusive})

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 3)
Expand Down Expand Up @@ -1450,7 +1450,7 @@ func TestTxnPipelinerIgnoresLocksOnUnambiguousFailure(t *testing.T) {
ba.Header = kvpb.Header{Txn: &txn}
ba.Add(&kvpb.ConditionalPutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}})
ba.Add(&kvpb.DeleteRangeRequest{RequestHeader: kvpb.RequestHeader{Key: keyB, EndKey: keyB.Next()}})
ba.Add(&kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyC, EndKey: keyC.Next()}, KeyLocking: lock.Exclusive})
ba.Add(&kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyC, EndKey: keyC.Next()}, KeyLockingStrength: lock.Exclusive})

condFailedErr := kvpb.NewError(&kvpb.ConditionFailedError{})
condFailedErr.SetErrorIndex(0)
Expand Down Expand Up @@ -1480,7 +1480,7 @@ func TestTxnPipelinerIgnoresLocksOnUnambiguousFailure(t *testing.T) {
ba.Requests = nil
ba.Add(&kvpb.ConditionalPutRequest{RequestHeader: kvpb.RequestHeader{Key: keyD}})
ba.Add(&kvpb.DeleteRangeRequest{RequestHeader: kvpb.RequestHeader{Key: keyE, EndKey: keyE.Next()}})
ba.Add(&kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyF, EndKey: keyF.Next()}, KeyLocking: lock.Exclusive})
ba.Add(&kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyF, EndKey: keyF.Next()}, KeyLockingStrength: lock.Exclusive})

lockConflictErr := kvpb.NewError(&kvpb.LockConflictError{})
lockConflictErr.SetErrorIndex(2)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1744,7 +1744,7 @@ func buildResumeSingleRangeBatch(
newGet := gets[0]
gets = gets[1:]
newGet.req.SetSpan(*get.ResumeSpan)
newGet.req.KeyLocking = s.keyLocking
newGet.req.KeyLockingStrength = s.keyLocking
newGet.union.Get = &newGet.req
resumeReq.reqs[resumeReqIdx].Value = &newGet.union
resumeReq.positions = append(resumeReq.positions, position)
Expand Down Expand Up @@ -1772,7 +1772,7 @@ func buildResumeSingleRangeBatch(
scans = scans[1:]
newScan.req.SetSpan(*scan.ResumeSpan)
newScan.req.ScanFormat = kvpb.BATCH_RESPONSE
newScan.req.KeyLocking = s.keyLocking
newScan.req.KeyLockingStrength = s.keyLocking
newScan.union.Scan = &newScan.req
resumeReq.reqs[resumeReqIdx].Value = &newScan.union
resumeReq.positions = append(resumeReq.positions, position)
Expand Down
32 changes: 16 additions & 16 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,28 +198,28 @@ type Request interface {
// strength of a read-only request.
type LockingReadRequest interface {
Request
KeyLockingStrength() lock.Strength
KeyLocking() (lock.Strength, lock.Durability)
}

var _ LockingReadRequest = (*GetRequest)(nil)

// KeyLockingStrength implements the LockingReadRequest interface.
func (gr *GetRequest) KeyLockingStrength() lock.Strength {
return gr.KeyLocking
// KeyLocking implements the LockingReadRequest interface.
func (gr *GetRequest) KeyLocking() (lock.Strength, lock.Durability) {
return gr.KeyLockingStrength, gr.KeyLockingDurability
}

var _ LockingReadRequest = (*ScanRequest)(nil)

// KeyLockingStrength implements the LockingReadRequest interface.
func (sr *ScanRequest) KeyLockingStrength() lock.Strength {
return sr.KeyLocking
// KeyLocking implements the LockingReadRequest interface.
func (sr *ScanRequest) KeyLocking() (lock.Strength, lock.Durability) {
return sr.KeyLockingStrength, sr.KeyLockingDurability
}

var _ LockingReadRequest = (*ReverseScanRequest)(nil)

// KeyLockingStrength implements the LockingReadRequest interface.
func (rsr *ReverseScanRequest) KeyLockingStrength() lock.Strength {
return rsr.KeyLocking
// KeyLocking implements the LockingReadRequest interface.
func (rsr *ReverseScanRequest) KeyLocking() (lock.Strength, lock.Durability) {
return rsr.KeyLockingStrength, rsr.KeyLockingDurability
}

// SizedWriteRequest is an interface used to expose the number of bytes a
Expand Down Expand Up @@ -1197,7 +1197,7 @@ func NewGet(key roachpb.Key, forUpdate bool) Request {
RequestHeader: RequestHeader{
Key: key,
},
KeyLocking: scanLockStrength(forUpdate),
KeyLockingStrength: scanLockStrength(forUpdate),
}
}

Expand Down Expand Up @@ -1323,7 +1323,7 @@ func NewScan(key, endKey roachpb.Key, forUpdate bool) Request {
Key: key,
EndKey: endKey,
},
KeyLocking: scanLockStrength(forUpdate),
KeyLockingStrength: scanLockStrength(forUpdate),
}
}

Expand All @@ -1336,7 +1336,7 @@ func NewReverseScan(key, endKey roachpb.Key, forUpdate bool) Request {
Key: key,
EndKey: endKey,
},
KeyLocking: scanLockStrength(forUpdate),
KeyLockingStrength: scanLockStrength(forUpdate),
}
}

Expand All @@ -1355,7 +1355,7 @@ func flagForLockStrength(l lock.Strength) flag {
}

func (gr *GetRequest) flags() flag {
maybeLocking := flagForLockStrength(gr.KeyLocking)
maybeLocking := flagForLockStrength(gr.KeyLockingStrength)
return isRead | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
}

Expand Down Expand Up @@ -1445,12 +1445,12 @@ func (*RevertRangeRequest) flags() flag {
}

func (sr *ScanRequest) flags() flag {
maybeLocking := flagForLockStrength(sr.KeyLocking)
maybeLocking := flagForLockStrength(sr.KeyLockingStrength)
return isRead | isRange | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
}

func (rsr *ReverseScanRequest) flags() flag {
maybeLocking := flagForLockStrength(rsr.KeyLocking)
maybeLocking := flagForLockStrength(rsr.KeyLockingStrength)
return isRead | isRange | isReverse | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
}

Expand Down
91 changes: 80 additions & 11 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,32 @@ message GetRequest {
// The desired key-level locking mode used during this get. When set to None
// (the default), no key-level locking mode is used - meaning that the get
// does not acquire a lock. When set to any other strength, a lock of that
// strength is acquired with the Unreplicated durability (i.e. best-effort)
// the key, if it exists.
kv.kvserver.concurrency.lock.Strength key_locking = 2;
// strength is acquired with the associated durability guarantees on the key,
// if it exists.
kv.kvserver.concurrency.lock.Strength key_locking_strength = 2;

// KeyLockingDurability denotes the durability with which locks, if any are
// acquired, should be acquired with. It should only be set in conjunction
// with a non-None KeyLockingStrength.
//
// Unreplicated locks are kept in-memory on the leaseholder of the locked key.
// As such, their existence until a transaction commits is best-effort. They
// are susceptible to things like lease transfers and node crashes. However,
// they are faster to acquire and resolve when compared to replicated locks.
// This makes them an appealing choice when locks are not required for
// correctness. This includes things like (non-exhaustive list):
// 1. Transactions that run under serializable isolation level.
// 2. Implicit SFU for weaker isolation levels, where we know we will
// subsequently perform a (replicated) intent write on the key being locked.
//
// Replicated locks on the other hand, once acquired, are guaranteed to exist
// until the transaction finalizes (commits or aborts). They are not
// susceptible to things like lease transfers, range {splits,merges}, memory
// limits, node crashes etc. Replication adds a performance penalty for lock
// acquisition and resolution; as such, they should only be used by
// transactions that need guaranteed locks for correctness (read:
// read-committed or snapshot isolation transactions).
kv.kvserver.concurrency.lock.Durability key_locking_durability = 3;
}

// A GetResponse is the return value from the Get() method.
Expand Down Expand Up @@ -609,14 +632,37 @@ message ScanRequest {
// The desired key-level locking mode used during this scan. When set to None
// (the default), no key-level locking mode is used - meaning that the scan
// does not acquire any locks. When set to any other strength, a lock of that
// strength is acquired with the Unreplicated durability (i.e. best-effort) on
// each of the keys scanned by the request, subject to any key limit applied
// to the batch which limits the number of keys returned.
// strength is acquired with the associated durability guarantees on each of
// the keys scanned by the request, subject to any key limit applied to the
// batch which limits the number of keys returned.
//
// NOTE: the locks acquire with this strength are point locks on each of the
// keys returned by the request, not a single range lock over the entire span
// scanned by the request.
kv.kvserver.concurrency.lock.Strength key_locking = 5;
kv.kvserver.concurrency.lock.Strength key_locking_strength = 5;

// KeyLockingDurability denotes the durability with which locks, if any are
// acquired, should be acquired with. It should only be set in conjunction
// with a non-None KeyLockingStrength.
//
// Unreplicated locks are kept in-memory on the leaseholder of the locked key.
// As such, their existence until a transaction commits is best-effort. They
// are susceptible to things like lease transfers and node crashes. However,
// they are faster to acquire and resolve when compared to replicated locks.
// This makes them an appealing choice when locks are not required for
// correctness. This includes things like (non-exhaustive list):
// 1. Transactions that run under serializable isolation level.
// 2. Implicit SFU for weaker isolation levels, where we know we will
// subsequently perform a (replicated) intent write on the key being locked.
//
// Replicated locks on the other hand, once acquired, are guaranteed to exist
// until the transaction finalizes (commits or aborts). They are not
// susceptible to things like lease transfers, range {splits,merges}, memory
// limits, node crashes etc. Replication adds a performance penalty for lock
// acquisition and resolution; as such, they should only be used by
// transactions that need guaranteed locks for correctness (read:
// read-committed or snapshot isolation transactions).
kv.kvserver.concurrency.lock.Durability key_locking_durability = 6;
}

// A ScanResponse is the return value from the Scan() method.
Expand Down Expand Up @@ -674,14 +720,37 @@ message ReverseScanRequest {
// The desired key-level locking mode used during this scan. When set to None
// (the default), no key-level locking mode is used - meaning that the scan
// does not acquire any locks. When set to any other strength, a lock of that
// strength is acquired with the Unreplicated durability (i.e. best-effort) on
// each of the keys scanned by the request, subject to any key limit applied
// to the batch which limits the number of keys returned.
// strength is acquired with the associated durability guarantees on each of
// the keys scanned by the request, subject to any key limit applied to the
// batch which limits the number of keys returned.
//
// NOTE: the locks acquire with this strength are point locks on each of the
// keys returned by the request, not a single range lock over the entire span
// scanned by the request.
kv.kvserver.concurrency.lock.Strength key_locking = 5;
kv.kvserver.concurrency.lock.Strength key_locking_strength = 5;

// KeyLockingDurability denotes the durability with which locks, if any are
// acquired, should be acquired with. It should only be set in conjunction
// with a non-None KeyLockingStrength.
//
// Unreplicated locks are kept in-memory on the leaseholder of the locked key.
// As such, their existence until a transaction commits is best-effort. They
// are susceptible to things like lease transfers and node crashes. However,
// they are faster to acquire and resolve when compared to replicated locks.
// This makes them an appealing choice when locks are not required for
// correctness. This includes things like (non-exhaustive list):
// 1. Transactions that run under serializable isolation level.
// 2. Implicit SFU for weaker isolation levels, where we know we will
// subsequently perform a (replicated) intent write on the key being locked.
//
// Replicated locks on the other hand, once acquired, are guaranteed to exist
// until the transaction finalizes (commits or aborts). They are not
// susceptible to things like lease transfers, range {splits,merges}, memory
// limits, node crashes etc. Replication adds a performance penalty for lock
// acquisition and resolution; as such, they should only be used by
// transactions that need guaranteed locks for correctness (read:
// read-committed or snapshot isolation transactions).
kv.kvserver.concurrency.lock.Durability key_locking_durability = 6;
}

// A ReverseScanResponse is the return value from the ReverseScan() method.
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvpb/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,9 @@ func TestFlagCombinations(t *testing.T) {
&AddSSTableRequest{SSTTimestampToRequestTimestamp: hlc.Timestamp{Logical: 1}},
&DeleteRangeRequest{Inline: true},
&DeleteRangeRequest{UseRangeTombstone: true},
&GetRequest{KeyLocking: lock.Exclusive},
&ReverseScanRequest{KeyLocking: lock.Exclusive},
&ScanRequest{KeyLocking: lock.Exclusive},
&GetRequest{KeyLockingStrength: lock.Exclusive},
&ReverseScanRequest{KeyLockingStrength: lock.Exclusive},
&ScanRequest{KeyLockingStrength: lock.Exclusive},
}

reqTypes := []Request{}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvpb/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,9 @@ func TestLockSpanIterate(t *testing.T) {
{&ReverseScanRequest{}, &ReverseScanResponse{}, sp("e", "g"), sp("f", "g")},
{&PutRequest{}, &PutResponse{}, sp("h", ""), sp("", "")},
{&DeleteRangeRequest{}, &DeleteRangeResponse{}, sp("i", "k"), sp("j", "k")},
{&GetRequest{KeyLocking: lock.Exclusive}, &GetResponse{}, sp("l", ""), sp("", "")},
{&ScanRequest{KeyLocking: lock.Exclusive}, &ScanResponse{}, sp("m", "o"), sp("n", "o")},
{&ReverseScanRequest{KeyLocking: lock.Exclusive}, &ReverseScanResponse{}, sp("p", "r"), sp("q", "r")},
{&GetRequest{KeyLockingStrength: lock.Exclusive}, &GetResponse{}, sp("l", ""), sp("", "")},
{&ScanRequest{KeyLockingStrength: lock.Exclusive}, &ScanResponse{}, sp("m", "o"), sp("n", "o")},
{&ReverseScanRequest{KeyLockingStrength: lock.Exclusive}, &ReverseScanResponse{}, sp("p", "r"), sp("q", "r")},
}

// NB: can't import testutils for RunTrueAndFalse.
Expand Down Expand Up @@ -277,7 +277,7 @@ func TestLockSpanIterate(t *testing.T) {
// The intent writes are replicated locking request.
require.Equal(t, toExpSpans(testReqs[3], testReqs[4]), spans[lock.Replicated])

// The scans with KeyLocking are unreplicated locking requests.
// The scans with KeyLockingStrength are unreplicated locking requests.
require.Equal(t, toExpSpans(testReqs[5], testReqs[6], testReqs[7]), spans[lock.Unreplicated])
})
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func Get(
Inconsistent: h.ReadConsistency != kvpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
FailOnMoreRecent: args.KeyLocking != lock.None,
FailOnMoreRecent: args.KeyLockingStrength != lock.None,
ScanStats: cArgs.ScanStats,
Uncertainty: cArgs.Uncertainty,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
Expand Down Expand Up @@ -83,8 +83,8 @@ func Get(
}

var res result.Result
if args.KeyLocking != lock.None && h.Txn != nil && getRes.Value != nil {
acq := roachpb.MakeLockAcquisition(h.Txn, args.Key, lock.Unreplicated, args.KeyLocking)
if args.KeyLockingStrength != lock.None && h.Txn != nil && getRes.Value != nil {
acq := roachpb.MakeLockAcquisition(h.Txn, args.Key, lock.Unreplicated, args.KeyLockingStrength)
res.Local.AcquiredLocks = []roachpb.LockAcquisition{acq}
}
res.Local.EncounteredIntents = intents
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func ReverseScan(
TargetBytes: h.TargetBytes,
AllowEmpty: h.AllowEmpty,
WholeRowsOfSize: h.WholeRowsOfSize,
FailOnMoreRecent: args.KeyLocking != lock.None,
FailOnMoreRecent: args.KeyLockingStrength != lock.None,
Reverse: true,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
Expand Down Expand Up @@ -109,8 +109,8 @@ func ReverseScan(
}
}

if args.KeyLocking != lock.None && h.Txn != nil {
err = acquireUnreplicatedLocksOnKeys(&res, h.Txn, args.KeyLocking, args.ScanFormat, &scanRes)
if args.KeyLockingStrength != lock.None && h.Txn != nil {
err = acquireUnreplicatedLocksOnKeys(&res, h.Txn, args.KeyLockingStrength, args.ScanFormat, &scanRes)
if err != nil {
return result.Result{}, err
}
Expand Down
Loading

0 comments on commit 65d846e

Please sign in to comment.