From ba13697aac5a020bfcc6d0ffc10912eacb8b5d2d Mon Sep 17 00:00:00 2001 From: lyang24 Date: Fri, 1 Dec 2023 13:51:22 -0800 Subject: [PATCH] kv: include conflicting request information in latch manager traces/logs This commit logs latch waiter and latch holder's request information when latch conflicts. This is achieved by adding redact.SafeFormatter from batch request into concurrency.Request and pass it down to latch Guard. Since the Guard struct embedded the signal struct, we will store store a pointer to the latch guard in latch struct instead. Fixes: https://github.com/cockroachdb/cockroach/issues/114601 Relase note: None --- .../concurrency/concurrency_control.go | 7 ++- .../concurrency/concurrency_manager.go | 2 +- .../concurrency/concurrency_manager_test.go | 24 +++++--- pkg/kv/kvserver/concurrency/latch_manager.go | 9 +-- .../kvserver/concurrency/lock_table_test.go | 44 ++++++++++---- .../concurrency/lock_table_waiter_test.go | 58 +++++++++++++----- .../testdata/concurrency_manager/barrier | 6 +- .../testdata/concurrency_manager/basic | 6 +- .../testdata/concurrency_manager/optimistic | 2 +- .../concurrency_manager/poison_policy_err | 4 +- .../poison_policy_err_indirect | 4 +- .../poison_policy_wait_disjoint | 4 +- .../poison_policy_wait_overlapping | 4 +- .../concurrency_manager/range_state_listener | 10 ++-- .../concurrency_manager/shared_locks_latches | 26 ++++---- .../slow_latch_observability | 4 +- pkg/kv/kvserver/replica_send.go | 1 + pkg/kv/kvserver/spanlatch/BUILD.bazel | 2 + pkg/kv/kvserver/spanlatch/manager.go | 46 ++++++++------ pkg/kv/kvserver/spanlatch/manager_test.go | 60 +++++++++++++++---- 20 files changed, 222 insertions(+), 101 deletions(-) diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go index 7950d46d20ff..af093f4f4be0 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_control.go +++ b/pkg/kv/kvserver/concurrency/concurrency_control.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/redact" ) // Manager is a structure that sequences incoming requests and provides @@ -432,6 +433,10 @@ type Request struct { // passed to SequenceReq. Only supplied to SequenceReq if the method is // not also passed an exiting Guard. LockSpans *lockspanset.LockSpanSet + + // The SafeFormatter capable of formatting the request. This is used to enrich + // logging with request level information when latches conflict. + BaFmt redact.SafeFormatter } // Guard is returned from Manager.SequenceReq. The guard is passed back in to @@ -510,7 +515,7 @@ type latchManager interface { // WaitFor waits for conflicting latches on the specified spans without adding // any latches itself. Fast path for operations that only require flushing out // old operations without blocking any new ones. - WaitFor(ctx context.Context, spans *spanset.SpanSet, pp poison.Policy) *Error + WaitFor(ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter) *Error // Poison a guard's latches, allowing waiters to fail fast. Poison(latchGuard) diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index 1e8aa222fbb2..3ee5558dad5f 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -260,7 +260,7 @@ func (m *managerImpl) sequenceReqWithGuard( // them. if shouldWaitOnLatchesWithoutAcquiring(g.Req) { log.Event(ctx, "waiting on latches without acquiring") - return nil, m.lm.WaitFor(ctx, g.Req.LatchSpans, g.Req.PoisonPolicy) + return nil, m.lm.WaitFor(ctx, g.Req.LatchSpans, g.Req.PoisonPolicy, g.Req.BaFmt) } // Provide the manager with an opportunity to intercept the request. It diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index 462df780626b..01eb99ddc62d 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -194,25 +194,33 @@ func TestConcurrencyManagerBasic(t *testing.T) { if d.HasArg("max-lock-wait-queue-length") { d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength) } - + ba := &kvpb.BatchRequest{} pp := scanPoisonPolicy(t, d) // Each kvpb.Request is provided on an indented line. reqs, reqUnions := scanRequests(t, d, c) + ba.Txn = txn + ba.Timestamp = ts + ba.UserPriority = priority + ba.ReadConsistency = readConsistency + ba.WaitPolicy = waitPolicy + ba.LockTimeout = lockTimeout + ba.Requests = reqUnions latchSpans, lockSpans := c.collectSpans(t, txn, ts, waitPolicy, reqs) c.requestsByName[reqName] = concurrency.Request{ - Txn: txn, - Timestamp: ts, - NonTxnPriority: priority, - ReadConsistency: readConsistency, - WaitPolicy: waitPolicy, - LockTimeout: lockTimeout, + Txn: ba.Txn, + Timestamp: ba.Timestamp, + NonTxnPriority: ba.UserPriority, + ReadConsistency: ba.ReadConsistency, + WaitPolicy: ba.WaitPolicy, + LockTimeout: ba.LockTimeout, + Requests: ba.Requests, MaxLockWaitQueueLength: maxLockWaitQueueLength, - Requests: reqUnions, LatchSpans: latchSpans, LockSpans: lockSpans, PoisonPolicy: pp, + BaFmt: ba, } return "" diff --git a/pkg/kv/kvserver/concurrency/latch_manager.go b/pkg/kv/kvserver/concurrency/latch_manager.go index d40c249eae32..482aedba2877 100644 --- a/pkg/kv/kvserver/concurrency/latch_manager.go +++ b/pkg/kv/kvserver/concurrency/latch_manager.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/redact" ) // latchManagerImpl implements the latchManager interface. @@ -25,7 +26,7 @@ type latchManagerImpl struct { } func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard, *Error) { - lg, err := m.m.Acquire(ctx, req.LatchSpans, req.PoisonPolicy) + lg, err := m.m.Acquire(ctx, req.LatchSpans, req.PoisonPolicy, req.BaFmt) if err != nil { return nil, kvpb.NewError(err) } @@ -33,7 +34,7 @@ func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard } func (m *latchManagerImpl) AcquireOptimistic(req Request) latchGuard { - lg := m.m.AcquireOptimistic(req.LatchSpans, req.PoisonPolicy) + lg := m.m.AcquireOptimistic(req.LatchSpans, req.PoisonPolicy, req.BaFmt) return lg } @@ -52,9 +53,9 @@ func (m *latchManagerImpl) WaitUntilAcquired( } func (m *latchManagerImpl) WaitFor( - ctx context.Context, ss *spanset.SpanSet, pp poison.Policy, + ctx context.Context, ss *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter, ) *Error { - err := m.m.WaitFor(ctx, ss, pp) + err := m.m.WaitFor(ctx, ss, pp, baFmt) if err != nil { return kvpb.NewError(err) } diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go index 0b02f6a225eb..ce5d2587ed7e 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" @@ -321,21 +322,26 @@ func TestLockTableBasic(t *testing.T) { d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength) } latchSpans, lockSpans := scanSpans(t, d, ts) + ba := &kvpb.BatchRequest{} + ba.Timestamp = ts + ba.WaitPolicy = waitPolicy req := Request{ - Timestamp: ts, - WaitPolicy: waitPolicy, + Timestamp: ba.Timestamp, + WaitPolicy: ba.WaitPolicy, MaxLockWaitQueueLength: maxLockWaitQueueLength, LatchSpans: latchSpans, LockSpans: lockSpans, + BaFmt: ba, } if txnMeta != nil { // Update the transaction's timestamp, if necessary. The transaction // may have needed to move its timestamp for any number of reasons. txnMeta.WriteTimestamp = ts - req.Txn = &roachpb.Transaction{ + ba.Txn = &roachpb.Transaction{ TxnMeta: *txnMeta, ReadTimestamp: ts, } + req.Txn = ba.Txn } requestsByName[reqName] = req return "" @@ -874,10 +880,13 @@ func TestLockTableMaxLocks(t *testing.T) { latchSpans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: k}, hlc.Timestamp{WallTime: 1}) lockSpans.Add(lock.Intent, roachpb.Span{Key: k}) } + ba := &kvpb.BatchRequest{} + ba.Timestamp = hlc.Timestamp{WallTime: 1} req := Request{ - Timestamp: hlc.Timestamp{WallTime: 1}, + Timestamp: ba.Timestamp, LatchSpans: latchSpans, LockSpans: lockSpans, + BaFmt: ba, } reqs = append(reqs, req) ltg, err := lt.ScanAndEnqueue(req, nil) @@ -1011,10 +1020,13 @@ func TestLockTableMaxLocksWithMultipleNotRemovableRefs(t *testing.T) { } latchSpans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: key}, hlc.Timestamp{WallTime: 1}) lockSpans.Add(lock.Intent, roachpb.Span{Key: key}) + ba := &kvpb.BatchRequest{} + ba.Timestamp = hlc.Timestamp{WallTime: 1} req := Request{ - Timestamp: hlc.Timestamp{WallTime: 1}, + Timestamp: ba.Timestamp, LatchSpans: latchSpans, LockSpans: lockSpans, + BaFmt: ba, } ltg, err := lt.ScanAndEnqueue(req, nil) require.Nil(t, err) @@ -1111,7 +1123,7 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error { // cancellation, the code makes sure to release latches when returning // early due to error. Otherwise other requests will get stuck and // group.Wait() will not return until the test times out. - lg, err = e.lm.Acquire(context.Background(), item.request.LatchSpans, poison.Policy_Error) + lg, err = e.lm.Acquire(context.Background(), item.request.LatchSpans, poison.Policy_Error, item.request.BaFmt) if err != nil { return err } @@ -1519,11 +1531,15 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) { ReadTimestamp: ts, } } + ba := &kvpb.BatchRequest{} + ba.Txn = txn + ba.Timestamp = ts request := &Request{ - Txn: txn, - Timestamp: ts, + Txn: ba.Txn, + Timestamp: ba.Timestamp, LatchSpans: latchSpans, LockSpans: lockSpans, + BaFmt: ba, } items = append(items, workloadItem{request: request}) if txn != nil { @@ -1601,10 +1617,13 @@ func TestLockTableConcurrentRequests(t *testing.T) { lockSpans := &lockspanset.LockSpanSet{} onlyReads := txnMeta == nil && rng.Intn(2) != 0 numKeys := rng.Intn(len(keys)-1) + 1 + ba := &kvpb.BatchRequest{} + ba.Timestamp = ts request := &Request{ - Timestamp: ts, + Timestamp: ba.Timestamp, LatchSpans: latchSpans, LockSpans: lockSpans, + BaFmt: ba, } if txnMeta != nil { request.Txn = &roachpb.Transaction{ @@ -1691,7 +1710,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) { var err error firstIter := true for { - if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error); err != nil { + if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error, item.BaFmt); err != nil { doneCh <- err return } @@ -1738,7 +1757,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) { return } // Release locks. - if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error); err != nil { + if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error, item.BaFmt); err != nil { doneCh <- err return } @@ -1766,11 +1785,14 @@ func createRequests(index int, numOutstanding int, numKeys int, numReadKeys int) ts := hlc.Timestamp{WallTime: 10} latchSpans := &spanset.SpanSet{} lockSpans := &lockspanset.LockSpanSet{} + ba := &kvpb.BatchRequest{} + ba.Timestamp = ts wi := benchWorkItem{ Request: Request{ Timestamp: ts, LatchSpans: latchSpans, LockSpans: lockSpans, + BaFmt: ba, }, } for i := 0; i < numKeys; i++ { diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go index 3ad717c9abc7..770158634cc7 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go @@ -151,9 +151,13 @@ func TestLockTableWaiterWithTxn(t *testing.T) { if synthetic { txn.ReadTimestamp = txn.ReadTimestamp.WithSynthetic(true) } + ba := &kvpb.BatchRequest{} + ba.Txn = &txn + ba.Timestamp = txn.ReadTimestamp return Request{ Txn: &txn, - Timestamp: txn.ReadTimestamp, + Timestamp: ba.Timestamp, + BaFmt: ba, } } @@ -241,9 +245,13 @@ func TestLockTableWaiterWithNonTxn(t *testing.T) { reqHeaderTS := hlc.Timestamp{WallTime: 10} makeReq := func() Request { + ba := &kvpb.BatchRequest{} + ba.Timestamp = reqHeaderTS + ba.UserPriority = roachpb.NormalUserPriority return Request{ - Timestamp: reqHeaderTS, - NonTxnPriority: roachpb.NormalUserPriority, + Timestamp: ba.Timestamp, + NonTxnPriority: ba.UserPriority, + BaFmt: ba, } } @@ -441,10 +449,15 @@ func TestLockTableWaiterWithErrorWaitPolicy(t *testing.T) { makeReq := func() Request { txn := makeTxnProto("request") txn.GlobalUncertaintyLimit = uncertaintyLimit + ba := &kvpb.BatchRequest{} + ba.Txn = &txn + ba.Timestamp = txn.ReadTimestamp + ba.WaitPolicy = lock.WaitPolicy_Error return Request{ Txn: &txn, - Timestamp: txn.ReadTimestamp, - WaitPolicy: lock.WaitPolicy_Error, + Timestamp: ba.Timestamp, + WaitPolicy: ba.WaitPolicy, + BaFmt: ba, } } makeHighPriReq := func() Request { @@ -608,17 +621,26 @@ func TestLockTableWaiterWithLockTimeout(t *testing.T) { const lockTimeout = 1 * time.Millisecond makeReq := func() Request { txn := makeTxnProto("request") + ba := &kvpb.BatchRequest{} + ba.Txn = &txn + ba.Timestamp = txn.ReadTimestamp + ba.LockTimeout = lockTimeout return Request{ - Txn: &txn, - Timestamp: txn.ReadTimestamp, - LockTimeout: lockTimeout, + Txn: ba.Txn, + Timestamp: ba.Timestamp, + LockTimeout: ba.LockTimeout, + BaFmt: ba, } } if !txn { makeReq = func() Request { + ba := &kvpb.BatchRequest{} + ba.Timestamp = hlc.Timestamp{WallTime: 10} + ba.LockTimeout = lockTimeout return Request{ - Timestamp: hlc.Timestamp{WallTime: 10}, - LockTimeout: lockTimeout, + Timestamp: ba.Timestamp, + LockTimeout: ba.LockTimeout, + BaFmt: ba, } } } @@ -793,9 +815,13 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) { err2 := kvpb.NewErrorf("error2") txn := makeTxnProto("request") + ba := &kvpb.BatchRequest{} + ba.Txn = &txn + ba.Timestamp = txn.ReadTimestamp req := Request{ - Txn: &txn, - Timestamp: txn.ReadTimestamp, + Txn: ba.Txn, + Timestamp: ba.Timestamp, + BaFmt: ba, } // Test with both synchronous and asynchronous pushes. @@ -849,9 +875,13 @@ func TestLockTableWaiterDeferredIntentResolverError(t *testing.T) { defer w.stopper.Stop(ctx) txn := makeTxnProto("request") + ba := &kvpb.BatchRequest{} + ba.Txn = &txn + ba.Timestamp = txn.ReadTimestamp req := Request{ - Txn: &txn, - Timestamp: txn.ReadTimestamp, + Txn: ba.Txn, + Timestamp: ba.Timestamp, + BaFmt: ba, } keyA := roachpb.Key("keyA") pusheeTxn := makeTxnProto("pushee") diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/barrier b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/barrier index 5ba3236e77d1..d03f34a0447c 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/barrier +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/barrier @@ -53,7 +53,7 @@ sequence req=barrier2 ---- [2] sequence barrier2: sequencing request [2] sequence barrier2: waiting on latches without acquiring -[2] sequence barrier2: waiting to acquire write latch ‹{a-f}›@0,0, held by read latch ‹c›@15.000000000,1 +[2] sequence barrier2: waiting to acquire write latch ‹{a-f}›@0,0 for request Barrier [‹"a"›,‹"f"›), held by read latch ‹c›@15.000000000,1 for request Get [‹"c"›] [2] sequence barrier2: blocked on select in spanlatch.(*Manager).waitForSignal finish req=read1 @@ -96,7 +96,7 @@ sequence req=barrier1 ---- [2] sequence barrier1: sequencing request [2] sequence barrier1: waiting on latches without acquiring -[2] sequence barrier1: waiting to acquire write latch ‹{a-f}›@0,0, held by read latch ‹c›@10.000000000,1 +[2] sequence barrier1: waiting to acquire write latch ‹{a-f}›@0,0 for request Barrier [‹"a"›,‹"f"›), held by read latch ‹c›@10.000000000,1 for request Get [‹"c"›] [2] sequence barrier1: blocked on select in spanlatch.(*Manager).waitForSignal finish req=read1 @@ -143,7 +143,7 @@ sequence req=barrier1 ---- [2] sequence barrier1: sequencing request [2] sequence barrier1: waiting on latches without acquiring -[2] sequence barrier1: waiting to acquire write latch ‹{a-f}›@0,0, held by write latch ‹c›@10.000000000,1 +[2] sequence barrier1: waiting to acquire write latch ‹{a-f}›@0,0 for request Barrier [‹"a"›,‹"f"›), held by write latch ‹c›@10.000000000,1 for request Put [‹"c"›], [txn: 00000001] [2] sequence barrier1: blocked on select in spanlatch.(*Manager).waitForSignal debug-latch-manager diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic index c028e249dc64..8671437d8b8e 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic @@ -143,7 +143,7 @@ sequence req=req4 ---- [3] sequence req4: sequencing request [3] sequence req4: acquiring latches -[3] sequence req4: waiting to acquire write latch ‹k›@10.000000000,1, held by read latch ‹k{-2}›@14.000000000,1 +[3] sequence req4: waiting to acquire write latch ‹k›@10.000000000,1 for request Put [‹"k"›], [txn: 00000001], held by read latch ‹k{-2}›@14.000000000,1 for request Get [‹"k"›], Scan [‹"k"›,‹"k2"›), [txn: 00000003] [3] sequence req4: blocked on select in spanlatch.(*Manager).waitForSignal debug-latch-manager @@ -245,13 +245,13 @@ sequence req=req7 ---- [4] sequence req7: sequencing request [4] sequence req7: acquiring latches -[4] sequence req7: waiting to acquire write latch ‹k›@12.000000000,1, held by read latch ‹{a-m}›@14.000000000,1 +[4] sequence req7: waiting to acquire write latch ‹k›@12.000000000,1 for request Put [‹"k"›], held by read latch ‹{a-m}›@14.000000000,1 for request Scan [‹"a"›,‹"m"›) [4] sequence req7: blocked on select in spanlatch.(*Manager).waitForSignal finish req=req5 ---- [-] finish req5: finishing request -[4] sequence req7: waiting to acquire write latch ‹k›@12.000000000,1, held by read latch ‹{c-z}›@16.000000000,1 +[4] sequence req7: waiting to acquire write latch ‹k›@12.000000000,1 for request Put [‹"k"›], held by read latch ‹{c-z}›@16.000000000,1 for request Scan [‹"c"›,‹"z"›) [4] sequence req7: blocked on select in spanlatch.(*Manager).waitForSignal finish req=req6 diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic index c160357a2d20..e298dbe01180 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic @@ -172,7 +172,7 @@ sequence req=req6 eval-kind=pess-after-opt ---- [8] sequence req6: re-sequencing request after optimistic sequencing failed [8] sequence req6: optimistic failed, so waiting for latches -[8] sequence req6: waiting to acquire read latch ‹{a-e}›@12.000000000,1, held by write latch ‹d›@10.000000000,1 +[8] sequence req6: waiting to acquire read latch ‹{a-e}›@12.000000000,1 for request Scan [‹"a"›,‹"e"›), [txn: 00000002], held by write latch ‹d›@10.000000000,1 for request Put [‹"d"›], [txn: 00000003] [8] sequence req6: blocked on select in spanlatch.(*Manager).waitForSignal # req4 finishing releases the latch and allows req6 to proceed. diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err index 837c614aec83..4262d08ce0e6 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err @@ -27,7 +27,7 @@ sequence req=readbf ---- [2] sequence readbf: sequencing request [2] sequence readbf: acquiring latches -[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1, held by write latch ‹c›@10.000000000,0 +[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›), held by write latch ‹c›@10.000000000,0 for request Put [‹"c"›] [2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal new-request txn=none name=pute ts=11,0 @@ -38,7 +38,7 @@ sequence req=pute ---- [3] sequence pute: sequencing request [3] sequence pute: acquiring latches -[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0, held by read latch ‹{b-f}›@11.000000000,1 +[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0 for request Put [‹"e"›], held by read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›) [3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal poison req=putc diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect index bb5b400b322a..4732751b7827 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect @@ -26,7 +26,7 @@ sequence req=readbf ---- [2] sequence readbf: sequencing request [2] sequence readbf: acquiring latches -[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1, held by write latch ‹c›@10.000000000,0 +[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›), held by write latch ‹c›@10.000000000,0 for request Put [‹"c"›] [2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal new-request txn=none name=pute ts=11,0 @@ -37,7 +37,7 @@ sequence req=pute ---- [3] sequence pute: sequencing request [3] sequence pute: acquiring latches -[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0, held by read latch ‹{b-f}›@11.000000000,1 +[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0 for request Put [‹"e"›], held by read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›) [3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal poison req=putc diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint index 32980a60e81a..227175415eaf 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint @@ -26,7 +26,7 @@ sequence req=readbf ---- [2] sequence readbf: sequencing request [2] sequence readbf: acquiring latches -[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1, held by write latch ‹c›@10.000000000,0 +[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›), held by write latch ‹c›@10.000000000,0 for request Put [‹"c"›] [2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal new-request txn=none name=pute ts=11,0 poison-policy=wait @@ -37,7 +37,7 @@ sequence req=pute ---- [3] sequence pute: sequencing request [3] sequence pute: acquiring latches -[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0, held by read latch ‹{b-f}›@11.000000000,1 +[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0 for request Put [‹"e"›], held by read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›) [3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal poison req=putc diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping index a3e2ccea768b..f76b7e71a10a 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping @@ -26,7 +26,7 @@ sequence req=readbf ---- [2] sequence readbf: sequencing request [2] sequence readbf: acquiring latches -[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1, held by write latch ‹c›@10.000000000,0 +[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›), held by write latch ‹c›@10.000000000,0 for request Put [‹"c"›] [2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal new-request txn=none name=put2 ts=11,0 poison-policy=wait @@ -37,7 +37,7 @@ sequence req=put2 ---- [3] sequence put2: sequencing request [3] sequence put2: acquiring latches -[3] sequence put2: waiting to acquire write latch ‹c›@11.000000000,0, held by write latch ‹c›@10.000000000,0 +[3] sequence put2: waiting to acquire write latch ‹c›@11.000000000,0 for request Put [‹"c"›], held by write latch ‹c›@10.000000000,0 for request Put [‹"c"›] [3] sequence put2: blocked on select in spanlatch.(*Manager).waitForSignal poison req=put1 diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener index 423ebd1d5775..d71378787f95 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener @@ -195,7 +195,7 @@ on-lock-updated req=reqRes1 txn=txn1 key=k status=committed [7] sequence req2: lock wait-queue event: done waiting [7] sequence req2: conflicted with ‹00000001-0000-0000-0000-000000000000› on ‹"k"› for 0.000s [7] sequence req2: acquiring latches -[7] sequence req2: waiting to acquire read latch ‹k2›@10.000000000,1, held by write latch ‹k2›@10.000000000,1 +[7] sequence req2: waiting to acquire read latch ‹k2›@10.000000000,1 for request Put [‹"k"›], Get [‹"k2"›], [txn: 00000002], held by write latch ‹k2›@10.000000000,1 for request ResolveIntent [‹"k"›], ResolveIntent [‹"k2"›] [7] sequence req2: blocked on select in spanlatch.(*Manager).waitForSignal on-lock-updated req=reqRes1 txn=txn1 key=k2 status=committed @@ -309,7 +309,7 @@ on-lock-updated req=reqRes2 txn=txn2 key=k status=committed [13] sequence req3: lock wait-queue event: done waiting [13] sequence req3: conflicted with ‹00000002-0000-0000-0000-000000000000› on ‹"k"› for 0.000s [13] sequence req3: acquiring latches -[13] sequence req3: waiting to acquire write latch ‹k›@10.000000000,1, held by write latch ‹k›@10.000000000,1 +[13] sequence req3: waiting to acquire write latch ‹k›@10.000000000,1 for request Put [‹"k"›], [txn: 00000003], held by write latch ‹k›@10.000000000,1 for request ResolveIntent [‹"k"›] [13] sequence req3: blocked on select in spanlatch.(*Manager).waitForSignal finish req=reqRes2 @@ -467,7 +467,7 @@ on-lock-updated req=reqRes1 txn=txn1 key=k status=committed [4] sequence req2: lock wait-queue event: done waiting [4] sequence req2: conflicted with ‹00000001-0000-0000-0000-000000000000› on ‹"k"› for 0.000s [4] sequence req2: acquiring latches -[4] sequence req2: waiting to acquire write latch ‹k›@10.000000000,1, held by write latch ‹k›@10.000000000,1 +[4] sequence req2: waiting to acquire write latch ‹k›@10.000000000,1 for request Put [‹"k"›], [txn: 00000002], held by write latch ‹k›@10.000000000,1 for request ResolveIntent [‹"k"›] [4] sequence req2: blocked on select in spanlatch.(*Manager).waitForSignal finish req=reqRes1 @@ -687,7 +687,7 @@ on-lock-updated req=reqRes1 txn=txn1 key=k status=committed [8] sequence req2: lock wait-queue event: done waiting [8] sequence req2: conflicted with ‹00000001-0000-0000-0000-000000000000› on ‹"k"› for 0.000s [8] sequence req2: acquiring latches -[8] sequence req2: waiting to acquire write latch ‹k›@10.000000000,1, held by write latch ‹k›@10.000000000,1 +[8] sequence req2: waiting to acquire write latch ‹k›@10.000000000,1 for request Put [‹"k"›], [txn: 00000002], held by write latch ‹k›@10.000000000,1 for request ResolveIntent [‹"k"›] [8] sequence req2: blocked on select in spanlatch.(*Manager).waitForSignal finish req=reqRes1 @@ -845,7 +845,7 @@ on-lock-updated req=reqRes1 txn=txn1 key=k status=committed [4] sequence req2: lock wait-queue event: done waiting [4] sequence req2: conflicted with ‹00000001-0000-0000-0000-000000000000› on ‹"k"› for 0.000s [4] sequence req2: acquiring latches -[4] sequence req2: waiting to acquire write latch ‹k›@10.000000000,1, held by write latch ‹k›@10.000000000,1 +[4] sequence req2: waiting to acquire write latch ‹k›@10.000000000,1 for request Put [‹"k"›], [txn: 00000002], held by write latch ‹k›@10.000000000,1 for request ResolveIntent [‹"k"›] [4] sequence req2: blocked on select in spanlatch.(*Manager).waitForSignal finish req=reqRes1 diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks_latches b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks_latches index 9781a62be7e0..47a83eba4aa8 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks_latches +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks_latches @@ -176,7 +176,7 @@ sequence req=req10 ---- [10] sequence req10: sequencing request [10] sequence req10: acquiring latches -[10] sequence req10: waiting to acquire write latch ‹a›@9.000000000,1, held by read latch ‹a›@9223372036.854775807,2147483647 +[10] sequence req10: waiting to acquire write latch ‹a›@9.000000000,1 for request Get(Exclusive,Unreplicated) [‹"a"›], [txn: 00000003], held by read latch ‹a›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"a"›], [txn: 00000001] [10] sequence req10: blocked on select in spanlatch.(*Manager).waitForSignal # exclusive_lock(ts) == shared_lock(ts) @@ -188,7 +188,7 @@ sequence req=req11 ---- [11] sequence req11: sequencing request [11] sequence req11: acquiring latches -[11] sequence req11: waiting to acquire write latch ‹b›@10.000000000,1, held by read latch ‹b›@9223372036.854775807,2147483647 +[11] sequence req11: waiting to acquire write latch ‹b›@10.000000000,1 for request Get(Exclusive,Unreplicated) [‹"b"›], [txn: 00000001], held by read latch ‹b›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"b"›], [txn: 00000001] [11] sequence req11: blocked on select in spanlatch.(*Manager).waitForSignal # exclusive_lock(ts) > shared_lock(ts) @@ -200,7 +200,7 @@ sequence req=req12 ---- [12] sequence req12: sequencing request [12] sequence req12: acquiring latches -[12] sequence req12: waiting to acquire write latch ‹c›@11.000000000,1, held by read latch ‹c›@9223372036.854775807,2147483647 +[12] sequence req12: waiting to acquire write latch ‹c›@11.000000000,1 for request Get(Exclusive,Unreplicated) [‹"c"›], [txn: 00000002], held by read latch ‹c›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"c"›], [txn: 00000001] [12] sequence req12: blocked on select in spanlatch.(*Manager).waitForSignal debug-latch-manager @@ -286,7 +286,7 @@ sequence req=req16 ---- [16] sequence req16: sequencing request [16] sequence req16: acquiring latches -[16] sequence req16: waiting to acquire write latch ‹a›@9.000000000,1, held by read latch ‹a›@9223372036.854775807,2147483647 +[16] sequence req16: waiting to acquire write latch ‹a›@9.000000000,1 for request Put [‹"a"›], [txn: 00000003], held by read latch ‹a›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"a"›], [txn: 00000001] [16] sequence req16: blocked on select in spanlatch.(*Manager).waitForSignal # write(ts) == shared_lock(ts) @@ -298,7 +298,7 @@ sequence req=req17 ---- [17] sequence req17: sequencing request [17] sequence req17: acquiring latches -[17] sequence req17: waiting to acquire write latch ‹b›@10.000000000,1, held by read latch ‹b›@9223372036.854775807,2147483647 +[17] sequence req17: waiting to acquire write latch ‹b›@10.000000000,1 for request Put [‹"b"›], [txn: 00000001], held by read latch ‹b›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"b"›], [txn: 00000001] [17] sequence req17: blocked on select in spanlatch.(*Manager).waitForSignal # write(ts) > shared_lock(ts) @@ -310,7 +310,7 @@ sequence req=req18 ---- [18] sequence req18: sequencing request [18] sequence req18: acquiring latches -[18] sequence req18: waiting to acquire write latch ‹c›@11.000000000,1, held by read latch ‹c›@9223372036.854775807,2147483647 +[18] sequence req18: waiting to acquire write latch ‹c›@11.000000000,1 for request Put [‹"c"›], [txn: 00000002], held by read latch ‹c›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"c"›], [txn: 00000001] [18] sequence req18: blocked on select in spanlatch.(*Manager).waitForSignal debug-latch-manager @@ -470,7 +470,7 @@ sequence req=req26 ---- [26] sequence req26: sequencing request [26] sequence req26: acquiring latches -[26] sequence req26: waiting to acquire read latch ‹a›@9223372036.854775807,2147483647, held by write latch ‹a›@10.000000000,1 +[26] sequence req26: waiting to acquire read latch ‹a›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"a"›], [txn: 00000003], held by write latch ‹a›@10.000000000,1 for request Get(Exclusive,Unreplicated) [‹"a"›], [txn: 00000001] [26] sequence req26: blocked on select in spanlatch.(*Manager).waitForSignal # shared_lock(ts) == exclusive_lock(ts) @@ -482,7 +482,7 @@ sequence req=req27 ---- [27] sequence req27: sequencing request [27] sequence req27: acquiring latches -[27] sequence req27: waiting to acquire read latch ‹b›@9223372036.854775807,2147483647, held by write latch ‹b›@10.000000000,1 +[27] sequence req27: waiting to acquire read latch ‹b›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"b"›], [txn: 00000001], held by write latch ‹b›@10.000000000,1 for request Get(Exclusive,Unreplicated) [‹"b"›], [txn: 00000001] [27] sequence req27: blocked on select in spanlatch.(*Manager).waitForSignal # shared_lock(ts) > exclusive_lock(ts) @@ -494,7 +494,7 @@ sequence req=req28 ---- [28] sequence req28: sequencing request [28] sequence req28: acquiring latches -[28] sequence req28: waiting to acquire read latch ‹c›@9223372036.854775807,2147483647, held by write latch ‹c›@10.000000000,1 +[28] sequence req28: waiting to acquire read latch ‹c›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"c"›], [txn: 00000002], held by write latch ‹c›@10.000000000,1 for request Get(Exclusive,Unreplicated) [‹"c"›], [txn: 00000001] [28] sequence req28: blocked on select in spanlatch.(*Manager).waitForSignal debug-latch-manager @@ -580,7 +580,7 @@ sequence req=req32 ---- [32] sequence req32: sequencing request [32] sequence req32: acquiring latches -[32] sequence req32: waiting to acquire read latch ‹a›@9223372036.854775807,2147483647, held by write latch ‹a›@10.000000000,1 +[32] sequence req32: waiting to acquire read latch ‹a›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"a"›], [txn: 00000003], held by write latch ‹a›@10.000000000,1 for request Put [‹"a"›], [txn: 00000001] [32] sequence req32: blocked on select in spanlatch.(*Manager).waitForSignal # shared_lock(ts) == write(ts) @@ -592,7 +592,7 @@ sequence req=req33 ---- [33] sequence req33: sequencing request [33] sequence req33: acquiring latches -[33] sequence req33: waiting to acquire read latch ‹b›@9223372036.854775807,2147483647, held by write latch ‹b›@10.000000000,1 +[33] sequence req33: waiting to acquire read latch ‹b›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"b"›], [txn: 00000001], held by write latch ‹b›@10.000000000,1 for request Put [‹"b"›], [txn: 00000001] [33] sequence req33: blocked on select in spanlatch.(*Manager).waitForSignal # shared_lock(ts) > write(ts) @@ -604,7 +604,7 @@ sequence req=req34 ---- [34] sequence req34: sequencing request [34] sequence req34: acquiring latches -[34] sequence req34: waiting to acquire read latch ‹c›@9223372036.854775807,2147483647, held by write latch ‹c›@10.000000000,1 +[34] sequence req34: waiting to acquire read latch ‹c›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"c"›], [txn: 00000002], held by write latch ‹c›@10.000000000,1 for request Put [‹"c"›], [txn: 00000001] [34] sequence req34: blocked on select in spanlatch.(*Manager).waitForSignal debug-latch-manager @@ -667,7 +667,7 @@ sequence req=req36 ---- [36] sequence req36: sequencing request [36] sequence req36: acquiring latches -[36] sequence req36: waiting to acquire write latch ‹/Local/RangeID/1/r/ReplicatedSharedLocksTransactionLatch/"00000002-0000-0000-0000-000000000000"›@0,0, held by write latch ‹/Local/RangeID/1/r/ReplicatedSharedLocksTransactionLatch/"00000002-0000-0000-0000-000000000000"›@0,0 +[36] sequence req36: waiting to acquire write latch ‹/Local/RangeID/1/r/ReplicatedSharedLocksTransactionLatch/"00000002-0000-0000-0000-000000000000"›@0,0 for request Scan(Shared,Replicated) [‹"a"›,‹"f"›), [txn: 00000002], held by write latch ‹/Local/RangeID/1/r/ReplicatedSharedLocksTransactionLatch/"00000002-0000-0000-0000-000000000000"›@0,0 for request Get(Shared,Replicated) [‹"c"›], [txn: 00000002] [36] sequence req36: blocked on select in spanlatch.(*Manager).waitForSignal new-request name=req37 txn=txn1 ts=11,1 diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/slow_latch_observability b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/slow_latch_observability index 4a799a67e786..340a8d1616ed 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/slow_latch_observability +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/slow_latch_observability @@ -28,7 +28,7 @@ sequence req=readbf ---- [2] sequence readbf: sequencing request [2] sequence readbf: acquiring latches -[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1, held by write latch ‹c›@10.000000000,0 +[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›), held by write latch ‹c›@10.000000000,0 for request Put [‹"c"›] [2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal new-request txn=none name=pute ts=11,0 @@ -39,7 +39,7 @@ sequence req=pute ---- [3] sequence pute: sequencing request [3] sequence pute: acquiring latches -[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0, held by read latch ‹{b-f}›@11.000000000,1 +[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0 for request Put [‹"e"›], held by read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›) [3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal finish req=putc diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 7e5a89a83485..8f838f31ce2c 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -477,6 +477,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( Requests: ba.Requests, LatchSpans: latchSpans, // nil if g != nil LockSpans: lockSpans, // nil if g != nil + BaFmt: ba, }, requestEvalKind) if pErr != nil { if poisonErr := (*poison.PoisonedError)(nil); errors.As(pErr.GoError(), &poisonErr) { diff --git a/pkg/kv/kvserver/spanlatch/BUILD.bazel b/pkg/kv/kvserver/spanlatch/BUILD.bazel index 4f9e5f5b319b..edbfe23adcc5 100644 --- a/pkg/kv/kvserver/spanlatch/BUILD.bazel +++ b/pkg/kv/kvserver/spanlatch/BUILD.bazel @@ -41,6 +41,7 @@ go_test( shard_count = 16, deps = [ "//pkg/keys", + "//pkg/kv/kvpb", "//pkg/kv/kvserver/concurrency/poison", "//pkg/kv/kvserver/spanset", "//pkg/roachpb", @@ -49,6 +50,7 @@ go_test( "//pkg/util/leaktest", "//pkg/util/timeutil", # keep "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/spanlatch/manager.go b/pkg/kv/kvserver/spanlatch/manager.go index 32fe84e42081..2192c1517baf 100644 --- a/pkg/kv/kvserver/spanlatch/manager.go +++ b/pkg/kv/kvserver/spanlatch/manager.go @@ -85,7 +85,7 @@ func Make(stopper *stop.Stopper, slowReqs *metric.Gauge) Manager { // latches are stored in the Manager's btrees. They represent the latching // of a single key span. type latch struct { - *signals + g *Guard id uint64 span roachpb.Span ts hlc.Timestamp @@ -102,8 +102,12 @@ func (la *latch) String() string { } // SafeFormat implements the redact.SafeFormatter interface. -func (la *latch) SafeFormat(w redact.SafePrinter, _ rune) { +func (la *latch) SafeFormat(w redact.SafePrinter, verb rune) { w.Printf("%s@%s", la.span, la.ts) + if la.g != nil && la.g.baFmt != nil { + w.Printf(" for request ") + la.g.baFmt.SafeFormat(w, verb) + } } //go:generate ../../../util/interval/generic/gen.sh *latch spanlatch @@ -126,7 +130,8 @@ type signals struct { // Manager.Acquire and accepted by Manager.Release. type Guard struct { signals - pp poison.Policy + pp poison.Policy + baFmt redact.SafeFormatter // latches [spanset.NumSpanScope][spanset.NumSpanAccess][]latch, but half the size. latchesPtrs [spanset.NumSpanScope][spanset.NumSpanAccess]unsafe.Pointer latchesLens [spanset.NumSpanScope][spanset.NumSpanAccess]int32 @@ -183,10 +188,11 @@ func allocGuardAndLatches(nLatches int) (*Guard, []latch) { return new(Guard), make([]latch, nLatches) } -func newGuard(spans *spanset.SpanSet, pp poison.Policy) *Guard { +func newGuard(spans *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter) *Guard { nLatches := spans.Len() guard, latches := allocGuardAndLatches(nLatches) guard.pp = pp + guard.baFmt = baFmt for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { ss := spans.GetSpans(a, s) @@ -199,7 +205,7 @@ func newGuard(spans *spanset.SpanSet, pp poison.Policy) *Guard { for i := range ssLatches { latch := &latches[i] latch.span = ss[i].Span - latch.signals = &guard.signals + latch.g = guard latch.ts = ss[i].Timestamp // latch.setID() in Manager.insert, under lock. } @@ -222,9 +228,9 @@ func newGuard(spans *spanset.SpanSet, pp poison.Policy) *Guard { // // It returns a Guard which must be provided to Release. func (m *Manager) Acquire( - ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, + ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter, ) (*Guard, error) { - lg, snap := m.sequence(spans, pp) + lg, snap := m.sequence(spans, pp, baFmt) defer snap.close() err := m.wait(ctx, lg, snap) @@ -247,8 +253,10 @@ func (m *Manager) Acquire( // // The method returns a Guard which must be provided to the // CheckOptimisticNoConflicts, Release methods. -func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet, pp poison.Policy) *Guard { - lg, snap := m.sequence(spans, pp) +func (m *Manager) AcquireOptimistic( + spans *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter, +) *Guard { + lg, snap := m.sequence(spans, pp, baFmt) lg.snap = &snap return lg } @@ -256,10 +264,12 @@ func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet, pp poison.Policy) *G // WaitFor waits for conflicting latches on the spans without adding // any latches itself. Fast path for operations that only require past latches // to be released without blocking new latches. -func (m *Manager) WaitFor(ctx context.Context, spans *spanset.SpanSet, pp poison.Policy) error { +func (m *Manager) WaitFor( + ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter, +) error { // The guard is only used to store latches by this request. These latches // are not actually inserted using insertLocked. - lg := newGuard(spans, pp) + lg := newGuard(spans, pp, baFmt) m.mu.Lock() snap := m.snapshotLocked(spans) @@ -355,8 +365,10 @@ func (m *Manager) WaitUntilAcquired(ctx context.Context, lg *Guard) (*Guard, err // for each of the specified spans into the manager's interval trees, and // unlocks the manager. The role of the method is to sequence latch acquisition // attempts. -func (m *Manager) sequence(spans *spanset.SpanSet, pp poison.Policy) (*Guard, snapshot) { - lg := newGuard(spans, pp) +func (m *Manager) sequence( + spans *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter, +) (*Guard, snapshot) { + lg := newGuard(spans, pp, baFmt) m.mu.Lock() snap := m.snapshotLocked(spans) @@ -527,7 +539,7 @@ func (m *Manager) iterAndWait( ) error { for it.FirstOverlap(wait); it.Valid(); it.NextOverlap(wait) { held := it.Cur() - if held.done.signaled() { + if held.g.done.signaled() { continue } if ignore(wait.ts, held.ts) { @@ -549,10 +561,10 @@ func (m *Manager) waitForSignal( wait, held *latch, ) error { log.Eventf(ctx, "waiting to acquire %s latch %s, held by %s latch %s", waitType, wait, heldType, held) - poisonCh := held.poison.signalChan() + poisonCh := held.g.poison.signalChan() for { select { - case <-held.done.signalChan(): + case <-held.g.done.signalChan(): return nil case <-poisonCh: // The latch we're waiting on was poisoned. If we continue to wait, we have to @@ -564,7 +576,7 @@ func (m *Manager) waitForSignal( return poison.NewPoisonedError(held.span, held.ts) case poison.Policy_Wait: log.Eventf(ctx, "encountered poisoned latch; continuing to wait") - wait.poison.signal() + wait.g.poison.signal() // No need to self-poison multiple times. poisonCh = nil default: diff --git a/pkg/kv/kvserver/spanlatch/manager_test.go b/pkg/kv/kvserver/spanlatch/manager_test.go index be8cc806cce1..11870cad0bfa 100644 --- a/pkg/kv/kvserver/spanlatch/manager_test.go +++ b/pkg/kv/kvserver/spanlatch/manager_test.go @@ -18,8 +18,10 @@ import ( "strings" "testing" "time" + "unsafe" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -27,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" "github.com/stretchr/testify/require" ) @@ -94,7 +97,7 @@ func testLatchBlocks(t *testing.T, a Attempt) { // MustAcquire is like Acquire, except it can't return context cancellation // errors. func (m *Manager) MustAcquire(spans *spanset.SpanSet) *Guard { - lg, err := m.Acquire(context.Background(), spans, poison.Policy_Error) + lg, err := m.Acquire(context.Background(), spans, poison.Policy_Error, nil) if err != nil { panic(err) } @@ -122,7 +125,7 @@ func (m *Manager) MustAcquireChExt( ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, ) Attempt { errCh := make(chan error, 1) - lg, snap := m.sequence(spans, pp) + lg, snap := m.sequence(spans, pp, nil) go func() { err := m.wait(ctx, lg, snap) if err != nil { @@ -607,13 +610,13 @@ func TestLatchManagerOptimistic(t *testing.T) { var m Manager // Acquire latches, no conflict. - lg1 := m.AcquireOptimistic(spans("d", "f", write, zeroTS), poison.Policy_Error) + lg1 := m.AcquireOptimistic(spans("d", "f", write, zeroTS), poison.Policy_Error, nil) require.True(t, m.CheckOptimisticNoConflicts(lg1, spans("d", "f", write, zeroTS)), poison.Policy_Error) lg1, err := m.WaitUntilAcquired(context.Background(), lg1) require.NoError(t, err) // Optimistic acquire encounters conflict in some cases. - lg2 := m.AcquireOptimistic(spans("a", "e", read, zeroTS), poison.Policy_Error) + lg2 := m.AcquireOptimistic(spans("a", "e", read, zeroTS), poison.Policy_Error, nil) require.False(t, m.CheckOptimisticNoConflicts(lg2, spans("a", "e", read, zeroTS))) require.True(t, m.CheckOptimisticNoConflicts(lg2, spans("a", "d", read, zeroTS))) waitUntilAcquiredCh := func(g *Guard) Attempt { @@ -630,7 +633,7 @@ func TestLatchManagerOptimistic(t *testing.T) { testLatchSucceeds(t, a2) // Optimistic acquire encounters conflict. - lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), poison.Policy_Error) + lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), poison.Policy_Error, nil) require.False(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS))) m.Release(lg2) // There is still a conflict even though lg2 has been released. @@ -642,7 +645,7 @@ func TestLatchManagerOptimistic(t *testing.T) { // Optimistic acquire for read below write encounters no conflict. oneTS, twoTS := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2} lg4 := m.MustAcquire(spans("c", "e", write, twoTS)) - lg5 := m.AcquireOptimistic(spans("a", "e", read, oneTS), poison.Policy_Error) + lg5 := m.AcquireOptimistic(spans("a", "e", read, oneTS), poison.Policy_Error, nil) require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "e", read, oneTS))) require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "c", read, oneTS))) lg5, err = m.WaitUntilAcquired(context.Background(), lg5) @@ -656,14 +659,14 @@ func TestLatchManagerWaitFor(t *testing.T) { var m Manager // Acquire latches, no conflict. - lg1, err := m.Acquire(context.Background(), spans("d", "f", write, zeroTS), poison.Policy_Error) + lg1, err := m.Acquire(context.Background(), spans("d", "f", write, zeroTS), poison.Policy_Error, nil) require.NoError(t, err) // See if WaitFor waits for above latch. waitForCh := func() Attempt { errCh := make(chan error) go func() { - errCh <- m.WaitFor(context.Background(), spans("a", "e", read, zeroTS), poison.Policy_Error) + errCh <- m.WaitFor(context.Background(), spans("a", "e", read, zeroTS), poison.Policy_Error, nil) }() return Attempt{errCh: errCh} } @@ -674,7 +677,7 @@ func TestLatchManagerWaitFor(t *testing.T) { // Optimistic acquire should _not_ encounter conflict - as WaitFor should // not lay any latches. - lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), poison.Policy_Error) + lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), poison.Policy_Error, nil) require.True(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS))) lg3, err = m.WaitUntilAcquired(context.Background(), lg3) require.NoError(t, err) @@ -722,7 +725,7 @@ func BenchmarkLatchManagerReadWriteMix(b *testing.B) { b.ResetTimer() for i := range spans { - lg, snap := m.sequence(&spans[i], poison.Policy_Error) + lg, snap := m.sequence(&spans[i], poison.Policy_Error, nil) snap.close() if len(lgBuf) == cap(lgBuf) { m.Release(<-lgBuf) @@ -741,3 +744,40 @@ func randBytes(n int) []byte { } return b } + +// TestSizeOfLatch tests the size of the latch struct. +func TestSizeOfLatch(t *testing.T) { + var la latch + size := int(unsafe.Sizeof(la)) + require.Equal(t, 96, size) +} + +// TestSizeOfLatchGuard tests the size of the latch Guard struct. +func TestSizeOfLatchGuard(t *testing.T) { + var lg Guard + size := int(unsafe.Sizeof(lg)) + require.Equal(t, 112, size) +} + +// TestLatchStringAndSafeformat tests the output of latch.SafeFormat. +func TestLatchStringAndSafeformat(t *testing.T) { + gr := &kvpb.GetRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("a"), + }, + } + ba := &kvpb.BatchRequest{} + ba.Add(gr) + guard := new(Guard) + guard.baFmt = ba + la := &latch{ + g: guard, + span: span(11), + ts: hlc.Timestamp{WallTime: 10}, + next: nil, + prev: nil, + } + require.EqualValues(t, `00011{-\x00}@0.000000010,0 for request Get ["a"]`, la.String()) + require.EqualValues(t, `‹00011{-\x00}›@0.000000010,0 for request Get [‹"a"›]`, redact.Sprint(la)) + require.EqualValues(t, `‹×›@0.000000010,0 for request Get [‹×›]`, redact.Sprint(la).Redact()) +}