Skip to content

Commit c54cc1a

Browse files
committed
sstable: include all filter blocks in Reader.Layout
Previously, (*sstable.Reader).Layout included at most one filter block, and only when that block matched the reader's configuration. This commit updates the Layout method to collect the block handles of all filter blocks listed in the metaindex block.
1 parent 7058aad commit c54cc1a

File tree

5 files changed

+190
-61
lines changed

5 files changed

+190
-61
lines changed

sstable/copier_test.go

Lines changed: 44 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,35 @@ func TestCopySpan(t *testing.T) {
3030
defer cacheHandle.Close()
3131
fileNameToNum := make(map[string]base.FileNum)
3232
nextFileNum := base.FileNum(1)
33-
3433
keySchema := colblk.DefaultKeySchema(testkeys.Comparer, 16)
34+
35+
getReader := func(d *datadriven.TestData) (*Reader, error) {
36+
f, err := fs.Open(d.CmdArgs[0].Key)
37+
if err != nil {
38+
return nil, err
39+
}
40+
readable, err := NewSimpleReadable(f)
41+
if err != nil {
42+
return nil, err
43+
}
44+
45+
rOpts := ReaderOptions{
46+
ReaderOptions: block.ReaderOptions{
47+
CacheOpts: sstableinternal.CacheOptions{
48+
CacheHandle: cacheHandle,
49+
FileNum: base.DiskFileNum(fileNameToNum[d.CmdArgs[0].Key]),
50+
},
51+
},
52+
Comparer: testkeys.Comparer,
53+
KeySchemas: KeySchemas{keySchema.Name: &keySchema},
54+
}
55+
r, err := NewReader(context.TODO(), readable, rOpts)
56+
if err != nil {
57+
return nil, errors.CombineErrors(err, readable.Close())
58+
}
59+
return r, nil
60+
}
61+
3562
datadriven.RunTest(t, "testdata/copy_span", func(t *testing.T, d *datadriven.TestData) string {
3663
switch d.Cmd {
3764
case "build":
@@ -70,14 +97,6 @@ func TestCopySpan(t *testing.T) {
7097

7198
case "iter":
7299
// Iterate over the specified sstable
73-
f, err := fs.Open(d.CmdArgs[0].Key)
74-
if err != nil {
75-
return err.Error()
76-
}
77-
readable, err := NewSimpleReadable(f)
78-
if err != nil {
79-
return err.Error()
80-
}
81100
var start, end []byte
82101
for _, arg := range d.CmdArgs[1:] {
83102
switch arg.Key {
@@ -87,22 +106,12 @@ func TestCopySpan(t *testing.T) {
87106
end = []byte(arg.FirstVal(t))
88107
}
89108
}
90-
rOpts := ReaderOptions{
91-
ReaderOptions: block.ReaderOptions{
92-
CacheOpts: sstableinternal.CacheOptions{
93-
CacheHandle: cacheHandle,
94-
FileNum: base.DiskFileNum(fileNameToNum[d.CmdArgs[0].Key]),
95-
},
96-
},
97-
Comparer: testkeys.Comparer,
98-
KeySchemas: KeySchemas{keySchema.Name: &keySchema},
99-
}
100109

101-
r, err := NewReader(context.TODO(), readable, rOpts)
102-
defer r.Close()
110+
r, err := getReader(d)
103111
if err != nil {
104-
return errors.CombineErrors(err, readable.Close()).Error()
112+
return err.Error()
105113
}
114+
defer r.Close()
106115
iter, err := r.NewIter(block.NoTransforms, start, end, AssertNoBlobHandles)
107116
if err != nil {
108117
return err.Error()
@@ -132,14 +141,6 @@ func TestCopySpan(t *testing.T) {
132141
fileNameToNum[outputFile] = nextFileNum
133142
nextFileNum++
134143

135-
f, err := fs.Open(inputFile)
136-
if err != nil {
137-
t.Fatalf("failed to open sstable: %v", err)
138-
}
139-
readable, err := NewSimpleReadable(f)
140-
if err != nil {
141-
return err.Error()
142-
}
143144
rOpts := ReaderOptions{
144145
ReaderOptions: block.ReaderOptions{
145146
CacheOpts: sstableinternal.CacheOptions{
@@ -150,11 +151,12 @@ func TestCopySpan(t *testing.T) {
150151
Comparer: testkeys.Comparer,
151152
KeySchemas: KeySchemas{keySchema.Name: &keySchema},
152153
}
153-
r, err := NewReader(context.TODO(), readable, rOpts)
154+
r, err := getReader(d)
154155
if err != nil {
155-
return errors.CombineErrors(err, readable.Close()).Error()
156+
return err.Error()
156157
}
157158
defer r.Close()
159+
158160
wOpts := WriterOptions{
159161
Comparer: testkeys.Comparer,
160162
KeySchema: &keySchema,
@@ -175,27 +177,28 @@ func TestCopySpan(t *testing.T) {
175177
return fmt.Sprintf("copied %d bytes", size)
176178

177179
case "describe":
178-
f, err := fs.Open(d.CmdArgs[0].Key)
180+
r, err := getReader(d)
179181
if err != nil {
180182
return err.Error()
181183
}
182-
readable, err := NewSimpleReadable(f)
184+
defer r.Close()
185+
l, err := r.Layout()
183186
if err != nil {
184187
return err.Error()
185188
}
186-
r, err := NewReader(context.TODO(), readable, ReaderOptions{
187-
Comparer: testkeys.Comparer,
188-
KeySchemas: KeySchemas{keySchema.Name: &keySchema},
189-
})
189+
return l.Describe(false /* verbose */, r, nil)
190+
191+
case "props":
192+
r, err := getReader(d)
190193
if err != nil {
191-
return errors.CombineErrors(err, readable.Close()).Error()
194+
return err.Error()
192195
}
193196
defer r.Close()
194-
l, err := r.Layout()
197+
props, err := r.ReadPropertiesBlock(context.TODO(), nil)
195198
if err != nil {
196199
return err.Error()
197200
}
198-
return l.Describe(true, r, nil)
201+
return props.String()
199202

200203
default:
201204
t.Fatalf("unknown command: %s", d.Cmd)

sstable/layout.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,8 @@ func forEachIndexEntry(
714714
return indexIter.Close()
715715
}
716716

717+
// decodeMetaindex decodes a row-based meta index block. The returned map owns
718+
// all its memory and can outlive the provided data slice.
717719
func decodeMetaindex(
718720
data []byte,
719721
) (meta map[string]block.Handle, vbih valblk.IndexHandle, err error) {
@@ -723,9 +725,11 @@ func decodeMetaindex(
723725
}
724726
defer func() { err = firstError(err, i.Close()) }()
725727

728+
var keysAlloc bytealloc.A
726729
meta = map[string]block.Handle{}
727730
for valid := i.First(); valid; valid = i.Next() {
728731
value := i.Value()
732+
var bh block.Handle
729733
if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
730734
var n int
731735
vbih, n, err = valblk.DecodeIndexHandle(i.Value())
@@ -735,27 +739,36 @@ func decodeMetaindex(
735739
if n == 0 || n != len(value) {
736740
return nil, vbih, base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
737741
}
742+
bh = vbih.Handle
738743
} else {
739-
bh, n := block.DecodeHandle(value)
744+
var n int
745+
bh, n = block.DecodeHandle(value)
740746
if n == 0 || n != len(value) {
741747
return nil, vbih, base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
742748
}
743-
meta[string(i.Key().UserKey)] = bh
744749
}
750+
var key []byte
751+
keysAlloc, key = keysAlloc.Copy(i.Key().UserKey)
752+
keyStr := unsafe.String(unsafe.SliceData(key), len(key))
753+
meta[keyStr] = bh
745754
}
746755
return meta, vbih, nil
747756
}
748757

758+
// decodeColumnarMetaIndex decodes a columnar meta index block. The returned map
759+
// owns all its memory and can outlive the provided data slice.
749760
func decodeColumnarMetaIndex(
750761
data []byte,
751762
) (meta map[string]block.Handle, vbih valblk.IndexHandle, err error) {
752763
var decoder colblk.KeyValueBlockDecoder
753764
decoder.Init(data)
765+
var keysAlloc bytealloc.A
754766
meta = map[string]block.Handle{}
755767
for i := 0; i < decoder.BlockDecoder().Rows(); i++ {
756768
key := decoder.KeyAt(i)
757769
value := decoder.ValueAt(i)
758770

771+
var bh block.Handle
759772
if bytes.Equal(key, []byte(metaValueIndexName)) {
760773
var n int
761774
vbih, n, err = valblk.DecodeIndexHandle(value)
@@ -765,13 +778,18 @@ func decodeColumnarMetaIndex(
765778
if n == 0 || n != len(value) {
766779
return nil, vbih, base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
767780
}
781+
bh = vbih.Handle
768782
} else {
769-
bh, n := block.DecodeHandle(value)
783+
var n int
784+
bh, n = block.DecodeHandle(value)
770785
if n == 0 || n != len(value) {
771786
return nil, vbih, base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
772787
}
773-
meta[string(key)] = bh
774788
}
789+
var keyCopy []byte
790+
keysAlloc, keyCopy = keysAlloc.Copy(key)
791+
keyStr := unsafe.String(unsafe.SliceData(keyCopy), len(keyCopy))
792+
meta[keyStr] = bh
775793
}
776794
return meta, vbih, nil
777795
}

sstable/reader.go

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"fmt"
1212
"io"
1313
"slices"
14+
"strings"
1415
"sync"
1516

1617
"github.com/cockroachdb/errors"
@@ -434,31 +435,41 @@ var metaBufferPools = sync.Pool{
434435
},
435436
}
436437

437-
func (r *Reader) readMetaindex(
438-
ctx context.Context,
439-
bufferPool *block.BufferPool,
440-
readHandle objstorage.ReadHandle,
441-
filters map[string]FilterPolicy,
442-
) error {
438+
func (r *Reader) readAndDecodeMetaindex(
439+
ctx context.Context, bufferPool *block.BufferPool, readHandle objstorage.ReadHandle,
440+
) (map[string]block.Handle, valblk.IndexHandle, error) {
443441
metaEnv := block.ReadEnv{BufferPool: bufferPool}
444442
b, err := r.readMetaindexBlock(ctx, metaEnv, readHandle)
445443
if err != nil {
446-
return err
444+
return nil, valblk.IndexHandle{}, err
447445
}
448446
data := b.BlockData()
449447
defer b.Release()
450448

451449
if uint64(len(data)) != r.metaindexBH.Length {
452-
return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
450+
return nil, valblk.IndexHandle{}, base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
453451
errors.Safe(len(data)), errors.Safe(r.metaindexBH.Length))
454452
}
455453

456454
var meta map[string]block.Handle
455+
var valueBIH valblk.IndexHandle
457456
if r.tableFormat >= TableFormatPebblev6 {
458-
meta, r.valueBIH, err = decodeColumnarMetaIndex(data)
457+
meta, valueBIH, err = decodeColumnarMetaIndex(data)
459458
} else {
460-
meta, r.valueBIH, err = decodeMetaindex(data)
459+
meta, valueBIH, err = decodeMetaindex(data)
461460
}
461+
return meta, valueBIH, err
462+
}
463+
464+
func (r *Reader) initMetaindexBlocks(
465+
ctx context.Context,
466+
bufferPool *block.BufferPool,
467+
readHandle objstorage.ReadHandle,
468+
filters map[string]FilterPolicy,
469+
) error {
470+
var meta map[string]block.Handle
471+
var err error
472+
meta, r.valueBIH, err = r.readAndDecodeMetaindex(ctx, bufferPool, readHandle)
462473
if err != nil {
463474
return err
464475
}
@@ -577,10 +588,21 @@ func (r *Reader) Layout() (*Layout, error) {
577588
Format: r.tableFormat,
578589
BlobReferenceIndex: r.blobRefIndexBH,
579590
}
580-
if r.filterBH.Length > 0 {
581-
l.Filter = []NamedBlockHandle{{Name: "fullfilter." + r.tableFilter.policy.Name(), Handle: r.filterBH}}
582-
}
591+
592+
bufferPool := metaBufferPools.Get().(*block.BufferPool)
593+
defer metaBufferPools.Put(bufferPool)
594+
defer bufferPool.Release()
595+
583596
ctx := context.TODO()
597+
meta, _, err := r.readAndDecodeMetaindex(ctx, bufferPool, noReadHandle)
598+
if err != nil {
599+
return nil, err
600+
}
601+
for name, bh := range meta {
602+
if strings.HasPrefix(name, "fullfilter.") {
603+
l.Filter = append(l.Filter, NamedBlockHandle{Name: name, Handle: bh})
604+
}
605+
}
584606

585607
indexH, err := r.readTopLevelIndexBlock(ctx, block.NoReadEnv, noReadHandle)
586608
if err != nil {
@@ -950,7 +972,7 @@ func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Re
950972
// allocator.
951973
defer bufferPool.Release()
952974

953-
if err := r.readMetaindex(ctx, bufferPool, rh, o.Filters); err != nil {
975+
if err := r.initMetaindexBlocks(ctx, bufferPool, rh, o.Filters); err != nil {
954976
r.err = err
955977
return nil, err
956978
}

sstable/test_utils.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,27 @@ func ParseWriterOptions[StringOrStringer any](o *WriterOptions, args ...StringOr
261261
o.IndexBlockSize, err = strconv.Atoi(value)
262262

263263
case "filter":
264-
o.FilterPolicy = bloom.FilterPolicy(10)
265-
264+
fields := strings.FieldsFunc(value, func(r rune) bool {
265+
return r == '(' || r == ')'
266+
})
267+
if len(fields) == 0 {
268+
o.FilterPolicy = bloom.FilterPolicy(10)
269+
continue
270+
} else if len(fields) != 2 {
271+
return errors.Errorf("expected filter policy name and parameters, got %q", value)
272+
}
273+
switch fields[0] {
274+
case "bloom":
275+
bits, err := strconv.Atoi(fields[1])
276+
if err != nil {
277+
return errors.Wrapf(err, "parsing bloom filter bits")
278+
}
279+
o.FilterPolicy = bloom.FilterPolicy(bits)
280+
case "none":
281+
o.FilterPolicy = nil
282+
default:
283+
return errors.Errorf("unknown filter policy: %q", value)
284+
}
266285
case "comparer":
267286
o.Comparer, err = comparerFromCmdArg(value)
268287

0 commit comments

Comments
 (0)