Skip to content

Commit

Permalink
Merge pull request #118981 from erikgrinaker/backport23.2.1-rc-117612
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Feb 8, 2024
2 parents 5374abe + a674362 commit b0fb75e
Show file tree
Hide file tree
Showing 38 changed files with 1,494 additions and 184 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/batch.go
Expand Up @@ -1116,7 +1116,7 @@ func (b *Batch) queryResolvedTimestamp(s, e interface{}) {
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) barrier(s, e interface{}) {
func (b *Batch) barrier(s, e interface{}, withLAI bool) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -1132,6 +1132,7 @@ func (b *Batch) barrier(s, e interface{}) {
Key: begin,
EndKey: end,
},
WithLeaseAppliedIndex: withLAI,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
44 changes: 34 additions & 10 deletions pkg/kv/db.go
Expand Up @@ -889,23 +889,47 @@ func (db *DB) QueryResolvedTimestamp(
// writes on the specified key range to finish.
func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) {
b := &Batch{}
b.barrier(begin, end)
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
b.barrier(begin, end, false /* withLAI */)
if err := getOneErr(db.Run(ctx, b), b); err != nil {
return hlc.Timestamp{}, err
}
responses := b.response.Responses
if len(responses) == 0 {
return hlc.Timestamp{}, errors.Errorf("unexpected empty response for Barrier")
if l := len(b.response.Responses); l != 1 {
return hlc.Timestamp{}, errors.Errorf("got %d responses for Barrier", l)
}
resp, ok := responses[0].GetInner().(*kvpb.BarrierResponse)
if !ok {
return hlc.Timestamp{}, errors.Errorf("unexpected response of type %T for Barrier",
responses[0].GetInner())
resp := b.response.Responses[0].GetBarrier()
if resp == nil {
return hlc.Timestamp{}, errors.Errorf("unexpected response %T for Barrier",
b.response.Responses[0].GetInner())
}
return resp.Timestamp, nil
}

// BarrierWithLAI is like Barrier, but also returns the lease applied index and
// range descriptor at which the barrier was applied. In this case, the barrier
// can't span multiple ranges, otherwise a RangeKeyMismatchError is returned.
//
// NB: the protocol support for this was added in a patch release, and is not
// guaranteed to be present with nodes prior to 24.1. In this case, the request
// will return an empty result.
func (db *DB) BarrierWithLAI(
ctx context.Context, begin, end interface{},
) (kvpb.LeaseAppliedIndex, roachpb.RangeDescriptor, error) {
b := &Batch{}
b.barrier(begin, end, true /* withLAI */)
if err := getOneErr(db.Run(ctx, b), b); err != nil {
return 0, roachpb.RangeDescriptor{}, err
}
if l := len(b.response.Responses); l != 1 {
return 0, roachpb.RangeDescriptor{}, errors.Errorf("got %d responses for Barrier", l)
}
resp := b.response.Responses[0].GetBarrier()
if resp == nil {
return 0, roachpb.RangeDescriptor{}, errors.Errorf("unexpected response %T for Barrier",
b.response.Responses[0].GetInner())
}
return resp.LeaseAppliedIndex, resp.RangeDesc, nil
}

// sendAndFill is a helper which sends the given batch and fills its results,
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/rangefeed/BUILD.bazel
Expand Up @@ -71,6 +71,7 @@ go_test(
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
Expand All @@ -82,12 +83,14 @@ go_test(
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/future",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down

0 comments on commit b0fb75e

Please sign in to comment.