Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,13 @@ func (tc *TxnCoordSender) Epoch() enginepb.TxnEpoch {
return tc.mu.txn.Epoch
}

// IsLocking is part of the client.TxnSender interface.
func (tc *TxnCoordSender) IsLocking() bool {
tc.mu.Lock()
defer tc.mu.Unlock()
return tc.mu.txn.IsLocking()
}

// IsTracking returns true if the heartbeat loop is running.
func (tc *TxnCoordSender) IsTracking() bool {
tc.mu.Lock()
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ func (m *MockTransactionalSender) ReleaseSavepoint(context.Context, SavepointTok
// Epoch is part of the TxnSender interface.
func (m *MockTransactionalSender) Epoch() enginepb.TxnEpoch { panic("unimplemented") }

// IsLocking is part of the TxnSender interface.
func (m *MockTransactionalSender) IsLocking() bool { return false }

// TestingCloneTxn is part of the TxnSender interface.
func (m *MockTransactionalSender) TestingCloneTxn() *roachpb.Transaction {
return m.txn.Clone()
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ type TxnSender interface {
// Epoch returns the txn's epoch.
Epoch() enginepb.TxnEpoch

// IsLocking returns whether the transaction has begun acquiring locks.
IsLocking() bool

// PrepareRetryableError generates a
// TransactionRetryWithProtoRefreshError with a payload initialized
// from this txn.
Expand Down
27 changes: 22 additions & 5 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (txn *Txn) DisablePipelining() error {

// NewBatch creates and returns a new empty batch object for use with the Txn.
func (txn *Txn) NewBatch() *Batch {
return &Batch{txn: txn, AdmissionHeader: txn.admissionHeader}
return &Batch{txn: txn, AdmissionHeader: txn.AdmissionHeader()}
}

// Get retrieves the value for a key, returning the retrieved key/value or an
Expand Down Expand Up @@ -982,9 +982,11 @@ func (txn *Txn) Send(

// Some callers have not initialized ba using a Batch constructed using
// Txn.NewBatch. So we fallback to partially overwriting here.
noMem := ba.AdmissionHeader.NoMemoryReservedAtSource
ba.AdmissionHeader = txn.admissionHeader
ba.AdmissionHeader.NoMemoryReservedAtSource = noMem
if ba.AdmissionHeader.CreateTime == 0 {
noMem := ba.AdmissionHeader.NoMemoryReservedAtSource
ba.AdmissionHeader = txn.AdmissionHeader()
ba.AdmissionHeader.NoMemoryReservedAtSource = noMem
}

txn.mu.Lock()
requestTxnID := txn.mu.ID
Expand Down Expand Up @@ -1507,5 +1509,20 @@ func (txn *Txn) DeferCommitWait(ctx context.Context) func(context.Context) error
// AdmissionHeader returns the admission header for work done in the context
// of this transaction.
func (txn *Txn) AdmissionHeader() roachpb.AdmissionHeader {
return txn.admissionHeader
h := txn.admissionHeader
if txn.mu.sender.IsLocking() {
// Assign higher priority to requests by txns that are locking, so that
// they release locks earlier. Note that this is a crude approach, and is
// worse than priority inheritance used for locks in realtime systems. We
// do this because admission control does not have visibility into the
// exact locks held by waiters in the admission queue, and cannot compare
// that with priorities of waiting requests in the various lock table
// queues. This crude approach has shown some benefit in tpcc with 3000
// warehouses, where it halved the number of lock waiters, and increased
// the transaction throughput by 10+%. In that experiment 40% of the
// BatchRequests evaluated by KV had been assigned high priority due to
// locking.
h.Priority = int32(admission.HighPri)
}
return h
}