Skip to content

Commit

Permalink
Merge #103267
Browse files Browse the repository at this point in the history
103267: kv: non-blocking write-read conflicts for weak isolation txns r=arulajmani a=nvanbenschoten

Fixes #102014.

This commit updates write-read conflict rules to allow non-blocking behavior for weak isolation transactions. Specifically, PUSH_TIMESTAMP requests are now allowed to succeed when encountering PENDING pushees if any of the following conditions is true:
- the pushee is a weak isolation transaction
- OR the pusher is a weak isolation transaction with an equal priority as the pushee
- OR the pusher has a greater priority than the pushee (previous behavior)

The rationale for this behavior is that weak isolation transactions can tolerate write skew without a refresh or retry. Because of this, they face no consequence from being pushed and also expect others to be pushable (non-blocking).

Longer-term, we would like all write-read conflicts to become non-blocking. For now, these rules ensure that all write-read conflicts between weak isolation transactions are non-blocking, and also that write-read conflicts between a weak isolation transaction and a strong isolation (serializable) transaction has reasonable, tunable behavior.

Release note: None

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
craig[bot] and nvanbenschoten committed May 19, 2023
2 parents a269433 + 5a53d25 commit fd63cd6
Show file tree
Hide file tree
Showing 10 changed files with 1,248 additions and 42 deletions.
56 changes: 56 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,67 @@ func TestTxnWeakIsolationLevelsTolerateWriteSkew(t *testing.T) {
require.IsType(t, &kvpb.TransactionRetryWithProtoRefreshError{}, err)
}
}

for _, isoLevel := range isolation.Levels() {
t.Run(isoLevel.String(), func(t *testing.T) { run(isoLevel) })
}
}

// TestTxnWriteReadConflict verifies that write-read conflicts are non-blocking
// to the reader, except when both the writer and reader are both serializable
// transactions. In that case, the reader will block until the writer completes.
//
// NOTE: the test does not exercise different priority levels. For that, see
// TestCanPushWithPriority.
func TestTxnWriteReadConflict(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

run := func(writeIsoLevel, readIsoLevel isolation.Level) {
s := createTestDB(t)
defer s.Stop()
ctx := context.Background()

// Begin the test's writer and reader transactions.
writeTxn := s.DB.NewTxn(ctx, "writer")
require.NoError(t, writeTxn.SetIsoLevel(writeIsoLevel))
readTxn := s.DB.NewTxn(ctx, "reader")
require.NoError(t, readTxn.SetIsoLevel(readIsoLevel))

// Perform a write to key "a" in the writer transaction.
require.NoError(t, writeTxn.Put(ctx, "a", "value"))

// Read from key "a" in the reader transaction.
expBlocking := writeIsoLevel == isolation.Serializable && readIsoLevel == isolation.Serializable
readCtx := ctx
if expBlocking {
var cancel func()
readCtx, cancel = context.WithTimeout(ctx, 500*time.Millisecond)
defer cancel()
}
res, err := readTxn.Get(readCtx, "a")

// Verify the expected blocking behavior.
if expBlocking {
require.Error(t, err)
require.ErrorIs(t, context.DeadlineExceeded, err)
} else {
require.NoError(t, err)
require.False(t, res.Exists())
}

require.NoError(t, writeTxn.Rollback(ctx))
require.NoError(t, readTxn.Rollback(ctx))
}

for _, writeIsoLevel := range isolation.Levels() {
for _, readIsoLevel := range isolation.Levels() {
name := fmt.Sprintf("writeIso=%s,readIso=%s", writeIsoLevel, readIsoLevel)
t.Run(name, func(t *testing.T) { run(writeIsoLevel, readIsoLevel) })
}
}
}

// TestPriorityRatchetOnAbortOrPush verifies that the priority of
// a transaction is ratcheted by successive aborts or pushes. In
// particular, we want to ensure ratcheted priorities when the txn
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,10 @@ func PushTxn(
}
}

