Skip to content

Commit

Permalink
Merge #70513 #70734
Browse files Browse the repository at this point in the history
70513: streamingccl: unskip TestStreamIngestionFrontierProcessor r=rhu713 a=adityamaru

This test does not flake anymore. There have been fixes around
leaked goroutines that have been checked in in the recent past
but the test mistakenly remained skipped:
#69262

I got 3000 runs under stress.

Fixes: #68704

70734: colencoding: reuse scratch space when key decoding bytes and decimals r=yuzefovich a=yuzefovich

When we're key decoding bytes-like columns, we need to use the scratch
byte slice to decode into (in case of decimals, we might need the space
temporarily). Previously, we would always allocate a new byte slice only
to deep copy it later when calling `coldata.Bytes.Set`. This commit
teaches the cFetcher and the relevant decoding methods to reuse the same
scratch space which should reduce the memory allocations.

One notable change is that now when we're calling
`DecodeBytesAscending`, we have to make sure to perform a deep copy so
that it is safe to reuse the returned value as the scratch space in the
future.

Release note: None

Co-authored-by: Aditya Maru <adityamaru@gmail.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
3 people committed Sep 28, 2021
3 parents 049aff1 + a9c31fc + d3c354d commit bdb4c1a
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -37,7 +36,6 @@ type partitionToEvent map[streamingccl.PartitionAddress][]streamingccl.Event

func TestStreamIngestionFrontierProcessor(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 68795, "flaky test")
ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{})
Expand Down
73 changes: 41 additions & 32 deletions pkg/sql/colencoding/key_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func DecodeIndexKeyToCols(
colDirs []descpb.IndexDescriptor_Direction,
key roachpb.Key,
invertedColIdx int,
) (remainingKey roachpb.Key, matches bool, foundNull bool, _ error) {
scratch []byte,
) (remainingKey roachpb.Key, matches bool, foundNull bool, retScratch []byte, _ error) {
var decodedTableID descpb.ID
var decodedIndexID descpb.IndexID
var err error
Expand All @@ -68,28 +69,28 @@ func DecodeIndexKeyToCols(
lastKeyComponentLength := len(key)
key, decodedTableID, decodedIndexID, err = rowenc.DecodePartialTableIDIndexID(key)
if err != nil {
return nil, false, false, err
return nil, false, false, scratch, err
}
if decodedTableID != ancestor.TableID || decodedIndexID != ancestor.IndexID {
// We don't match. Return a key with the table ID / index ID we're
// searching for, so the caller knows what to seek to.
curPos := len(origKey) - lastKeyComponentLength
// Prevent unwanted aliasing on the origKey by setting the capacity.
key = rowenc.EncodePartialTableIDIndexID(origKey[:curPos:curPos], ancestor.TableID, ancestor.IndexID)
return key, false, false, nil
return key, false, false, scratch, nil
}
}

length := int(ancestor.SharedPrefixLen)
// We don't care about whether this call to DecodeKeyVals found a null or not, because
// it is a interleaving ancestor.
var isNull bool
key, isNull, err = DecodeKeyValsToCols(
key, isNull, scratch, err = DecodeKeyValsToCols(
da, vecs, idx, indexColIdx[:length], checkAllColsForNull, types[:length],
colDirs[:length], nil /* unseen */, key, invertedColIdx,
colDirs[:length], nil /* unseen */, key, invertedColIdx, scratch,
)
if err != nil {
return nil, false, false, err
return nil, false, false, scratch, err
}
indexColIdx, types, colDirs = indexColIdx[length:], types[length:], colDirs[length:]
foundNull = foundNull || isNull
Expand All @@ -103,31 +104,32 @@ func DecodeIndexKeyToCols(
curPos := len(origKey) - len(key)
// Prevent unwanted aliasing on the origKey by setting the capacity.
key = encoding.EncodeInterleavedSentinel(origKey[:curPos:curPos])
return key, false, false, nil
return key, false, false, scratch, nil
}
}

lastKeyComponentLength := len(key)
key, decodedTableID, decodedIndexID, err = rowenc.DecodePartialTableIDIndexID(key)
if err != nil {
return nil, false, false, err
return nil, false, false, scratch, err
}
if decodedTableID != desc.GetID() || decodedIndexID != index.GetID() {
// We don't match. Return a key with the table ID / index ID we're
// searching for, so the caller knows what to seek to.
curPos := len(origKey) - lastKeyComponentLength
// Prevent unwanted aliasing on the origKey by setting the capacity.
key = rowenc.EncodePartialTableIDIndexID(origKey[:curPos:curPos], desc.GetID(), index.GetID())
return key, false, false, nil
return key, false, false, scratch, nil
}
}

