Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ func (b *Batch) AddRawRequest(reqs ...kvpb.Request) {
*kvpb.IncrementRequest,
*kvpb.DeleteRequest:
numRows = 1
case *kvpb.ReverseScanRequest:
b.Header.IsReverse = true
}
b.appendReqs(args)
b.initResult(1 /* calls */, numRows, raw, nil)
Expand Down Expand Up @@ -740,6 +742,7 @@ func (b *Batch) scan(
str kvpb.KeyLockingStrengthType,
dur kvpb.KeyLockingDurabilityType,
) {
b.Header.IsReverse = isReverse
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ func (db *DB) scan(
if maxRows > 0 {
b.Header.MaxSpanRequestKeys = maxRows
}
b.Header.IsReverse = isReverse
b.scan(begin, end, isReverse, str, dur)
r, err := getOneResult(db.Run(ctx, b), b)
return r.Rows, err
Expand Down
116 changes: 55 additions & 61 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,35 +1069,46 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
return kvpb.NewErrorf("empty batch")
}

if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 {
// Verify that the batch contains only specific requests. Verify that a
// batch with a ReverseScan only contains ReverseScan range requests.
var foundForward, foundReverse bool
for _, req := range ba.Requests {
inner := req.GetInner()
switch inner.(type) {
case *kvpb.ScanRequest, *kvpb.ResolveIntentRangeRequest,
*kvpb.DeleteRangeRequest, *kvpb.RevertRangeRequest,
*kvpb.ExportRequest, *kvpb.QueryLocksRequest, *kvpb.IsSpanEmptyRequest:
// Accepted forward range requests.
foundForward = true
// Verify that forward and reverse range requests are never in the same
// batch. Also verify that the batch with limits contains only specific
// requests.
var foundForward, foundReverse bool
var disallowedReq string
for _, req := range ba.Requests {
inner := req.GetInner()
switch inner.(type) {
case *kvpb.ScanRequest, *kvpb.ResolveIntentRangeRequest,
*kvpb.DeleteRangeRequest, *kvpb.RevertRangeRequest,
*kvpb.ExportRequest, *kvpb.QueryLocksRequest, *kvpb.IsSpanEmptyRequest:
// Accepted forward range requests.
foundForward = true

case *kvpb.ReverseScanRequest:
// Accepted reverse range requests.
foundReverse = true
case *kvpb.ReverseScanRequest:
// Accepted reverse range requests.
foundReverse = true

case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest,
*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest, *kvpb.PutRequest:
// Accepted point requests that can be in batches with limit.
case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest,
*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest, *kvpb.PutRequest:
// Accepted point requests that can be in batches with limit. No
// need to set disallowedReq.

default:
return kvpb.NewErrorf("batch with limit contains %s request", inner.Method())
}
}
if foundForward && foundReverse {
return kvpb.NewErrorf("batch with limit contains both forward and reverse scans")
default:
disallowedReq = inner.Method().String()
}
}
if foundForward && foundReverse {
return kvpb.NewErrorf("batch contains both forward and reverse requests")
}
if (ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0) && disallowedReq != "" {
return kvpb.NewErrorf("batch with limit contains %s request", disallowedReq)
}
// Also verify that IsReverse is set accordingly on the batch header.
if foundForward && ba.Header.IsReverse {
return kvpb.NewErrorf("batch contains forward requests but IsReverse is set")
}
if foundReverse && !ba.Header.IsReverse {
return kvpb.NewErrorf("batch contains reverse requests but IsReverse is not set")
}

switch ba.WaitPolicy {
case lock.WaitPolicy_Block, lock.WaitPolicy_Error:
Expand Down Expand Up @@ -1239,7 +1250,6 @@ func (ds *DistSender) Send(
if err != nil {
return nil, kvpb.NewError(err)
}
isReverse := ba.IsReverse()

// Determine whether this part of the BatchRequest contains a committing
// EndTxn request.
Expand All @@ -1253,9 +1263,9 @@ func (ds *DistSender) Send(
var rpl *kvpb.BatchResponse
var pErr *kvpb.Error
if withParallelCommit {
rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, isReverse, 0 /* batchIdx */)
rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, 0 /* batchIdx */)
} else {
rpl, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, 0 /* batchIdx */)
rpl, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, 0 /* batchIdx */)
}

if pErr == errNo1PCTxn {
Expand Down Expand Up @@ -1446,7 +1456,7 @@ type response struct {
// method is never invoked recursively, but it is exposed to maintain symmetry
// with divideAndSendBatchToRanges.
func (ds *DistSender) divideAndSendParallelCommit(
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, isReverse bool, batchIdx int,
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, batchIdx int,
) (br *kvpb.BatchResponse, pErr *kvpb.Error) {
// Search backwards, looking for the first pre-commit QueryIntent.
swapIdx := -1
Expand All @@ -1462,7 +1472,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
if swapIdx == -1 {
// No pre-commit QueryIntents. Nothing to split.
log.VEvent(ctx, 3, "no pre-commit QueryIntents found, sending batch as-is")
return ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, true /* withCommit */, batchIdx)
return ds.divideAndSendBatchToRanges(ctx, ba, rs, true /* withCommit */, batchIdx)
}

// Swap the EndTxn request and the first pre-commit QueryIntent. This
Expand All @@ -1489,7 +1499,8 @@ func (ds *DistSender) divideAndSendParallelCommit(
if err != nil {
return br, kvpb.NewError(err)
}
qiIsReverse := false // QueryIntentRequests do not carry the isReverse flag
// No need to process QueryIntentRequests in the reverse order.
qiBa.IsReverse = false
qiBatchIdx := batchIdx + 1
qiResponseCh := make(chan response, 1)

Expand Down Expand Up @@ -1519,7 +1530,7 @@ func (ds *DistSender) divideAndSendParallelCommit(

// Send the batch with withCommit=true since it will be inflight
// concurrently with the EndTxn batch below.
reply, pErr := ds.divideAndSendBatchToRanges(ctx, qiBa, qiRS, qiIsReverse, true /* withCommit */, qiBatchIdx)
reply, pErr := ds.divideAndSendBatchToRanges(ctx, qiBa, qiRS, true /* withCommit */, qiBatchIdx)
qiResponseCh <- response{reply: reply, positions: positions, pErr: pErr}
}); err != nil {
return nil, kvpb.NewError(err)
Expand All @@ -1534,10 +1545,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
if err != nil {
return nil, kvpb.NewError(err)
}
// Note that we don't need to recompute isReverse for the updated batch
// since we only separated out QueryIntentRequests which don't carry the
// isReverse flag.
br, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, true /* withCommit */, batchIdx)
br, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, true /* withCommit */, batchIdx)

// Wait for the QueryIntent-only batch to complete and stitch
// the responses together.
Expand Down Expand Up @@ -1706,12 +1714,6 @@ func mergeErrors(pErr1, pErr2 *kvpb.Error) *kvpb.Error {
// is trimmed against each range which is part of the span and sent
// either serially or in parallel, if possible.
//
// isReverse indicates the direction that the provided span should be
// iterated over while sending requests. It is passed in by callers
// instead of being recomputed based on the requests in the batch to
// prevent the iteration direction from switching midway through a
// batch, in cases where partial batches recurse into this function.
//
// withCommit indicates that the batch contains a transaction commit
// or that a transaction commit is being run concurrently with this
// batch. Either way, if this is true then sendToReplicas will need
Expand All @@ -1721,12 +1723,7 @@ func mergeErrors(pErr1, pErr2 *kvpb.Error) *kvpb.Error {
// being processed by this method. It's specified as non-zero when
// this method is invoked recursively.
func (ds *DistSender) divideAndSendBatchToRanges(
ctx context.Context,
ba *kvpb.BatchRequest,
rs roachpb.RSpan,
isReverse bool,
withCommit bool,
batchIdx int,
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, withCommit bool, batchIdx int,
) (br *kvpb.BatchResponse, pErr *kvpb.Error) {
// Clone the BatchRequest's transaction so that future mutations to the
// proto don't affect the proto in this batch.
Expand All @@ -1736,7 +1733,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// Get initial seek key depending on direction of iteration.
var scanDir ScanDirection
var seekKey roachpb.RKey
if !isReverse {
if !ba.IsReverse {
scanDir = Ascending
seekKey = rs.Key
} else {
Expand All @@ -1751,7 +1748,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// Take the fast path if this batch fits within a single range.
if !ri.NeedAnother(rs) {
resp := ds.sendPartialBatch(
ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(),
ctx, ba, rs, withCommit, batchIdx, ri.Token(),
)
// resp.positions remains nil since the original batch is fully
// contained within a single range.
Expand Down Expand Up @@ -1873,7 +1870,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
}

if pErr == nil && couldHaveSkippedResponses {
fillSkippedResponses(ba, br, seekKey, resumeReason, isReverse)
fillSkippedResponses(ba, br, seekKey, resumeReason)
}
}()

Expand Down Expand Up @@ -1953,7 +1950,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// If we can reserve one of the limited goroutines available for parallel
// batch RPCs, send asynchronously.
if canParallelize && !lastRange && !ds.disableParallelBatches {
if ds.sendPartialBatchAsync(ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(), responseCh, positions) {
if ds.sendPartialBatchAsync(ctx, curRangeBatch, curRangeRS, withCommit, batchIdx, ri.Token(), responseCh, positions) {
asyncSent = true
} else {
asyncThrottled = true
Expand All @@ -1968,7 +1965,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
}()
}
return ds.sendPartialBatch(
ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(),
ctx, curRangeBatch, curRangeRS, withCommit, batchIdx, ri.Token(),
)
}()
resp.positions = positions
Expand Down Expand Up @@ -2055,7 +2052,6 @@ func (ds *DistSender) sendPartialBatchAsync(
ctx context.Context,
ba *kvpb.BatchRequest,
rs roachpb.RSpan,
isReverse bool,
withCommit bool,
batchIdx int,
routing rangecache.EvictionToken,
Expand All @@ -2074,7 +2070,7 @@ func (ds *DistSender) sendPartialBatchAsync(
ds.metrics.AsyncSentCount.Inc(1)
ds.metrics.AsyncInProgress.Inc(1)
defer ds.metrics.AsyncInProgress.Dec(1)
resp := ds.sendPartialBatch(ctx, ba, rs, isReverse, withCommit, batchIdx, routing)
resp := ds.sendPartialBatch(ctx, ba, rs, withCommit, batchIdx, routing)
resp.positions = positions
responseCh <- resp
},
Expand Down Expand Up @@ -2142,7 +2138,6 @@ func (ds *DistSender) sendPartialBatch(
ctx context.Context,
ba *kvpb.BatchRequest,
rs roachpb.RSpan,
isReverse bool,
withCommit bool,
batchIdx int,
routingTok rangecache.EvictionToken,
Expand Down Expand Up @@ -2170,7 +2165,7 @@ func (ds *DistSender) sendPartialBatch(

if !routingTok.Valid() {
var descKey roachpb.RKey
if isReverse {
if ba.IsReverse {
descKey = rs.EndKey
} else {
descKey = rs.Key
Expand All @@ -2186,7 +2181,7 @@ func (ds *DistSender) sendPartialBatch(
// replica, while detecting hazardous cases where the follower does
// not have the latest information and the current descriptor did
// not result in a successful send.
routingTok, err = ds.getRoutingInfo(ctx, descKey, prevTok, isReverse)
routingTok, err = ds.getRoutingInfo(ctx, descKey, prevTok, ba.IsReverse)
if err != nil {
log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err)
// We set pErr if we encountered an error getting the descriptor in
Expand All @@ -2209,7 +2204,7 @@ func (ds *DistSender) sendPartialBatch(
}
if !intersection.Equal(rs) {
log.Eventf(ctx, "range shrunk; sub-dividing the request")
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, batchIdx)
return response{reply: reply, pErr: pErr}
}
}
Expand Down Expand Up @@ -2311,7 +2306,7 @@ func (ds *DistSender) sendPartialBatch(
// batch here would give a potentially larger response slice
// with unknown mapping to our truncated reply).
log.VEventf(ctx, 1, "likely split; will resend. Got new descriptors: %s", tErr.Ranges)
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, batchIdx)
return response{reply: reply, pErr: pErr}
}
break
Expand Down Expand Up @@ -2360,7 +2355,6 @@ func fillSkippedResponses(
br *kvpb.BatchResponse,
nextKey roachpb.RKey,
resumeReason kvpb.ResumeReason,
isReverse bool,
) {
// Some requests might have no response at all if we used a batch-wide
// limit; simply create trivial responses for those. Note that any type
Expand Down Expand Up @@ -2390,7 +2384,7 @@ func fillSkippedResponses(
for i, resp := range br.Responses {
req := ba.Requests[i].GetInner()
hdr := resp.GetInner().Header()
maybeSetResumeSpan(req, &hdr, nextKey, isReverse)
maybeSetResumeSpan(req, &hdr, nextKey, ba.IsReverse)
if hdr.ResumeSpan != nil {
hdr.ResumeReason = resumeReason
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,11 +1009,10 @@ func TestMultiRequestBatchWithFwdAndReverseRequests(t *testing.T) {
t.Fatal(err)
}
b := &kv.Batch{}
b.Header.MaxSpanRequestKeys = 100
b.Scan("a", "b")
b.ReverseScan("a", "b")
if err := db.Run(ctx, b); !testutils.IsError(
err, "batch with limit contains both forward and reverse scans",
err, "batch contains both forward and reverse requests",
) {
t.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,7 @@ func (w *workerCoordinator) performRequestAsync(
ba.Header.TargetBytes = targetBytes
ba.Header.AllowEmpty = !headOfLine
ba.Header.WholeRowsOfSize = w.s.maxKeysPerRow
ba.Header.IsReverse = w.s.reverse
// TODO(yuzefovich): consider setting MaxSpanRequestKeys whenever
// applicable (#67885).
ba.AdmissionHeader = w.requestAdmissionHeader
Expand Down
10 changes: 8 additions & 2 deletions pkg/kv/kvnemesis/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,16 @@ func TestApplier(t *testing.T) {
"txn-si-err", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, delRange(k2, k4, 1))),
},
{
"batch-mixed", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), scan(k1, k3), reverseScanForUpdate(k1, k5))),
"batch-mixed-fwd", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), scan(k1, k3))),
},
{
"batch-mixed-err", step(batch(put(k2, 2), getForUpdate(k1), scanForUpdate(k1, k3), reverseScan(k1, k3))),
"batch-mixed-rev", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), reverseScanForUpdate(k1, k5))),
},
{
"batch-mixed-err-fwd", step(batch(put(k2, 2), getForUpdate(k1), scanForUpdate(k1, k3))),
},
{
"batch-mixed-err-rev", step(batch(put(k2, 2), getForUpdate(k1), reverseScan(k1, k3))),
},
{
"txn-ssi-commit-mixed", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 5), batch(put(k6, 6), delRange(k3, k5, 1)))),
Expand Down
22 changes: 21 additions & 1 deletion pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,8 +1484,28 @@ func makeRandBatch(c *ClientOperationConfig) opGenFunc {
g.registerClientOps(&allowed, c)
numOps := rng.Intn(4)
ops := make([]Operation, numOps)
for i := range ops {
var addedForwardScan, addedReverseScan bool
for i := 0; i < numOps; i++ {
ops[i] = g.selectOp(rng, allowed)
if ops[i].Scan != nil {
if !ops[i].Scan.Reverse {
if addedReverseScan {
// We cannot include the forward scan into the batch
// that already contains the reverse scan.
i--
continue
}
addedForwardScan = true
} else {
if addedForwardScan {
// We cannot include the reverse scan into the batch
// that already contains the forward scan.
i--
continue
}
addedReverseScan = true
}
}
}
return batch(ops...)
}
Expand Down
10 changes: 0 additions & 10 deletions pkg/kv/kvnemesis/testdata/TestApplier/batch-mixed-err

This file was deleted.

Loading