fix(blooms): dont break iterator conventions #12808

Merged: 2 commits, Apr 30, 2024
79 changes: 5 additions & 74 deletions pkg/storage/bloom/v1/block.go
@@ -144,14 +144,12 @@ func (bq *BlockQuerier) Seek(fp model.Fingerprint) error {
 func (bq *BlockQuerier) Next() bool {
 	for bq.series.Next() {
 		series := bq.series.At()
-		bq.blooms.Seek(series.Offset)
+		if skip := bq.blooms.LoadOffset(series.Offset); skip {
+			// can't seek to the desired bloom, likely because the page was too large to load
+			// so we skip this series and move on to the next
+			continue
+		}
 		if !bq.blooms.Next() {
-			// skip blocks that are too large
-			if errors.Is(bq.blooms.Err(), ErrPageTooLarge) {
-				// fmt.Printf("skipping bloom page: %s (%d)\n", series.Fingerprint, series.Chunks.Len())
-				bq.blooms.err = nil
-				continue
-			}
 			return false
 		}
 		bloom := bq.blooms.At()
@@ -175,70 +173,3 @@ func (bq *BlockQuerier) Err() error {
 
 	return bq.blooms.Err()
 }
-
-// CheckChunksForSeries checks if the given chunks pass a set of searches in the given bloom block.
-// It returns the list of chunks which will need to be downloaded for a query based on the initial list
-// passed as the `chks` argument. Chunks will be removed from the result set if they are indexed in the bloom
-// and fail to pass all the searches.
-func (bq *BlockQuerier) CheckChunksForSeries(fp model.Fingerprint, chks ChunkRefs, searches [][]byte) (ChunkRefs, error) {
-	schema, err := bq.Schema()
-	if err != nil {
-		return chks, fmt.Errorf("getting schema: %w", err)
-	}
-
-	if err := bq.Seek(fp); err != nil {
-		return chks, errors.Wrapf(err, "seeking to series for fp: %v", fp)
-	}
-
-	if !bq.series.Next() {
-		return chks, nil
-	}
-
-	series := bq.series.At()
-	if series.Fingerprint != fp {
-		return chks, nil
-	}
-
-	bq.blooms.Seek(series.Offset)
-	if !bq.blooms.Next() {
-		return chks, fmt.Errorf("seeking to bloom for fp: %v", fp)
-	}
-
-	bloom := bq.blooms.At()
-
-	// First, see if the search passes the series level bloom before checking for chunks individually
-	for _, search := range searches {
-		if !bloom.Test(search) {
-			// the entire series bloom didn't pass one of the searches,
-			// so we can skip checking chunks individually.
-			// We still return all chunks that are not included in the bloom
-			// as they may still have the data
-			return chks.Unless(series.Chunks), nil
-		}
-	}
-
-	// TODO(salvacorts): pool tokenBuf
-	var tokenBuf []byte
-	var prefixLen int
-
-	// Check chunks individually now
-	mustCheck, inBlooms := chks.Compare(series.Chunks, true)
-
-outer:
-	for _, chk := range inBlooms {
-		// Get buf to concatenate the chunk and search token
-		tokenBuf, prefixLen = prefixedToken(schema.NGramLen(), chk, tokenBuf)
-		for _, search := range searches {
-			tokenBuf = append(tokenBuf[:prefixLen], search...)
-
-			if !bloom.Test(tokenBuf) {
-				// chunk didn't pass the search, continue to the next chunk
-				continue outer
-			}
-		}
-		// chunk passed all searches, add to the list of chunks to download
-		mustCheck = append(mustCheck, chk)
-
-	}
-	return mustCheck, nil
-}
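
The change above restores the usual Go iterator contract for `BlockQuerier`: skipped (too-large) pages are now handled internally instead of surfacing as an error that callers have to inspect and reset. A minimal consumer-side sketch, not part of this diff; `Next()` and `Err()` appear in the hunk above, while `At()` is assumed to exist as the matching accessor:

```go
// Sketch only: how a caller is expected to drive BlockQuerier after this change.
// Too-large pages no longer show up as errors, so the caller never has to reach
// into the iterator and reset its error state.
func countSeries(bq *BlockQuerier) (int, error) {
	n := 0
	for bq.Next() {
		_ = bq.At() // current series + bloom (accessor assumed)
		n++
	}
	// the error is checked exactly once, after iteration ends
	return n, bq.Err()
}
```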
16 changes: 9 additions & 7 deletions pkg/storage/bloom/v1/bloom.go
Expand Up @@ -17,7 +17,6 @@ import (
// gateways to OOM.
// Figure out a decent default maximum page size that we can process.
var DefaultMaxPageSize = 64 << 20 // 64MB
var ErrPageTooLarge = errors.Errorf("bloom page too large")

type Bloom struct {
filter.ScalableBloomFilter
@@ -275,11 +274,14 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
 	return checksum, nil
 }
 
-func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, err error) {
+// BloomPageDecoder returns a decoder for the given page index.
+// It may skip the page if it's too large.
+// NB(owen-d): if `skip` is true, err _must_ be nil.
+func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
 	if pageIdx < 0 || pageIdx >= len(b.pageHeaders) {
 		metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc()
 		metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen))
-		return nil, fmt.Errorf("invalid page (%d) for bloom page decoding", pageIdx)
+		return nil, false, fmt.Errorf("invalid page (%d) for bloom page decoding", pageIdx)
 	}
 
 	page := b.pageHeaders[pageIdx]
@@ -288,13 +290,13 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize
 	if page.Len > maxPageSize {
 		metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonTooLarge).Inc()
 		metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonTooLarge).Add(float64(page.DecompressedLen))
-		return nil, ErrPageTooLarge
+		return nil, true, nil
 	}
 
 	if _, err = r.Seek(int64(page.Offset), io.SeekStart); err != nil {
 		metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Inc()
 		metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Add(float64(page.DecompressedLen))
-		return nil, errors.Wrap(err, "seeking to bloom page")
+		return nil, false, errors.Wrap(err, "seeking to bloom page")
 	}
 
 	if b.schema.encoding == chunkenc.EncNone {
@@ -306,10 +308,10 @@
 	if err != nil {
 		metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Inc()
 		metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Add(float64(page.DecompressedLen))
-		return nil, errors.Wrap(err, "decoding bloom page")
+		return nil, false, errors.Wrap(err, "decoding bloom page")
 	}
 
 	metrics.pagesRead.WithLabelValues(pageTypeBloom).Inc()
 	metrics.bytesRead.WithLabelValues(pageTypeBloom).Add(float64(page.DecompressedLen))
-	return res, nil
+	return res, false, nil
 }
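
With the new signature, `BloomPageDecoder` reports three mutually exclusive outcomes: a usable decoder, an intentional skip (page larger than `maxPageSize`), or an error. A hedged sketch of the calling pattern this implies, assuming it lives in the same package (so the `io` import and the `Metrics` type are available); the wrapper name and variables are illustrative, not from the diff:

```go
// Sketch (not from the PR): exercising the new (decoder, skip, error) return triple.
func decodePage(b *BloomBlock, r io.ReadSeeker, pageIdx, maxPageSize int, m *Metrics) (*BloomPageDecoder, error) {
	dec, skip, err := b.BloomPageDecoder(r, pageIdx, maxPageSize, m)
	if err != nil {
		// a real failure: bad page index, seek error, or decode error
		return nil, err
	}
	if skip {
		// the page was intentionally skipped (too large); per the NB above err is nil,
		// so callers treat this as "nothing to read" rather than a failure
		return nil, nil
	}
	return dec, nil
}
```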
30 changes: 19 additions & 11 deletions pkg/storage/bloom/v1/bloom_querier.go
@@ -42,14 +42,11 @@ func (it *LazyBloomIter) ensureInit() {
 	}
 }
 
-func (it *LazyBloomIter) Seek(offset BloomOffset) {
+// LoadOffset returns whether the bloom page at the given offset should
+// be skipped (due to being too large) _and_ there's no error
+func (it *LazyBloomIter) LoadOffset(offset BloomOffset) (skip bool) {
 	it.ensureInit()
 
-	// reset error from any previous seek/next that yield pages too large
-	if errors.Is(it.err, ErrPageTooLarge) {
-		it.err = nil
-	}
-
 	// if we need a different page or the current page hasn't been loaded,
 	// load the desired page
 	if it.curPageIndex != offset.Page || it.curPage == nil {
@@ -63,12 +60,16 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) {
 		r, err := it.b.reader.Blooms()
 		if err != nil {
 			it.err = errors.Wrap(err, "getting blooms reader")
-			return
+			return false
 		}
-		decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics)
+		decoder, skip, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics)
 		if err != nil {
 			it.err = errors.Wrap(err, "loading bloom page")
-			return
+			return false
+		}
+
+		if skip {
+			return true
 		}
 
 		it.curPageIndex = offset.Page
@@ -77,6 +78,7 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) {
 	}
 
 	it.curPage.Seek(offset.ByteOffset)
+	return false
 }
 
 func (it *LazyBloomIter) Next() bool {
@@ -101,17 +103,23 @@ func (it *LazyBloomIter) next() bool {
 				return false
 			}
 
-			it.curPage, err = it.b.blooms.BloomPageDecoder(
+			var skip bool
+			it.curPage, skip, err = it.b.blooms.BloomPageDecoder(
 				r,
 				it.curPageIndex,
 				it.m,
 				it.b.metrics,
 			)
+			// this page wasn't skipped & produced an error, return
 			if err != nil {
 				it.err = err
 				return false
 			}
-			continue
+			if skip {
+				// this page was skipped; check the next
+				it.curPageIndex++
+				continue
+			}
 		}
 
 		if !it.curPage.Next() {
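
`LoadOffset` replaces `Seek` and folds the "page too large" case into its boolean return, so both call sites touched by this PR follow the same shape. A sketch of that caller contract; the helper below is illustrative and not part of the change, while `LoadOffset`, `Next`, `At`, and `Err` are the methods visible in the diffs:

```go
// Sketch only: the caller contract for LoadOffset, as used by BlockQuerier.Next
// and FusedQuerier.Run in this PR.
func advanceToBloom(it *LazyBloomIter, offset BloomOffset) bool {
	if skip := it.LoadOffset(offset); skip {
		// too-large page: silently skip this series; no error is pending
		return false
	}
	if !it.Next() {
		// either exhausted or a real error; the caller should inspect it.Err()
		return false
	}
	_ = it.At() // current bloom, ready to test
	return true
}
```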
32 changes: 19 additions & 13 deletions pkg/storage/bloom/v1/fuse.go
@@ -59,6 +59,15 @@ func NewFusedQuerier(bq *BlockQuerier, inputs []PeekingIterator[Request], logger
 	}
 }
 
+func (fq *FusedQuerier) noRemovals(batch []Request, fp model.Fingerprint) {
+	for _, input := range batch {
+		input.Response <- Output{
+			Fp:       fp,
+			Removals: nil,
+		}
+	}
+}
+
 func (fq *FusedQuerier) Run() error {
 	schema, err := fq.bq.Schema()
 	if err != nil {
@@ -85,26 +94,23 @@ func (fq *FusedQuerier) Run() error {
 		if series.Fingerprint != fp {
 			// fingerprint not found, can't remove chunks
 			level.Debug(fq.logger).Log("msg", "fingerprint not found", "fp", series.Fingerprint, "err", fq.bq.series.Err())
-			for _, input := range nextBatch {
-				input.Response <- Output{
-					Fp:       fp,
-					Removals: nil,
-				}
-			}
+			fq.noRemovals(nextBatch, fp)
 			continue
 		}
 
 		// Now that we've found the series, we need to find the unpack the bloom
-		fq.bq.blooms.Seek(series.Offset)
+		skip := fq.bq.blooms.LoadOffset(series.Offset)
+		if skip {
+			// could not seek to the desired bloom,
+			// likely because the page was too large to load
+			fq.noRemovals(nextBatch, fp)
+			continue
+		}
+
 		if !fq.bq.blooms.Next() {
 			// fingerprint not found, can't remove chunks
 			level.Debug(fq.logger).Log("msg", "fingerprint not found", "fp", series.Fingerprint, "err", fq.bq.blooms.Err())
-			for _, input := range nextBatch {
-				input.Response <- Output{
-					Fp:       fp,
-					Removals: nil,
-				}
-			}
+			fq.noRemovals(nextBatch, fp)
 			continue
 		}
 
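
The new `noRemovals` helper centralizes the "nothing can be ruled out" reply. On the receiving side, a nil `Removals` means every candidate chunk is kept. A sketch of that interpretation; the helper is mine, `Fp` and `Removals` are the fields visible above, and the assumption that `Removals` is a `ChunkRefs` (so `Unless`, seen in the removed `CheckChunksForSeries`, applies) is mine as well:

```go
// Sketch (assumption-laden): how a consumer of Output might apply the response.
func applyOutput(out Output, candidates ChunkRefs) ChunkRefs {
	if len(out.Removals) == 0 {
		// nil/empty removals (e.g. from noRemovals): nothing could be ruled out
		return candidates
	}
	return candidates.Unless(out.Removals)
}
```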
33 changes: 15 additions & 18 deletions pkg/storage/bloom/v1/fuse_test.go
@@ -152,6 +152,10 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
 	writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
 	reader := NewByteReader(indexBuf, bloomsBuf)
 
+	largeSeries := func(i int) bool {
+		return i%2 == 0
+	}
+
 	numSeries := 4
 	data := make([]SeriesWithBloom, 0, numSeries)
 	tokenizer := NewNGramTokenizer(4, 0)
@@ -170,8 +174,10 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
 		bloom.ScalableBloomFilter = *filter.NewScalableBloomFilter(1024, 0.01, 0.8)
 
 		nLines := 10
-		if i == 0 || i == 2 {
-			// Add enough lines to make the bloom page too large for series 1
+		// all even series will have a larger bloom (more than 1 filter)
+		if largeSeries(i) {
+			// Add enough lines to make the bloom page too large and
+			// trigger another filter addition
 			nLines = 10000
 		}
 
@@ -218,14 +224,15 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
 		series := querier.series.At()
 		require.Equal(t, fp, series.Fingerprint)
 
-		querier.blooms.Seek(series.Offset)
-
-		if fp == 0 || fp == 2 {
-			require.False(t, querier.blooms.Next())
-			require.Error(t, querier.blooms.Err())
+		seekable := true
+		if largeSeries(int(fp)) {
+			seekable = false
+		}
+		if !seekable {
+			require.True(t, querier.blooms.LoadOffset(series.Offset))
 			continue
 		}
-
+		require.False(t, querier.blooms.LoadOffset(series.Offset))
 		require.True(t, querier.blooms.Next())
 		require.NoError(t, querier.blooms.Err())
 	}
@@ -293,16 +300,6 @@ func BenchmarkBlockQuerying(b *testing.B) {
 	// benchmark
 	b.StartTimer()
 
-	b.Run("single-pass", func(b *testing.B) {
-		for i := 0; i < b.N; i++ {
-			for _, chain := range requestChains {
-				for _, req := range chain {
-					_, _ = querier.CheckChunksForSeries(req.Fp, req.Chks, nil)
-				}
-			}
-		}
-
-	})
 	b.Run("fused", func(b *testing.B) {
 		// spin up some goroutines to consume the responses so they don't block
 		go func() {