var isNull bool
key, isNull, err = DecodeKeyValsToCols(
da, vecs, idx, indexColIdx, checkAllColsForNull, types, colDirs, nil /* unseen */, key, invertedColIdx,
key, isNull, scratch, err = DecodeKeyValsToCols(
da, vecs, idx, indexColIdx, checkAllColsForNull, types, colDirs,
nil /* unseen */, key, invertedColIdx, scratch,
)
if err != nil {
return nil, false, false, err
return nil, false, false, scratch, err
}
foundNull = foundNull || isNull

Expand All @@ -139,10 +141,10 @@ func DecodeIndexKeyToCols(
curPos := len(origKey) - lastKeyComponentLength
// Prevent unwanted aliasing on the origKey by setting the capacity.
key = encoding.EncodeNullDescending(origKey[:curPos:curPos])
return key, false, false, nil
return key, false, false, scratch, nil
}

return key, true, foundNull, nil
return key, true, foundNull, scratch, nil
}

// DecodeKeyValsToCols decodes the values that are part of the key, writing the
Expand Down Expand Up @@ -171,7 +173,8 @@ func DecodeKeyValsToCols(
unseen *util.FastIntSet,
key []byte,
invertedColIdx int,
) (remainingKey []byte, foundNull bool, _ error) {
scratch []byte,
) (remainingKey []byte, foundNull bool, retScratch []byte, _ error) {
for j := range types {
var err error
i := indexColIdx[j]
Expand All @@ -188,14 +191,14 @@ func DecodeKeyValsToCols(
}
var isNull bool
isInverted := invertedColIdx == i
key, isNull, err = decodeTableKeyToCol(da, vecs[i], idx, types[j], key, directions[j], isInverted)
key, isNull, scratch, err = decodeTableKeyToCol(da, vecs[i], idx, types[j], key, directions[j], isInverted, scratch)
foundNull = isNull || foundNull
}
if err != nil {
return nil, false, err
return nil, false, scratch, err
}
}
return key, foundNull, nil
return key, foundNull, scratch, nil
}

