Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttljob: fix job to handle composite PKs #116988

Merged
merged 1 commit into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 26 additions & 5 deletions pkg/ccl/streamingccl/replicationtestutils/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,29 @@ func EncodeKV(
t *testing.T, codec keys.SQLCodec, descr catalog.TableDescriptor, pkeyVals ...interface{},
) roachpb.KeyValue {
require.Equal(t, 1, descr.NumFamilies(), "there can be only one")
primary := descr.GetPrimaryIndex()
require.LessOrEqual(t, primary.NumKeyColumns(), len(pkeyVals))
indexEntries := encodeKVImpl(t, codec, descr, pkeyVals...)
require.Equal(t, 1, len(indexEntries))
return roachpb.KeyValue{Key: indexEntries[0].Key, Value: indexEntries[0].Value}
}

// EncodeKVs is similar to EncodeKV, but can be used for a table with multiple
// column families, in which case up to one KV is returned per family.
func EncodeKVs(
t *testing.T, codec keys.SQLCodec, descr catalog.TableDescriptor, pkeyVals ...interface{},
) []roachpb.KeyValue {
indexEntries := encodeKVImpl(t, codec, descr, pkeyVals...)
require.GreaterOrEqual(t, len(indexEntries), 1)
kvs := make([]roachpb.KeyValue, len(indexEntries))
for i := range indexEntries {
kvs[i] = roachpb.KeyValue{Key: indexEntries[i].Key, Value: indexEntries[i].Value}
}
return kvs
}

