fix(blooms): dont break iterator conventions (#12808)
Follow-up to #12806; this change exposes skipped pages explicitly rather than surfacing them as an error (a minimal sketch of the new convention follows the list below).

* refactors skip logic for bloom pages that are too large
* s/Seek/LoadOffset/ for LazyBloomIter
* removes unused code
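
For context, here is a small, self-contained sketch of the iterator convention this commit moves to. `pageIter`, `sizes`, and `maxSize` are illustrative stand-ins, not the actual Loki types; the real `LazyBloomIter.LoadOffset` and its callers appear in the diff below.

```go
package main

import "fmt"

// pageIter is a toy stand-in for LazyBloomIter: LoadOffset reports a skip
// explicitly instead of parking a sentinel error on the iterator for callers
// to detect and reset.
type pageIter struct {
	sizes   []int // hypothetical decompressed page sizes
	maxSize int
	cur     int
	err     error
}

// LoadOffset returns true when the page should be skipped (too large).
// By the convention this commit adopts, skip == true implies err == nil.
func (it *pageIter) LoadOffset(page int) (skip bool) {
	if page < 0 || page >= len(it.sizes) {
		it.err = fmt.Errorf("invalid page %d", page)
		return false
	}
	if it.sizes[page] > it.maxSize {
		return true // too large: skip, but leave it.err untouched
	}
	it.cur = page
	return false
}

func (it *pageIter) Err() error { return it.err }

func main() {
	it := &pageIter{sizes: []int{10, 500, 20}, maxSize: 100}
	for i := range it.sizes {
		if skip := it.LoadOffset(i); skip {
			fmt.Println("skipping oversized page", i)
			continue
		}
		fmt.Println("loaded page", it.cur)
	}
	if err := it.Err(); err != nil {
		fmt.Println("iterator error:", err)
	}
}
```

The point of the convention is that an oversized page is an expected, recoverable condition, so callers branch on the returned `skip` and only treat `Err()` as a real failure.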
owen-d committed Apr 30, 2024
1 parent af5be90 commit 1665e85
Showing 5 changed files with 67 additions and 123 deletions.
79 changes: 5 additions & 74 deletions pkg/storage/bloom/v1/block.go
@@ -144,14 +144,12 @@ func (bq *BlockQuerier) Seek(fp model.Fingerprint) error {
func (bq *BlockQuerier) Next() bool {
for bq.series.Next() {
series := bq.series.At()
bq.blooms.Seek(series.Offset)
if skip := bq.blooms.LoadOffset(series.Offset); skip {
// can't seek to the desired bloom, likely because the page was too large to load
// so we skip this series and move on to the next
continue
}
if !bq.blooms.Next() {
// skip blocks that are too large
if errors.Is(bq.blooms.Err(), ErrPageTooLarge) {
// fmt.Printf("skipping bloom page: %s (%d)\n", series.Fingerprint, series.Chunks.Len())
bq.blooms.err = nil
continue
}
return false
}
bloom := bq.blooms.At()
@@ -175,70 +173,3 @@ func (bq *BlockQuerier) Err() error {

return bq.blooms.Err()
}

// CheckChunksForSeries checks if the given chunks pass a set of searches in the given bloom block.
// It returns the list of chunks which will need to be downloaded for a query based on the initial list
// passed as the `chks` argument. Chunks will be removed from the result set if they are indexed in the bloom
// and fail to pass all the searches.
func (bq *BlockQuerier) CheckChunksForSeries(fp model.Fingerprint, chks ChunkRefs, searches [][]byte) (ChunkRefs, error) {
schema, err := bq.Schema()
if err != nil {
return chks, fmt.Errorf("getting schema: %w", err)
}

if err := bq.Seek(fp); err != nil {
return chks, errors.Wrapf(err, "seeking to series for fp: %v", fp)
}

if !bq.series.Next() {
return chks, nil
}

series := bq.series.At()
if series.Fingerprint != fp {
return chks, nil
}

bq.blooms.Seek(series.Offset)
if !bq.blooms.Next() {
return chks, fmt.Errorf("seeking to bloom for fp: %v", fp)
}

bloom := bq.blooms.At()

// First, see if the search passes the series level bloom before checking for chunks individually
for _, search := range searches {
if !bloom.Test(search) {
// the entire series bloom didn't pass one of the searches,
// so we can skip checking chunks individually.
// We still return all chunks that are not included in the bloom
// as they may still have the data
return chks.Unless(series.Chunks), nil
}
}

// TODO(salvacorts): pool tokenBuf
var tokenBuf []byte
var prefixLen int

// Check chunks individually now
mustCheck, inBlooms := chks.Compare(series.Chunks, true)

outer:
for _, chk := range inBlooms {
// Get buf to concatenate the chunk and search token
tokenBuf, prefixLen = prefixedToken(schema.NGramLen(), chk, tokenBuf)
for _, search := range searches {
tokenBuf = append(tokenBuf[:prefixLen], search...)

if !bloom.Test(tokenBuf) {
// chunk didn't pass the search, continue to the next chunk
continue outer
}
}
// chunk passed all searches, add to the list of chunks to download
mustCheck = append(mustCheck, chk)

}
return mustCheck, nil
}
16 changes: 9 additions & 7 deletions pkg/storage/bloom/v1/bloom.go
@@ -17,7 +17,6 @@ import (
// gateways to OOM.
// Figure out a decent default maximum page size that we can process.
var DefaultMaxPageSize = 64 << 20 // 64MB
var ErrPageTooLarge = errors.Errorf("bloom page too large")

type Bloom struct {
filter.ScalableBloomFilter
@@ -275,11 +274,14 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
return checksum, nil
}

func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, err error) {
// BloomPageDecoder returns a decoder for the given page index.
// It may skip the page if it's too large.
// NB(owen-d): if `skip` is true, err _must_ be nil.
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
if pageIdx < 0 || pageIdx >= len(b.pageHeaders) {
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen))
return nil, fmt.Errorf("invalid page (%d) for bloom page decoding", pageIdx)
return nil, false, fmt.Errorf("invalid page (%d) for bloom page decoding", pageIdx)
}

page := b.pageHeaders[pageIdx]
@@ -288,13 +290,13 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize
if page.Len > maxPageSize {
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonTooLarge).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonTooLarge).Add(float64(page.DecompressedLen))
return nil, ErrPageTooLarge
return nil, true, nil
}

if _, err = r.Seek(int64(page.Offset), io.SeekStart); err != nil {
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Add(float64(page.DecompressedLen))
return nil, errors.Wrap(err, "seeking to bloom page")
return nil, false, errors.Wrap(err, "seeking to bloom page")
}

if b.schema.encoding == chunkenc.EncNone {
@@ -306,10 +308,10 @@
if err != nil {
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Add(float64(page.DecompressedLen))
return nil, errors.Wrap(err, "decoding bloom page")
return nil, false, errors.Wrap(err, "decoding bloom page")
}

metrics.pagesRead.WithLabelValues(pageTypeBloom).Inc()
metrics.bytesRead.WithLabelValues(pageTypeBloom).Add(float64(page.DecompressedLen))
return res, nil
return res, false, nil
}
30 changes: 19 additions & 11 deletions pkg/storage/bloom/v1/bloom_querier.go
@@ -42,14 +42,11 @@ func (it *LazyBloomIter) ensureInit() {
}
}

func (it *LazyBloomIter) Seek(offset BloomOffset) {
// LoadOffset returns whether the bloom page at the given offset should
// be skipped (due to being too large) _and_ there's no error
func (it *LazyBloomIter) LoadOffset(offset BloomOffset) (skip bool) {
it.ensureInit()

// reset error from any previous seek/next that yield pages too large
if errors.Is(it.err, ErrPageTooLarge) {
it.err = nil
}

// if we need a different page or the current page hasn't been loaded,
// load the desired page
if it.curPageIndex != offset.Page || it.curPage == nil {
@@ -63,12 +60,16 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) {
r, err := it.b.reader.Blooms()
if err != nil {
it.err = errors.Wrap(err, "getting blooms reader")
return
return false
}
decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics)
decoder, skip, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics)
if err != nil {
it.err = errors.Wrap(err, "loading bloom page")
return
return false
}

if skip {
return true
}

it.curPageIndex = offset.Page
@@ -77,6 +78,7 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) {
}

it.curPage.Seek(offset.ByteOffset)
return false
}

func (it *LazyBloomIter) Next() bool {
@@ -101,17 +103,23 @@ func (it *LazyBloomIter) next() bool {
return false
}

it.curPage, err = it.b.blooms.BloomPageDecoder(
var skip bool
it.curPage, skip, err = it.b.blooms.BloomPageDecoder(
r,
it.curPageIndex,
it.m,
it.b.metrics,
)
// this page wasn't skipped & produced an error, return
if err != nil {
it.err = err
return false
}
continue
if skip {
// this page was skipped; check the next
it.curPageIndex++
continue
}
}

if !it.curPage.Next() {
32 changes: 19 additions & 13 deletions pkg/storage/bloom/v1/fuse.go
@@ -59,6 +59,15 @@ func NewFusedQuerier(bq *BlockQuerier, inputs []PeekingIterator[Request], logger
}
}

func (fq *FusedQuerier) noRemovals(batch []Request, fp model.Fingerprint) {
for _, input := range batch {
input.Response <- Output{
Fp: fp,
Removals: nil,
}
}
}

func (fq *FusedQuerier) Run() error {
schema, err := fq.bq.Schema()
if err != nil {
@@ -85,26 +94,23 @@ func (fq *FusedQuerier) Run() error {
if series.Fingerprint != fp {
// fingerprint not found, can't remove chunks
level.Debug(fq.logger).Log("msg", "fingerprint not found", "fp", series.Fingerprint, "err", fq.bq.series.Err())
for _, input := range nextBatch {
input.Response <- Output{
Fp: fp,
Removals: nil,
}
}
fq.noRemovals(nextBatch, fp)
continue
}

// Now that we've found the series, we need to find the unpack the bloom
fq.bq.blooms.Seek(series.Offset)
skip := fq.bq.blooms.LoadOffset(series.Offset)
if skip {
// could not seek to the desired bloom,
// likely because the page was too large to load
fq.noRemovals(nextBatch, fp)
continue
}

if !fq.bq.blooms.Next() {
// fingerprint not found, can't remove chunks
level.Debug(fq.logger).Log("msg", "fingerprint not found", "fp", series.Fingerprint, "err", fq.bq.blooms.Err())
for _, input := range nextBatch {
input.Response <- Output{
Fp: fp,
Removals: nil,
}
}
fq.noRemovals(nextBatch, fp)
continue
}

33 changes: 15 additions & 18 deletions pkg/storage/bloom/v1/fuse_test.go
@@ -152,6 +152,10 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := NewByteReader(indexBuf, bloomsBuf)

largeSeries := func(i int) bool {
return i%2 == 0
}

numSeries := 4
data := make([]SeriesWithBloom, 0, numSeries)
tokenizer := NewNGramTokenizer(4, 0)
@@ -170,8 +174,10 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
bloom.ScalableBloomFilter = *filter.NewScalableBloomFilter(1024, 0.01, 0.8)

nLines := 10
if i == 0 || i == 2 {
// Add enough lines to make the bloom page too large for series 1
// all even series will have a larger bloom (more than 1 filter)
if largeSeries(i) {
// Add enough lines to make the bloom page too large and
// trigger another filter addition
nLines = 10000
}

@@ -218,14 +224,15 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
series := querier.series.At()
require.Equal(t, fp, series.Fingerprint)

querier.blooms.Seek(series.Offset)

if fp == 0 || fp == 2 {
require.False(t, querier.blooms.Next())
require.Error(t, querier.blooms.Err())
seekable := true
if largeSeries(int(fp)) {
seekable = false
}
if !seekable {
require.True(t, querier.blooms.LoadOffset(series.Offset))
continue
}

require.False(t, querier.blooms.LoadOffset(series.Offset))
require.True(t, querier.blooms.Next())
require.NoError(t, querier.blooms.Err())
}
@@ -293,16 +300,6 @@ func BenchmarkBlockQuerying(b *testing.B) {
// benchmark
b.StartTimer()

b.Run("single-pass", func(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, chain := range requestChains {
for _, req := range chain {
_, _ = querier.CheckChunksForSeries(req.Fp, req.Chks, nil)
}
}
}

})
b.Run("fused", func(b *testing.B) {
// spin up some goroutines to consume the responses so they don't block
go func() {