// decodeTableKeyToCol decodes a value encoded by EncodeTableKey, writing the result
Expand All @@ -210,14 +213,15 @@ func decodeTableKeyToCol(
key []byte,
dir descpb.IndexDescriptor_Direction,
isInverted bool,
) ([]byte, bool, error) {
scratch []byte,
) (_ []byte, _ bool, retScratch []byte, _ error) {
if (dir != descpb.IndexDescriptor_ASC) && (dir != descpb.IndexDescriptor_DESC) {
return nil, false, errors.AssertionFailedf("invalid direction: %d", log.Safe(dir))
return nil, false, scratch, errors.AssertionFailedf("invalid direction: %d", log.Safe(dir))
}
var isNull bool
if key, isNull = encoding.DecodeIfNull(key); isNull {
vec.Nulls().SetNull(idx)
return key, true, nil
return key, true, scratch, nil
}
// We might have read a NULL value in the interleaved child table which
// would update the nulls vector, so we need to explicitly unset the null
Expand All @@ -229,10 +233,10 @@ func decodeTableKeyToCol(
if isInverted {
keyLen, err := encoding.PeekLength(key)
if err != nil {
return nil, false, err
return nil, false, scratch, err
}
vec.Bytes().Set(idx, key[:keyLen])
return key[keyLen:], false, nil
return key[keyLen:], false, scratch, nil
}

var rkey []byte
Expand Down Expand Up @@ -272,21 +276,26 @@ func decodeTableKeyToCol(
case types.DecimalFamily:
var d apd.Decimal
if dir == descpb.IndexDescriptor_ASC {
rkey, d, err = encoding.DecodeDecimalAscending(key, nil)
rkey, d, err = encoding.DecodeDecimalAscending(key, scratch[:0])
} else {
rkey, d, err = encoding.DecodeDecimalDescending(key, nil)
rkey, d, err = encoding.DecodeDecimalDescending(key, scratch[:0])
}
vec.Decimal()[idx] = d
case types.BytesFamily, types.StringFamily, types.UuidFamily:
var r []byte
if dir == descpb.IndexDescriptor_ASC {
// No need to perform the deep copy since Set() below will do that
// for us.
rkey, r, err = encoding.DecodeBytesAscending(key, nil)
// We ask for the deep copy to be made so that scratch doesn't
// reference the memory of key - this allows us to return scratch
// to the caller to be reused. The deep copy additionally ensures
// that the memory of the BatchResponse (where key came from) can be
// GCed.
rkey, scratch, err = encoding.DecodeBytesAscendingDeepCopy(key, scratch[:0])
} else {
rkey, r, err = encoding.DecodeBytesDescending(key, nil)
rkey, scratch, err = encoding.DecodeBytesDescending(key, scratch[:0])
}
vec.Bytes().Set(idx, r)
// Set() performs a deep copy, so it is safe to return the scratch slice
// to the caller. Any modifications to the scratch slice made by the
// caller will not affect the value in the vector.
vec.Bytes().Set(idx, scratch)
case types.TimestampFamily, types.TimestampTZFamily:
var t time.Time
if dir == descpb.IndexDescriptor_ASC {
Expand Down Expand Up @@ -319,7 +328,7 @@ func decodeTableKeyToCol(
d, rkey, err = rowenc.DecodeTableKey(da, valType, key, encDir)
vec.Datum().Set(idx, d)
}
return rkey, false, err
return rkey, false, scratch, err
}

// UnmarshalColumnValueToCol decodes the value from a roachpb.Value using the
Expand Down
22 changes: 19 additions & 3 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ type cFetcher struct {
tableoidCol coldata.DatumVec
}

// scratch is a scratch space used when decoding bytes-like and decimal
// keys.
scratch []byte

typs []*types.T
accountingHelper colmem.SetAccountingHelper
memoryLimit int64
Expand Down Expand Up @@ -512,12 +516,14 @@ func (rf *cFetcher) Init(
}
indexColOrdinals := table.indexColOrdinals
_ = indexColOrdinals[len(indexColumnIDs)-1]
needToDecodeDecimalKey := false
for i, id := range indexColumnIDs {
colIdx, ok := tableArgs.ColIdxMap.Get(id)
if (ok && neededCols.Contains(int(id))) || rf.traceKV {
//gcassert:bce
indexColOrdinals[i] = colIdx
rf.mustDecodeIndexKey = true
needToDecodeDecimalKey = needToDecodeDecimalKey || typs[colIdx].Family() == types.DecimalFamily
// A composite column might also have a value encoding which must be
// decoded. Others can be removed from neededValueColsByIdx.
if compositeColumnIDs.Contains(int(id)) {
Expand All @@ -533,6 +539,13 @@ func (rf *cFetcher) Init(
}
}
}
if needToDecodeDecimalKey && cap(rf.scratch) < 64 {
// If we need to decode the decimal key encoding, it might use a scratch
// byte slice internally, so we'll allocate such a space to be reused
// for every decimal.
// TODO(yuzefovich): 64 was chosen arbitrarily, tune it.
rf.scratch = make([]byte, 64)
}
table.invertedColOrdinal = -1
if table.index.GetType() == descpb.IndexDescriptor_INVERTED {
id := table.index.InvertedColumnID()
Expand Down Expand Up @@ -892,7 +905,7 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
// to determine whether a KV belongs to the same row as the
// previous KV or a different row.
checkAllColsForNull := rf.table.isSecondaryIndex && rf.table.index.IsUnique() && rf.table.desc.NumFamilies() != 1
key, matches, foundNull, err = colencoding.DecodeIndexKeyToCols(
key, matches, foundNull, rf.scratch, err = colencoding.DecodeIndexKeyToCols(
&rf.table.da,
rf.machine.colvecs,
rf.machine.rowIdx,
Expand All @@ -904,6 +917,7 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
rf.table.indexColumnDirs,
rf.machine.nextKV.Key[rf.table.knownPrefixLength:],
rf.table.invertedColOrdinal,
rf.scratch,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1261,7 +1275,7 @@ func (rf *cFetcher) processValue(ctx context.Context, familyID descpb.FamilyID)
if table.isSecondaryIndex && table.index.IsUnique() {
// This is a unique secondary index; decode the extra
// column values from the value.
valueBytes, _, err = colencoding.DecodeKeyValsToCols(
valueBytes, _, rf.scratch, err = colencoding.DecodeKeyValsToCols(
&table.da,
rf.machine.colvecs,
rf.machine.rowIdx,
Expand All @@ -1272,6 +1286,7 @@ func (rf *cFetcher) processValue(ctx context.Context, familyID descpb.FamilyID)
&rf.machine.remainingValueColsByIdx,
valueBytes,
rf.table.invertedColOrdinal,
rf.scratch,
)
if err != nil {
return scrub.WrapError(scrub.SecondaryIndexKeyExtraValueDecodingError, err)
Expand Down Expand Up @@ -1577,7 +1592,8 @@ func (rf *cFetcher) Release() {
*rf = cFetcher{
// The types are small objects, so we don't bother deeply resetting this
// slice.
typs: rf.typs[:0],
typs: rf.typs[:0],
scratch: rf.scratch[:0],
}
cFetcherPool.Put(rf)
}
Expand Down

0 comments on commit bdb4c1a

Please sign in to comment.