Skip to content

Commit

Permalink
Merge pull request #114473 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.1-113719
  • Loading branch information
yuzefovich committed Nov 16, 2023
2 parents 1b43ae6 + 310e7fd commit 0c43ef7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 9 deletions.
32 changes: 28 additions & 4 deletions pkg/kv/kvclient/kvstreamer/requests_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,36 @@ func (r singleRangeBatch) subPriority() int32 {
}

// String implements fmt.Stringer.
//
// Note that the implementation of this method doesn't include r.reqsKeys into
// the output because that field is redundant with r.reqs and is likely to be
// nil'ed out anyway.
func (r singleRangeBatch) String() string {
// We try to limit the size based on the number of requests ourselves, so
// this is just a sane upper-bound.
maxBytes := 10 << 10 /* 10KiB */
if len(r.reqs) > 10 {
// To keep the size of this log message relatively small, if we have
// more than 10 requests, then we only include the information about the
// first 5 and the last 5 requests.
headEndIdx := 5
tailStartIdx := len(r.reqs) - 5
subIdx := "[]"
if len(r.subRequestIdx) > 0 {
subIdx = fmt.Sprintf("%v...%v", r.subRequestIdx[:headEndIdx], r.subRequestIdx[tailStartIdx:])
}
return fmt.Sprintf(
"{reqs:%v...%v pos:%v...%v subIdx:%s start:%v gets:%v reserved:%v overhead:%v minTarget:%v}",
kvpb.TruncatedRequestsString(r.reqs[:headEndIdx], maxBytes),
kvpb.TruncatedRequestsString(r.reqs[tailStartIdx:], maxBytes),
r.positions[:headEndIdx], r.positions[tailStartIdx:],
subIdx, r.isScanStarted, r.numGetsInReqs, r.reqsReservedBytes, r.overheadAccountedFor, r.minTargetBytes,
)
}
return fmt.Sprintf(
"{reqs:%s keys:%v pos:%v subIdx:%v start:%v gets:%v reserved:%v overhead:%v minTarget:%v}",
kvpb.TruncatedRequestsString(r.reqs, 1024), r.reqsKeys, r.positions,
r.subRequestIdx, r.isScanStarted, r.numGetsInReqs, r.reqsReservedBytes, r.overheadAccountedFor,
r.minTargetBytes,
"{reqs:%v pos:%v subIdx:%v start:%v gets:%v reserved:%v overhead:%v minTarget:%v}",
kvpb.TruncatedRequestsString(r.reqs, maxBytes), r.positions, r.subRequestIdx,
r.isScanStarted, r.numGetsInReqs, r.reqsReservedBytes, r.overheadAccountedFor, r.minTargetBytes,
)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvstreamer/results_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type resultsBuffer interface {
//
// It is assumed that the budget's mutex is already being held.
//
// doneAddingLocked returns the naumber of results that have been added but
// doneAddingLocked returns the number of results that have been added but
// not yet returned to the client, and whether the client goroutine was woken.
doneAddingLocked(context.Context) (int, bool)

Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,9 +767,11 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retEr
// returned once all enqueued requests have been responded to.
//
// Calling GetResults() invalidates the results returned on the previous call.
func (s *Streamer) GetResults(ctx context.Context) ([]Result, error) {
func (s *Streamer) GetResults(ctx context.Context) (retResults []Result, retErr error) {
log.VEvent(ctx, 2, "GetResults")
defer log.VEvent(ctx, 2, "exiting GetResults")
defer func() {
log.VEventf(ctx, 2, "exiting GetResults (%d results, err=%v)", len(retResults), retErr)
}()
for {
results, allComplete, err := s.results.get(ctx)
if len(results) > 0 || allComplete || err != nil {
Expand Down Expand Up @@ -1667,7 +1669,6 @@ func processSingleRangeResults(
get := response
if get.ResumeSpan != nil {
// This Get wasn't completed.
log.VEvent(ctx, 2, "incomplete Get")
continue
}
// This Get was completed.
Expand Down Expand Up @@ -1700,7 +1701,6 @@ func processSingleRangeResults(
// multiple ranges and the last range has no data in it - we
// want to be able to set scanComplete field on such an empty
// Result).
log.VEvent(ctx, 2, "incomplete Scan")
continue
}
result := Result{
Expand Down

0 comments on commit 0c43ef7

Please sign in to comment.