diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index be30aa59d065..d08409085ba5 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -1082,6 +1082,13 @@ func (tc *TxnCoordSender) Epoch() enginepb.TxnEpoch { return tc.mu.txn.Epoch } +// IsLocking is part of the client.TxnSender interface. +func (tc *TxnCoordSender) IsLocking() bool { + tc.mu.Lock() + defer tc.mu.Unlock() + return tc.mu.txn.IsLocking() +} + // IsTracking returns true if the heartbeat loop is running. func (tc *TxnCoordSender) IsTracking() bool { tc.mu.Lock() diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index 1e0726204d6b..22117cfdc347 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -160,6 +160,9 @@ func (m *MockTransactionalSender) ReleaseSavepoint(context.Context, SavepointTok // Epoch is part of the TxnSender interface. func (m *MockTransactionalSender) Epoch() enginepb.TxnEpoch { panic("unimplemented") } +// IsLocking is part of the TxnSender interface. +func (m *MockTransactionalSender) IsLocking() bool { return false } + // TestingCloneTxn is part of the TxnSender interface. func (m *MockTransactionalSender) TestingCloneTxn() *roachpb.Transaction { return m.txn.Clone() diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index 9bf6cdf424a7..9ab8e2e1ce44 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -260,6 +260,9 @@ type TxnSender interface { // Epoch returns the txn's epoch. Epoch() enginepb.TxnEpoch + // IsLocking returns whether the transaction has begun acquiring locks. + IsLocking() bool + // PrepareRetryableError generates a // TransactionRetryWithProtoRefreshError with a payload initialized // from this txn. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 23031265a824..d453620ecfff 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -395,7 +395,7 @@ func (txn *Txn) DisablePipelining() error { // NewBatch creates and returns a new empty batch object for use with the Txn. func (txn *Txn) NewBatch() *Batch { - return &Batch{txn: txn, AdmissionHeader: txn.admissionHeader} + return &Batch{txn: txn, AdmissionHeader: txn.AdmissionHeader()} } // Get retrieves the value for a key, returning the retrieved key/value or an @@ -982,9 +982,11 @@ func (txn *Txn) Send( // Some callers have not initialized ba using a Batch constructed using // Txn.NewBatch. So we fallback to partially overwriting here. - noMem := ba.AdmissionHeader.NoMemoryReservedAtSource - ba.AdmissionHeader = txn.admissionHeader - ba.AdmissionHeader.NoMemoryReservedAtSource = noMem + if ba.AdmissionHeader.CreateTime == 0 { + noMem := ba.AdmissionHeader.NoMemoryReservedAtSource + ba.AdmissionHeader = txn.AdmissionHeader() + ba.AdmissionHeader.NoMemoryReservedAtSource = noMem + } txn.mu.Lock() requestTxnID := txn.mu.ID @@ -1507,5 +1509,20 @@ func (txn *Txn) DeferCommitWait(ctx context.Context) func(context.Context) error // AdmissionHeader returns the admission header for work done in the context // of this transaction. func (txn *Txn) AdmissionHeader() roachpb.AdmissionHeader { - return txn.admissionHeader + h := txn.admissionHeader + if txn.mu.sender.IsLocking() { + // Assign higher priority to requests by txns that are locking, so that + // they release locks earlier. Note that this is a crude approach, and is + // worse than priority inheritance used for locks in realtime systems. We + // do this because admission control does not have visibility into the + // exact locks held by waiters in the admission queue, and cannot compare + // that with priorities of waiting requests in the various lock table + // queues. This crude approach has shown some benefit in tpcc with 3000 + // warehouses, where it halved the number of lock waiters, and increased + // the transaction throughput by 10+%. In that experiment 40% of the + // BatchRequests evaluated by KV had been assigned high priority due to + // locking. + h.Priority = int32(admission.HighPri) + } + return h }