Skip to content

Commit

Permalink
sql: remove the interleaved logic from fetchers
Browse files Browse the repository at this point in the history
This commit removes the remaining handling of interleaved tables from both
the row fetcher and the cFetcher.

Release note: None
  • Loading branch information
yuzefovich committed Sep 16, 2021
1 parent 604ff8d commit db3e8f4
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 412 deletions.
1 change: 0 additions & 1 deletion pkg/sql/colencoding/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
deps = [
"//pkg/col/coldata",
"//pkg/roachpb:with-mocks",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
Expand Down
122 changes: 0 additions & 122 deletions pkg/sql/colencoding/key_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/cockroachdb/apd/v2"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand All @@ -28,123 +27,6 @@ import (
"github.com/cockroachdb/errors"
)

// DecodeIndexKeyToCols decodes an index key into the idx'th position of the
// provided slices of colexec.Vecs. The input index key must already have its
// tenant id and first table id / index id prefix removed. If matches is false,
// the key is from a different table, and the returned remainingKey indicates a
// "seek prefix": the next key that might be part of the table being searched
// for. See the analog in sqlbase/index_encoding.go.
//
// Sometimes it is necessary to determine if the value of a column is NULL even
// though the value itself is not needed. If checkAllColsForNull is true, then
// foundNull=true will be returned if any columns in the key are NULL,
// regardless of whether or not indexColIdx indicates that the column should be
// decoded.
//
// Parameters:
//   - da, vecs, idx, checkAllColsForNull and invertedColIdx are passed through
//     unchanged to DecodeKeyValsToCols.
//   - desc and index supply the table ID / index ID that the key is expected
//     to carry, as well as the index's interleave ancestors.
//   - indexColIdx, types and colDirs are parallel slices describing the key
//     columns; for an interleaved index they are consumed ancestor by ancestor
//     in chunks of SharedPrefixLen entries, and whatever is left describes the
//     columns decoded after the last ancestor.
//   - key is the encoded index key to decode.
//
// Returns:
//   - On success: the portion of key that follows the decoded key columns,
//     matches=true, and the accumulated foundNull.
//   - On a mismatch (unexpected table ID / index ID, a missing interleaved
//     sentinel, or an interleaved sentinel following the key columns): a seek
//     prefix built from a capacity-limited prefix of the original key,
//     matches=false and foundNull=false, with a nil error.
//   - On a decoding error: a nil key, matches=false, foundNull=false and the
//     error.
func DecodeIndexKeyToCols(
da *rowenc.DatumAlloc,
vecs []coldata.Vec,
idx int,
desc catalog.TableDescriptor,
index catalog.Index,
indexColIdx []int,
checkAllColsForNull bool,
types []*types.T,
colDirs []descpb.IndexDescriptor_Direction,
key roachpb.Key,
invertedColIdx int,
) (remainingKey roachpb.Key, matches bool, foundNull bool, _ error) {
var decodedTableID descpb.ID
var decodedIndexID descpb.IndexID
var err error

// origKey is kept so that, on a mismatch, the number of bytes consumed so
// far can be computed and a seek prefix can be built from the consumed part.
origKey := key

// For an interleaved index, walk every ancestor's portion of the key first,
// then verify the table ID / index ID of this index itself.
if index.NumInterleaveAncestors() > 0 {
for i := 0; i < index.NumInterleaveAncestors(); i++ {
ancestor := index.GetInterleaveAncestor(i)
// Our input key had its first table id / index id chopped off, so
// don't try to decode those for the first ancestor.
if i != 0 {
lastKeyComponentLength := len(key)
key, decodedTableID, decodedIndexID, err = rowenc.DecodePartialTableIDIndexID(key)
if err != nil {
return nil, false, false, err
}
if decodedTableID != ancestor.TableID || decodedIndexID != ancestor.IndexID {
// We don't match. Return a key with the table ID / index ID we're
// searching for, so the caller knows what to seek to.
curPos := len(origKey) - lastKeyComponentLength
// Prevent unwanted aliasing on the origKey by setting the capacity.
key = rowenc.EncodePartialTableIDIndexID(origKey[:curPos:curPos], ancestor.TableID, ancestor.IndexID)
return key, false, false, nil
}
}

// Decode the key columns that are shared with this interleave ancestor.
length := int(ancestor.SharedPrefixLen)
// NOTE(review): the comment previously here stated that we don't care
// whether this call to DecodeKeyValsToCols found a null, because it is an
// interleaving ancestor; however, isNull is folded into foundNull below.
// Confirm which behavior is intended.
var isNull bool
key, isNull, err = DecodeKeyValsToCols(
da, vecs, idx, indexColIdx[:length], checkAllColsForNull, types[:length],
colDirs[:length], nil /* unseen */, key, invertedColIdx,
)
if err != nil {
return nil, false, false, err
}
// Drop the entries that were just consumed so that the remaining slices
// line up with the columns still to be decoded.
indexColIdx, types, colDirs = indexColIdx[length:], types[length:], colDirs[length:]
foundNull = foundNull || isNull

// Consume the interleaved sentinel.
var ok bool
key, ok = encoding.DecodeIfInterleavedSentinel(key)
if !ok {
// We're expecting an interleaved sentinel but didn't find one. Append
// one so the caller can seek to it.
curPos := len(origKey) - len(key)
// Prevent unwanted aliasing on the origKey by setting the capacity.
key = encoding.EncodeInterleavedSentinel(origKey[:curPos:curPos])
return key, false, false, nil
}
}

// All ancestors have been consumed; what follows must be the table ID /
// index ID of the index being decoded.
lastKeyComponentLength := len(key)
key, decodedTableID, decodedIndexID, err = rowenc.DecodePartialTableIDIndexID(key)
if err != nil {
return nil, false, false, err
}
if decodedTableID != desc.GetID() || decodedIndexID != index.GetID() {
// We don't match. Return a key with the table ID / index ID we're
// searching for, so the caller knows what to seek to.
curPos := len(origKey) - lastKeyComponentLength
// Prevent unwanted aliasing on the origKey by setting the capacity.
key = rowenc.EncodePartialTableIDIndexID(origKey[:curPos:curPos], desc.GetID(), index.GetID())
return key, false, false, nil
}
}

// Decode the remaining key columns (all of them when the index has no
// interleave ancestors).
var isNull bool
key, isNull, err = DecodeKeyValsToCols(
da, vecs, idx, indexColIdx, checkAllColsForNull, types, colDirs, nil /* unseen */, key, invertedColIdx,
)
if err != nil {
return nil, false, false, err
}
foundNull = foundNull || isNull

// We're expecting a column family id next (a varint). If
// interleavedSentinel is actually next, then this key is for a child
// table. Only the presence of the sentinel is checked here; the key
// returned by DecodeIfInterleavedSentinel is discarded.
lastKeyComponentLength := len(key)
if _, ok := encoding.DecodeIfInterleavedSentinel(key); ok {
curPos := len(origKey) - lastKeyComponentLength
// Prevent unwanted aliasing on the origKey by setting the capacity.
// The seek prefix is everything decoded so far followed by a
// descending-encoded NULL — presumably chosen so that the caller seeks
// past the child table's rows; confirm against the key encoding.
key = encoding.EncodeNullDescending(origKey[:curPos:curPos])
return key, false, false, nil
}

return key, true, foundNull, nil
}

