From 120ac096f757ffbed42df2b11b37cc7205292939 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Tue, 24 Aug 2021 18:19:02 -0400 Subject: [PATCH] kv,kvcoord: assign high priority for admission control if the txn has locks This is a crude way to limit priority inversion where a txn holding locks could be waiting in an admission queue while admitted requests are waiting in the lock table queues for this txn to make progress and release locks. It can also fare better than no admission control, since work from txns holding locks will get prioritized, versus no prioritization in goroutine scheduler. A tpcc run with 3000 warehouses shows 2x reduction in lock waiters and 10+% improvement in txn throughput with this change (both before and after experiments running with admission control enabled). When compared with admission control disabled, we see even higher improvements in lock waiters and txn throughput. Informs #65955 Release note: None Release justification: Low-risk update to new functionality. --- pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 7 ++++++ pkg/kv/mock_transactional_sender.go | 3 +++ pkg/kv/sender.go | 3 +++ pkg/kv/txn.go | 27 +++++++++++++++++---- 4 files changed, 35 insertions(+), 5 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index be30aa59d065..d08409085ba5 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -1082,6 +1082,13 @@ func (tc *TxnCoordSender) Epoch() enginepb.TxnEpoch { return tc.mu.txn.Epoch } +// IsLocking is part of the client.TxnSender interface. +func (tc *TxnCoordSender) IsLocking() bool { + tc.mu.Lock() + defer tc.mu.Unlock() + return tc.mu.txn.IsLocking() +} + // IsTracking returns true if the heartbeat loop is running. func (tc *TxnCoordSender) IsTracking() bool { tc.mu.Lock() diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index 1e0726204d6b..22117cfdc347 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -160,6 +160,9 @@ func (m *MockTransactionalSender) ReleaseSavepoint(context.Context, SavepointTok // Epoch is part of the TxnSender interface. func (m *MockTransactionalSender) Epoch() enginepb.TxnEpoch { panic("unimplemented") } +// IsLocking is part of the TxnSender interface. +func (m *MockTransactionalSender) IsLocking() bool { return false } + // TestingCloneTxn is part of the TxnSender interface. func (m *MockTransactionalSender) TestingCloneTxn() *roachpb.Transaction { return m.txn.Clone() diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index 9bf6cdf424a7..9ab8e2e1ce44 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -260,6 +260,9 @@ type TxnSender interface { // Epoch returns the txn's epoch. Epoch() enginepb.TxnEpoch + // IsLocking returns whether the transaction has begun acquiring locks. + IsLocking() bool + // PrepareRetryableError generates a // TransactionRetryWithProtoRefreshError with a payload initialized // from this txn. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 23031265a824..d453620ecfff 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -395,7 +395,7 @@ func (txn *Txn) DisablePipelining() error { // NewBatch creates and returns a new empty batch object for use with the Txn. func (txn *Txn) NewBatch() *Batch { - return &Batch{txn: txn, AdmissionHeader: txn.admissionHeader} + return &Batch{txn: txn, AdmissionHeader: txn.AdmissionHeader()} } // Get retrieves the value for a key, returning the retrieved key/value or an @@ -982,9 +982,11 @@ func (txn *Txn) Send( // Some callers have not initialized ba using a Batch constructed using // Txn.NewBatch. So we fallback to partially overwriting here. - noMem := ba.AdmissionHeader.NoMemoryReservedAtSource - ba.AdmissionHeader = txn.admissionHeader - ba.AdmissionHeader.NoMemoryReservedAtSource = noMem + if ba.AdmissionHeader.CreateTime == 0 { + noMem := ba.AdmissionHeader.NoMemoryReservedAtSource + ba.AdmissionHeader = txn.AdmissionHeader() + ba.AdmissionHeader.NoMemoryReservedAtSource = noMem + } txn.mu.Lock() requestTxnID := txn.mu.ID @@ -1507,5 +1509,20 @@ func (txn *Txn) DeferCommitWait(ctx context.Context) func(context.Context) error // AdmissionHeader returns the admission header for work done in the context // of this transaction. func (txn *Txn) AdmissionHeader() roachpb.AdmissionHeader { - return txn.admissionHeader + h := txn.admissionHeader + if txn.mu.sender.IsLocking() { + // Assign higher priority to requests by txns that are locking, so that + // they release locks earlier. Note that this is a crude approach, and is + // worse than priority inheritance used for locks in realtime systems. We + // do this because admission control does not have visibility into the + // exact locks held by waiters in the admission queue, and cannot compare + // that with priorities of waiting requests in the various lock table + // queues. This crude approach has shown some benefit in tpcc with 3000 + // warehouses, where it halved the number of lock waiters, and increased + // the transaction throughput by 10+%. In that experiment 40% of the + // BatchRequests evaluated by KV had been assigned high priority due to + // locking. + h.Priority = int32(admission.HighPri) + } + return h }