Skip to content

Commit 4676ca9

Browse files
author
Ibrahim Jarif
authored
Add support for caching bloomfilters (#1204)
This PR adds support for caching bloom filters in ristretto. The bloom filters and blocks are removed from the cache when the table is deleted.
1 parent c3333a5 commit 4676ca9

File tree

3 files changed

+86
-20
lines changed

3 files changed

+86
-20
lines changed

table/builder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func TestTableIndex(t *testing.T) {
5757
keysCount := 10000
5858
for _, opt := range opts {
5959
builder := NewTableBuilder(opt)
60-
filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Int63())
60+
filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Uint32())
6161
f, err := y.OpenSyncedFile(filename, true)
6262
require.NoError(t, err)
6363

table/table.go

Lines changed: 83 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package table
1818

1919
import (
2020
"crypto/aes"
21+
"encoding/binary"
2122
"fmt"
2223
"io"
2324
"math"
@@ -81,7 +82,7 @@ type TableInterface interface {
8182
DoesNotHave(hash uint64) bool
8283
}
8384

84-
// Table represents a loaded table file with the info we have about it
85+
// Table represents a loaded table file with the info we have about it.
8586
type Table struct {
8687
sync.Mutex
8788

@@ -97,10 +98,11 @@ type Table struct {
9798
smallest, biggest []byte // Smallest and largest keys (with timestamps).
9899
id uint64 // file id, part of filename
99100

100-
bf *z.Bloom
101101
Checksum []byte
102102
// Stores the total size of key-values stored in this table (including the size on vlog).
103103
estimatedSize uint64
104+
indexStart int
105+
indexLen int
104106

105107
IsInmemory bool // Set to true if the table is on level 0 and opened in memory.
106108
opt *Options
@@ -146,6 +148,13 @@ func (t *Table) DecrRef() error {
146148
if err := os.Remove(filename); err != nil {
147149
return err
148150
}
151+
// Delete all blocks from the cache.
152+
for i := range t.blockIndex {
153+
t.opt.Cache.Del(t.blockCacheKey(i))
154+
}
155+
// Delete bloom filter from the cache.
156+
t.opt.Cache.Del(t.bfCacheKey())
157+
149158
}
150159
return nil
151160
}
@@ -336,10 +345,12 @@ func (t *Table) readIndex() error {
336345
// Read index size from the footer.
337346
readPos -= 4
338347
buf = t.readNoFail(readPos, 4)
339-
indexLen := int(y.BytesToU32(buf))
348+
t.indexLen = int(y.BytesToU32(buf))
349+
340350
// Read index.
341-
readPos -= indexLen
342-
data := t.readNoFail(readPos, indexLen)
351+
readPos -= t.indexLen
352+
t.indexStart = readPos
353+
data := t.readNoFail(readPos, t.indexLen)
343354

344355
if err := y.VerifyChecksum(data, expectedChk); err != nil {
345356
return y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename())
@@ -358,11 +369,18 @@ func (t *Table) readIndex() error {
358369
y.Check(err)
359370

360371
t.estimatedSize = index.EstimatedSize
361-
if t.bf, err = z.JSONUnmarshal(index.BloomFilter); err != nil {
362-
return y.Wrapf(err, "failed to unmarshal bloom filter for the table %d in Table.readIndex",
363-
t.id)
364-
}
365372
t.blockIndex = index.Offsets
373+
374+
// Avoid the cost of unmarshalling the bloom filters if the cache is absent.
375+
if t.opt.Cache != nil {
376+
var bf *z.Bloom
377+
if bf, err = z.JSONUnmarshal(index.BloomFilter); err != nil {
378+
return y.Wrapf(err, "failed to unmarshal bloom filter for the table %d in Table.readIndex",
379+
t.id)
380+
}
381+
382+
t.opt.Cache.Set(t.bfCacheKey(), bf, int64(len(index.BloomFilter)))
383+
}
366384
return nil
367385
}
368386

@@ -443,10 +461,25 @@ func (t *Table) block(idx int) (*block, error) {
443461
return blk, nil
444462
}
445463

446-
func (t *Table) blockCacheKey(idx int) uint64 {
447-
y.AssertTrue(t.ID() < math.MaxUint32)
464+
// bfCacheKey returns the cache key for bloom filter.
465+
func (t *Table) bfCacheKey() []byte {
466+
y.AssertTrue(t.id < math.MaxUint32)
467+
buf := make([]byte, 4)
468+
binary.BigEndian.PutUint32(buf, uint32(t.id))
469+
470+
// Without the "bf" prefix, we will have conflict with the blockCacheKey.
471+
return append([]byte("bf"), buf...)
472+
}
473+
474+
func (t *Table) blockCacheKey(idx int) []byte {
475+
y.AssertTrue(t.id < math.MaxUint32)
448476
y.AssertTrue(uint32(idx) < math.MaxUint32)
449-
return (t.ID() << 32) | uint64(idx)
477+
478+
buf := make([]byte, 8)
479+
// Assume t.ID does not overflow uint32.
480+
binary.BigEndian.PutUint32(buf[:4], uint32(t.ID()))
481+
binary.BigEndian.PutUint32(buf[4:], uint32(idx))
482+
return buf
450483
}
451484

452485
// EstimatedSize returns the total size of key-values stored in this table (including the
@@ -470,7 +503,44 @@ func (t *Table) ID() uint64 { return t.id }
470503

471504
// DoesNotHave returns true if (but not "only if") the table does not have the key hash.
472505
// It does a bloom filter lookup.
473-
func (t *Table) DoesNotHave(hash uint64) bool { return !t.bf.Has(hash) }
506+
func (t *Table) DoesNotHave(hash uint64) bool {
507+
var bf *z.Bloom
508+
509+
// Return fast if cache is absent.
510+
if t.opt.Cache == nil {
511+
bf, _ := t.readBloomFilter()
512+
return !bf.Has(hash)
513+
}
514+
515+
// Check if the bloomfilter exists in the cache.
516+
if b, ok := t.opt.Cache.Get(t.bfCacheKey()); b != nil && ok {
517+
bf = b.(*z.Bloom)
518+
return !bf.Has(hash)
519+
}
520+
521+
bf, sz := t.readBloomFilter()
522+
t.opt.Cache.Set(t.bfCacheKey(), bf, int64(sz))
523+
return !bf.Has(hash)
524+
}
525+
526+
// readBloomFilter reads the bloom filter from the SST and returns its length
527+
// along with the bloom filter.
528+
func (t *Table) readBloomFilter() (*z.Bloom, int) {
529+
// Read bloom filter from the SST.
530+
data := t.readNoFail(t.indexStart, t.indexLen)
531+
index := pb.TableIndex{}
532+
var err error
533+
// Decrypt the table index if it is encrypted.
534+
if t.shouldDecrypt() {
535+
data, err = t.decrypt(data)
536+
y.Check(err)
537+
}
538+
y.Check(proto.Unmarshal(data, &index))
539+
540+
bf, err := z.JSONUnmarshal(index.BloomFilter)
541+
y.Check(err)
542+
return bf, len(index.BloomFilter)
543+
}
474544

475545
// VerifyChecksum verifies checksum for all blocks of table. This function is called by
476546
// OpenTable() function. This function is also called inside levelsController.VerifyChecksum().

table/table_test.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,9 @@ func buildTable(t *testing.T, keyValues [][]string, opts Options) *os.File {
7777
defer b.Close()
7878
// TODO: Add test for file garbage collection here. No files should be left after the tests here.
7979

80-
filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Int63())
80+
filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Uint32())
8181
f, err := y.CreateSyncedFile(filename, true)
82-
if t != nil {
83-
require.NoError(t, err)
84-
} else {
85-
y.Check(err)
86-
}
82+
require.NoError(t, err)
8783

8884
sort.Slice(keyValues, func(i, j int) bool {
8985
return keyValues[i][0] < keyValues[j][0]

0 commit comments

Comments
 (0)