// DecodeKeyValsToCols decodes the values that are part of the key, writing the
// result to the idx'th slot of the input slice of colexec.Vecs. If the
// directions slice is nil, the direction used will default to
Expand Down Expand Up @@ -223,10 +105,6 @@ func decodeTableKeyToCol(
vec.Nulls().SetNull(idx)
return key, true, nil
}
// We might have read a NULL value in the interleaved child table which
// would update the nulls vector, so we need to explicitly unset the null
// value here.
vec.Nulls().UnsetNull(idx)

// Inverted columns should not be decoded, but should instead be
// passed on as a DBytes datum.
Expand Down
109 changes: 11 additions & 98 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,8 @@ type cFetcher struct {
// firstBatchLimit.
maxKeysPerRow int

// True if the index key must be decoded.
// If there is more than one table, the index key must always be decoded.
// This is only false if there are no needed columns and the (single)
// table has no interleave children.
// True if the index key must be decoded. This is only false if there are no
// needed columns.
mustDecodeIndexKey bool

// lockStrength represents the row-level locking mode to use when fetching rows.
Expand Down Expand Up @@ -282,8 +280,6 @@ type cFetcher struct {
rowIdx int
// nextKV is the kv to process next.
nextKV roachpb.KeyValue
// seekPrefix is the prefix to seek to in stateSeekPrefix.
seekPrefix roachpb.Key

// limitHint is a hint as to the number of rows that the caller expects
// to be returned from this fetch. It will be decremented whenever a
Expand Down Expand Up @@ -567,14 +563,9 @@ func (rf *cFetcher) Init(
}
}

// - If there are interleaves, we need to read the index key in order to
// determine whether this row is actually part of the index we're scanning.
// - If there are needed columns from the index key, we need to read it.
//
// Otherwise, we can completely avoid decoding the index key.
if neededIndexCols > 0 || table.index.NumInterleavedBy() > 0 || table.index.NumInterleaveAncestors() > 0 {
rf.mustDecodeIndexKey = true
}
// If there are needed columns from the index key, we need to read it;
// otherwise, we can completely avoid decoding the index key.
rf.mustDecodeIndexKey = neededIndexCols > 0

if table.isSecondaryIndex {
colIDs := table.index.CollectKeyColumnIDs()
Expand Down Expand Up @@ -744,27 +735,13 @@ const (
// set.
// 1. skip common prefix
// 2. parse key (past common prefix) into row buffer, setting last row prefix buffer
// 3. interleave detected?
// - set skip prefix
// -> seekPrefix(decodeFirstKVOfRow)
// 4. parse value into row buffer.
// 5. 1-cf or secondary index?
// 3. parse value into row buffer.
// 4. 1-cf or secondary index?
// -> doneRow(initFetch)
// else:
// -> fetchNextKVWithUnfinishedRow
stateDecodeFirstKVOfRow

// stateSeekPrefix is the state of skipping all keys that sort before
// (or after, in the case of a reverse scan) a prefix. s.machine.seekPrefix
// must be set to the prefix to seek to. state[1] must be set, and seekPrefix
// will transition to that state once it finds the first key with that prefix.
// 1. fetch next kv into nextKV buffer
// 2. kv doesn't match seek prefix?
// -> seekPrefix
// else:
// -> nextState
stateSeekPrefix

// stateFetchNextKVWithUnfinishedRow is the state of getting a new key for
// the current row. The machine will read a new key from the underlying
// fetcher, process it, and either add the results to the current row, or
Expand All @@ -775,9 +752,6 @@ const (
// 4. no?
// -> finalizeRow(decodeFirstKVOfRow)
// 5. skip to end of last row prefix buffer
// 6. interleave detected?
// - set skip prefix
// -> finalizeRow(seekPrefix(decodeFirstKVOfRow))
// 6. parse value into row buffer
// 7. -> fetchNextKVWithUnfinishedRow
stateFetchNextKVWithUnfinishedRow
Expand Down Expand Up @@ -895,9 +869,8 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
log.Infof(ctx, "decoding first key %s", rf.machine.nextKV.Key)
}
var (
key []byte
matches bool
err error
key []byte
err error
)
indexOrds := rf.table.indexColOrdinals
if rf.traceKV {
Expand All @@ -908,39 +881,24 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
// to determine whether a KV belongs to the same row as the
// previous KV or a different row.
checkAllColsForNull := rf.table.isSecondaryIndex && rf.table.index.IsUnique() && rf.table.desc.NumFamilies() != 1
key, matches, foundNull, err = colencoding.DecodeIndexKeyToCols(
key, foundNull, err = colencoding.DecodeKeyValsToCols(
&rf.table.da,
rf.machine.colvecs,
rf.machine.rowIdx,
rf.table.desc,
rf.table.index,
indexOrds,
checkAllColsForNull,
rf.table.keyValTypes,
rf.table.indexColumnDirs,
nil, /* unseen */
rf.machine.nextKV.Key[rf.table.knownPrefixLength:],
rf.table.invertedColOrdinal,
)
if err != nil {
return nil, err
}
if !matches {
// We found an interleave. Set our skip prefix.
seekPrefix := rf.machine.nextKV.Key[:len(key)+rf.table.knownPrefixLength]
if debugState {
log.Infof(ctx, "setting seek prefix to %s", seekPrefix)
}
rf.machine.seekPrefix = seekPrefix
rf.machine.state[0] = stateSeekPrefix
rf.machine.state[1] = stateDecodeFirstKVOfRow
continue
}
prefix := rf.machine.nextKV.Key[:len(rf.machine.nextKV.Key)-len(key)]
rf.machine.lastRowPrefix = prefix
} else {
// If mustDecodeIndexKey was false, we can't possibly have an
// interleaved row on our hands, so we can figure out our row prefix
// without parsing any keys by using GetRowPrefixLength.
prefixLen, err := keys.GetRowPrefixLength(rf.machine.nextKV.Key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1019,39 +977,6 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
// If the table has more than one column family, then the next KV
// may belong to the same row as the current KV.
rf.machine.state[0] = stateFetchNextKVWithUnfinishedRow
case stateSeekPrefix:
// Note: seekPrefix is only used for interleaved tables.
for {
moreKVs, kv, finalReferenceToBatch, err := rf.fetcher.NextKV(ctx, rf.mvccDecodeStrategy)
if err != nil {
return nil, rf.convertFetchError(ctx, err)
}
if debugState {
log.Infof(ctx, "found kv %s, seeking to prefix %s", kv.Key, rf.machine.seekPrefix)
}
if !moreKVs {
// We ran out of data, so ignore whatever our next state was going to
// be and emit the final batch.
rf.machine.state[1] = stateEmitLastBatch
break
}
// The order we perform the comparison in depends on whether we are
// performing a reverse scan or not. If we are performing a reverse
// scan, then we want to seek until we find a key less than seekPrefix.
var comparison int
if rf.reverse {
comparison = bytes.Compare(rf.machine.seekPrefix, kv.Key)
} else {
comparison = bytes.Compare(kv.Key, rf.machine.seekPrefix)
}
// TODO(jordan): if nextKV returns newSpan = true, set the new span
// prefix and indicate that it needs decoding.
if comparison >= 0 {
rf.setNextKV(kv, finalReferenceToBatch)
break
}
}
rf.shiftState()

case stateFetchNextKVWithUnfinishedRow:
moreKVs, kv, finalReferenceToBatch, err := rf.fetcher.NextKV(ctx, rf.mvccDecodeStrategy)
Expand Down Expand Up @@ -1080,18 +1005,6 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
continue
}

key := kv.Key[len(rf.machine.lastRowPrefix):]
_, foundInterleave := encoding.DecodeIfInterleavedSentinel(key)

if foundInterleave {
// The key we just found isn't relevant to the current row, so finalize
// the current row, then skip all KVs with the current interleave prefix.
rf.machine.state[0] = stateFinalizeRow
rf.machine.state[1] = stateSeekPrefix
rf.machine.state[2] = stateDecodeFirstKVOfRow
continue
}

familyID, err := rf.getCurrentColumnFamilyID()
if err != nil {
return nil, err
Expand Down
13 changes: 6 additions & 7 deletions pkg/sql/colfetcher/fetcherstate_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit db3e8f4

Please sign in to comment.