func encodeKVImpl(
t *testing.T, codec keys.SQLCodec, descr catalog.TableDescriptor, pkeyVals ...interface{},
) []rowenc.IndexEntry {
primary := descr.GetPrimaryIndex()
var datums tree.Datums
var colMap catalog.TableColMap
for i, val := range pkeyVals {
Expand All @@ -42,9 +62,10 @@ func EncodeKV(
indexEntries, err := rowenc.EncodePrimaryIndex(codec, descr, primary,
colMap, datums, includeEmpty)
require.NoError(t, err)
require.Equal(t, 1, len(indexEntries))
indexEntries[0].Value.InitChecksum(indexEntries[0].Key)
return roachpb.KeyValue{Key: indexEntries[0].Key, Value: indexEntries[0].Value}
for i := range indexEntries {
indexEntries[i].Value.InitChecksum(indexEntries[i].Key)
}
return indexEntries
}

func nativeToDatum(t *testing.T, native interface{}) tree.Datum {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/randgen/datum.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ var (
},
types.FloatFamily: {
tree.NewDFloat(tree.DFloat(0)),
tree.NewDFloat(tree.DFloat(math.Copysign(0, -1))), // -0
tree.NewDFloat(tree.DFloat(1)),
tree.NewDFloat(tree.DFloat(-1)),
tree.NewDFloat(tree.DFloat(math.SmallestNonzeroFloat32)),
Expand All @@ -566,9 +567,12 @@ var (
types.DecimalFamily: func() []tree.Datum {
var res []tree.Datum
for _, s := range []string{
"-0",
"0",
"1",
"1.0",
"-1",
"-1.0",
"Inf",
"-Inf",
"NaN",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowenc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/geo/geoindex",
"//pkg/geo/geopb",
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
Expand Down
71 changes: 68 additions & 3 deletions pkg/sql/rowenc/index_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
package rowenc

import (
"bytes"
"context"
"sort"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/geo/geoindex"
"github.com/cockroachdb/cockroach/pkg/geo/geopb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
Expand Down Expand Up @@ -425,7 +427,8 @@ func DecodeIndexKeyPrefix(
}

// DecodeIndexKey decodes the values that are a part of the specified index
// key (setting vals).
// key (setting vals). This function does not handle types that have composite
// encoding. See DecodeIndexKeyToDatums for a function that does.
// numVals returns the number of vals populated - this can be less than
// len(vals) if key ran out of bytes while populating vals.
func DecodeIndexKey(
Expand All @@ -445,18 +448,80 @@ func DecodeIndexKey(

// DecodeIndexKeyToDatums decodes a key to tree.Datums. It is similar to
// DecodeIndexKey, but eagerly decodes the []EncDatum to tree.Datums.
// Also, unlike DecodeIndexKey, this function is able to handle types
// with composite encoding.
func DecodeIndexKeyToDatums(
codec keys.SQLCodec,
colIDs catalog.TableColMap,
types []*types.T,
colDirs []catenumpb.IndexColumn_Direction,
key []byte,
keyValues []kv.KeyValue,
a *tree.DatumAlloc,
) (tree.Datums, error) {
if len(keyValues) == 0 {
return nil, errors.AssertionFailedf("no key values to decode")
}
vals := make([]EncDatum, len(types))
numVals, err := DecodeIndexKey(codec, vals, colDirs, key)
numVals, err := DecodeIndexKey(codec, vals, colDirs, keyValues[0].Key)
if err != nil {
return nil, err
}
prefixLen, err := keys.GetRowPrefixLength(keyValues[0].Key)
if err != nil {
return nil, err
}
rowPrefix := keyValues[0].Key[:prefixLen]

// Types that have a composite encoding can their data stored in the value.
// See docs/tech-notes/encoding.md#composite-encoding for details.
for _, keyValue := range keyValues {
kvVal := keyValue.Value

if !bytes.HasPrefix(keyValue.Key, rowPrefix) {
// This KV is not part of the same row as the start primary key. Sometimes
// a KV is omitted if all the columns in its column family are NULL. This
// could cause us to scan more KVs than needed to decode the primary index
// columns, so the slice we're iterating through might contain KVs from a
// different row at the end.
break
}

// The composite encoding for primary index keys is always a tuple, so we
// can ignore anything else.
if kvVal == nil || kvVal.GetTag() != roachpb.ValueType_TUPLE {
continue
}
valueBytes, err := kvVal.GetTuple()
if err != nil {
return nil, err
}

var lastColID descpb.ColumnID = 0
for len(valueBytes) > 0 {
typeOffset, _, colIDDiff, _, err := encoding.DecodeValueTag(valueBytes)
if err != nil {
return nil, err
}
colID := lastColID + descpb.ColumnID(colIDDiff)
lastColID = colID
colOrdinal, ok := colIDs.Get(colID)
if !ok {
// This is for a column that is not in the index. We still need to
// consume the data.
_, encLen, err := encoding.PeekValueLength(valueBytes[typeOffset:])
if err != nil {
return nil, err
}
valueBytes = valueBytes[typeOffset+encLen:]
continue
}
vals[colOrdinal], valueBytes, err = EncDatumFromBuffer(catenumpb.DatumEncoding_VALUE, valueBytes[typeOffset:])
if err != nil {
return nil, err
}
}
}

datums := make(tree.Datums, 0, numVals)
for i, encDatum := range vals[:numVals] {
if err := encDatum.EnsureDecoded(types[i], a); err != nil {
Expand Down
26 changes: 17 additions & 9 deletions pkg/sql/ttl/ttljob/ttljob_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ func (t *ttlProcessor) work(ctx context.Context) error {

var (
relationName string
pkColIDs catalog.TableColMap
pkColNames []string
pkColTypes []*types.T
pkColDirs []catenumpb.IndexColumn_Direction
numFamilies int
labelMetrics bool
processorRowCount int64
)
Expand All @@ -110,6 +112,7 @@ func (t *ttlProcessor) work(ctx context.Context) error {
return err
}

numFamilies = desc.NumFamilies()
var buf bytes.Buffer
primaryIndexDesc := desc.GetPrimaryIndex().IndexDesc()
pkColNames = make([]string, 0, len(primaryIndexDesc.KeyColumnNames))
Expand All @@ -123,6 +126,10 @@ func (t *ttlProcessor) work(ctx context.Context) error {
return err
}
pkColDirs = primaryIndexDesc.KeyColumnDirections
pkColIDs = catalog.TableColMap{}
for i, id := range primaryIndexDesc.KeyColumnIDs {
pkColIDs.Set(id, i)
}

if !desc.HasRowLevelTTL() {
return errors.Newf("unable to find TTL on table %s", desc.GetName())
Expand Down Expand Up @@ -215,8 +222,10 @@ func (t *ttlProcessor) work(ctx context.Context) error {
ctx,
kvDB,
codec,
pkColIDs,
pkColTypes,
pkColDirs,
numFamilies,
span,
&alloc,
); err != nil {
Expand Down Expand Up @@ -400,23 +409,24 @@ func SpanToQueryBounds(
ctx context.Context,
kvDB *kv.DB,
codec keys.SQLCodec,
pkColIDs catalog.TableColMap,
pkColTypes []*types.T,
pkColDirs []catenumpb.IndexColumn_Direction,
numFamilies int,
span roachpb.Span,
alloc *tree.DatumAlloc,
) (bounds QueryBounds, hasRows bool, _ error) {
const maxRows = 1
partialStartKey := span.Key
partialEndKey := span.EndKey
startKeyValues, err := kvDB.Scan(ctx, partialStartKey, partialEndKey, maxRows)
startKeyValues, err := kvDB.Scan(ctx, partialStartKey, partialEndKey, int64(numFamilies))
if err != nil {
return bounds, false, errors.Wrapf(err, "scan error startKey=%x endKey=%x", []byte(partialStartKey), []byte(partialEndKey))
}
// If span has 0 rows then return early - it will not be processed.
if len(startKeyValues) == 0 {
return bounds, false, nil
}
endKeyValues, err := kvDB.ReverseScan(ctx, partialStartKey, partialEndKey, maxRows)
endKeyValues, err := kvDB.ReverseScan(ctx, partialStartKey, partialEndKey, int64(numFamilies))
if err != nil {
return bounds, false, errors.Wrapf(err, "reverse scan error startKey=%x endKey=%x", []byte(partialStartKey), []byte(partialEndKey))
}
Expand All @@ -426,15 +436,13 @@ func SpanToQueryBounds(
if len(endKeyValues) == 0 {
return bounds, false, nil
}
startKey := startKeyValues[0].Key
bounds.Start, err = rowenc.DecodeIndexKeyToDatums(codec, pkColTypes, pkColDirs, startKey, alloc)
bounds.Start, err = rowenc.DecodeIndexKeyToDatums(codec, pkColIDs, pkColTypes, pkColDirs, startKeyValues, alloc)
if err != nil {
return bounds, false, errors.Wrapf(err, "decode startKey error key=%x", []byte(startKey))
return bounds, false, errors.Wrapf(err, "decode startKeyValues error on %+v", startKeyValues)
}
endKey := endKeyValues[0].Key
bounds.End, err = rowenc.DecodeIndexKeyToDatums(codec, pkColTypes, pkColDirs, endKey, alloc)
bounds.End, err = rowenc.DecodeIndexKeyToDatums(codec, pkColIDs, pkColTypes, pkColDirs, endKeyValues, alloc)
if err != nil {
return bounds, false, errors.Wrapf(err, "decode endKey error key=%x", []byte(endKey))
return bounds, false, errors.Wrapf(err, "decode endKeyValues error on %+v", endKeyValues)
}
return bounds, true, nil
}
Expand Down