Skip to content

Commit

Permalink
only skip bloom pages when there is no error
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
owen-d committed Apr 29, 2024
1 parent 3ca36fa commit 5575682
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/bloom/v1/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (bq *BlockQuerier) Seek(fp model.Fingerprint) error {
func (bq *BlockQuerier) Next() bool {
for bq.series.Next() {
series := bq.series.At()
if ok := bq.blooms.LoadOffset(series.Offset); !ok {
if skip := bq.blooms.LoadOffset(series.Offset); skip {
// can't seek to the desired bloom, likely because the page was too large to load
// so we skip this series and move on to the next
continue
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/bloom/v1/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
// gateways to OOM.
// Figure out a decent default maximum page size that we can process.
var DefaultMaxPageSize = 64 << 20 // 64MB
var ErrPageTooLarge = errors.Errorf("bloom page too large")

type Bloom struct {
filter.ScalableBloomFilter
Expand Down Expand Up @@ -290,7 +289,7 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize
if page.Len > maxPageSize {
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonTooLarge).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonTooLarge).Add(float64(page.DecompressedLen))
return nil, true, ErrPageTooLarge
return nil, true, nil
}

if _, err = r.Seek(int64(page.Offset), io.SeekStart); err != nil {
Expand Down
23 changes: 13 additions & 10 deletions pkg/storage/bloom/v1/bloom_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ func (it *LazyBloomIter) ensureInit() {
}
}

func (it *LazyBloomIter) LoadOffset(offset BloomOffset) (ok bool) {
// LoadOffset returns whether the bloom page at the given offset should
// be skipped (due to being too large) _and_ there's no error
func (it *LazyBloomIter) LoadOffset(offset BloomOffset) (skip bool) {
it.ensureInit()

// if we need a different page or the current page hasn't been loaded,
Expand All @@ -61,21 +63,22 @@ func (it *LazyBloomIter) LoadOffset(offset BloomOffset) (ok bool) {
return false
}
decoder, skip, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics)
if skip {
return false
}
if err != nil {
it.err = errors.Wrap(err, "loading bloom page")
return false
}

if skip {
return true
}

it.curPageIndex = offset.Page
it.curPage = decoder

}

it.curPage.Seek(offset.ByteOffset)
return true
return false
}

func (it *LazyBloomIter) Next() bool {
Expand Down Expand Up @@ -107,16 +110,16 @@ func (it *LazyBloomIter) next() bool {
it.m,
it.b.metrics,
)
if skip {
// this page was skipped; check the next
it.curPageIndex++
continue
}
// this page wasn't skipped & produced an error, return
if err != nil {
it.err = err
return false
}
if skip {
// this page was skipped; check the next
it.curPageIndex++
continue
}
}

if !it.curPage.Next() {
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/bloom/v1/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@ func (fq *FusedQuerier) Run() error {
}

// Now that we've found the series, we need to find the unpack the bloom
ok := fq.bq.blooms.LoadOffset(series.Offset)
if !ok {
skip := fq.bq.blooms.LoadOffset(series.Offset)
if skip {
// could not seek to the desired bloom,
// likely because the page was too large to load
fq.noRemovals(nextBatch, fp)
continue
}

if !fq.bq.blooms.Next() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/fuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
seekable = false
}
if !seekable {
require.False(t, querier.blooms.LoadOffset(series.Offset))
require.True(t, querier.blooms.LoadOffset(series.Offset))
continue
}
require.True(t, querier.blooms.LoadOffset(series.Offset))
require.False(t, querier.blooms.LoadOffset(series.Offset))
require.True(t, querier.blooms.Next())
require.NoError(t, querier.blooms.Err())
}
Expand Down

0 comments on commit 5575682

Please sign in to comment.