pusherIso, pusheeIso := args.PusherTxn.IsoLevel, reply.PusheeTxn.IsoLevel
pusherPri, pusheePri := args.PusherTxn.Priority, reply.PusheeTxn.Priority
var pusherWins bool
var reason string

switch {
case txnwait.IsExpired(cArgs.EvalCtx.Clock().Now(), &reply.PusheeTxn):
reason = "pushee is expired"
Expand All @@ -257,7 +258,7 @@ func PushTxn(
// If just attempting to cleanup old or already-committed txns,
// pusher always fails.
pusherWins = false
case txnwait.CanPushWithPriority(args.PusherTxn.Priority, reply.PusheeTxn.Priority):
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri):
reason = "pusher has priority"
pusherWins = true
case args.Force:
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/concurrency/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
deps = [
"//pkg/kv",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/concurrency/poison",
"//pkg/kv/kvserver/intentresolver",
Expand Down Expand Up @@ -63,6 +64,7 @@ go_test(
"//pkg/keys",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/batcheval",
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/concurrency/poison",
"//pkg/kv/kvserver/intentresolver",
Expand Down
20 changes: 14 additions & 6 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
Expand Down Expand Up @@ -58,8 +59,8 @@ import (
//
// The input files use the following DSL:
//
// new-txn name=<txn-name> ts=<int>[,<int>] [epoch=<int>] [priority] [uncertainty-limit=<int>[,<int>]]
// new-request name=<req-name> txn=<txn-name>|none ts=<int>[,<int>] [priority] [inconsistent] [wait-policy=<policy>] [lock-timeout] [max-lock-wait-queue-length=<int>] [poison-policy=[err|wait]]
// new-txn name=<txn-name> ts=<int>[,<int>] [epoch=<int>] [iso=<level>] [priority=<priority>] [uncertainty-limit=<int>[,<int>]]
// new-request name=<req-name> txn=<txn-name>|none ts=<int>[,<int>] [priority=<priority>] [inconsistent] [wait-policy=<policy>] [lock-timeout] [max-lock-wait-queue-length=<int>] [poison-policy=[err|wait]]
//
// <proto-name> [<field-name>=<field-value>...] (hint: see scanSingleRequest)
//
Expand Down Expand Up @@ -113,6 +114,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
d.ScanArgs(t, "epoch", &epoch)
}

iso := scanIsoLevel(t, d)
priority := scanTxnPriority(t, d)

uncertaintyLimit := ts
Expand All @@ -130,6 +132,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
txn = &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
ID: id,
IsoLevel: iso,
Epoch: enginepb.TxnEpoch(epoch),
WriteTimestamp: ts,
MinTimestamp: ts,
Expand Down Expand Up @@ -706,16 +709,21 @@ func (c *cluster) PushTransaction(
}
defer c.unregisterPush(push)
}
var pusherPriority enginepb.TxnPriority
var pusherIso isolation.Level
var pusherPri enginepb.TxnPriority
if h.Txn != nil {
pusherPriority = h.Txn.Priority
pusherIso = h.Txn.IsoLevel
pusherPri = h.Txn.Priority
} else {
pusherPriority = roachpb.MakePriority(h.UserPriority)
pusherIso = isolation.Serializable
pusherPri = roachpb.MakePriority(h.UserPriority)
}
pushTo := h.Timestamp.Next()
for {
// Is the pushee pushed?
pusheeTxn, pusheeRecordSig := pusheeRecord.asTxn()
pusheeIso := pusheeTxn.IsoLevel
pusheePri := pusheeTxn.Priority
// NOTE: this logic is adapted from cmd_push_txn.go.
var pusherWins bool
switch {
Expand All @@ -727,7 +735,7 @@ func (c *cluster) PushTransaction(
return pusheeTxn, nil
case pushType == kvpb.PUSH_TOUCH:
pusherWins = false
case txnwait.CanPushWithPriority(pusherPriority, pusheeTxn.Priority):
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri):
pusherWins = true
default:
pusherWins = false
Expand Down
28 changes: 25 additions & 3 deletions pkg/kv/kvserver/concurrency/datadriven_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -45,6 +46,26 @@ func scanTimestampWithName(t *testing.T, d *datadriven.TestData, name string) hl
return ts
}

func scanIsoLevel(t *testing.T, d *datadriven.TestData) isolation.Level {
const key = "iso"
if !d.HasArg(key) {
return isolation.Serializable
}
var isoS string
d.ScanArgs(t, key, &isoS)
switch isoS {
case "serializable":
return isolation.Serializable
case "snapshot":
return isolation.Snapshot
case "read-committed":
return isolation.ReadCommitted
default:
d.Fatalf(t, "unknown isolation level: %s", isoS)
return 0
}
}

func scanTxnPriority(t *testing.T, d *datadriven.TestData) enginepb.TxnPriority {
priority := scanUserPriority(t, d)
// NB: don't use roachpb.MakePriority to avoid randomness.
Expand All @@ -63,10 +84,11 @@ func scanTxnPriority(t *testing.T, d *datadriven.TestData) enginepb.TxnPriority

func scanUserPriority(t *testing.T, d *datadriven.TestData) roachpb.UserPriority {
const key = "priority"
priS := "normal"
if d.HasArg(key) {
d.ScanArgs(t, key, &priS)
if !d.HasArg(key) {
return roachpb.NormalUserPriority
}
var priS string
d.ScanArgs(t, key, &priS)
switch priS {
case "low":
return roachpb.MinUserPriority
Expand Down
35 changes: 21 additions & 14 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
Expand Down Expand Up @@ -476,8 +477,7 @@ func (w *lockTableWaiterImpl) pushLockTxn(
// under the lock. For write-write conflicts, try to abort the lock
// holder entirely so the write request can revoke and replace the lock
// with its own lock.
switch ws.guardStrength {
case lock.None:
if ws.guardStrength == lock.None {
pushType = kvpb.PUSH_TIMESTAMP
beforePushObs = roachpb.ObservedTimestamp{
NodeID: w.nodeDesc.NodeID,
Expand All @@ -498,12 +498,9 @@ func (w *lockTableWaiterImpl) pushLockTxn(
// adjusting the intent, accepting that the intent would then no longer
// round-trip and would lose the local timestamp if rewritten later.
log.VEventf(ctx, 2, "pushing timestamp of txn %s above %s", ws.txn.ID.Short(), h.Timestamp)

case lock.Intent:
} else {
pushType = kvpb.PUSH_ABORT
log.VEventf(ctx, 2, "pushing txn %s to abort", ws.txn.ID.Short())
default:
log.Fatalf(ctx, "unhandled lock strength %s", ws.guardStrength)
}

case lock.WaitPolicy_Error:
Expand Down Expand Up @@ -1243,18 +1240,28 @@ func newWriteIntentErr(req Request, ws waitingState, reason kvpb.WriteIntentErro
}

func canPushWithPriority(req Request, s waitingState) bool {
var pusher, pushee enginepb.TxnPriority
if req.Txn != nil {
pusher = req.Txn.Priority
} else {
pusher = roachpb.MakePriority(req.NonTxnPriority)
}
if s.txn == nil {
// Can't push a non-transactional request.
return false
}
pushee = s.txn.Priority
return txnwait.CanPushWithPriority(pusher, pushee)
var pushType kvpb.PushTxnType
if s.guardStrength == lock.None {
pushType = kvpb.PUSH_TIMESTAMP
} else {
pushType = kvpb.PUSH_ABORT
}
var pusherIso, pusheeIso isolation.Level
var pusherPri, pusheePri enginepb.TxnPriority
if req.Txn != nil {
pusherIso = req.Txn.IsoLevel
pusherPri = req.Txn.Priority
} else {
pusherIso = isolation.Serializable
pusherPri = roachpb.MakePriority(req.NonTxnPriority)
}
pusheeIso = s.txn.IsoLevel
pusheePri = s.txn.Priority
return txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri)
}

func logResolveIntent(ctx context.Context, intent roachpb.LockUpdate) {
Expand Down

0 comments on commit fd63cd6

Please sign in to comment.