Skip to content

Commit

Permalink
fix(blooms): Correctly skip block page in case it exceeds the max pag…
Browse files Browse the repository at this point in the history
…e size for querying (#12297)

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Mar 21, 2024
1 parent 3cc4feb commit 2edbe12
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 20 deletions.
37 changes: 19 additions & 18 deletions pkg/storage/bloom/v1/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,25 +148,26 @@ func (bq *BlockQuerier) Seek(fp model.Fingerprint) error {
}

func (bq *BlockQuerier) Next() bool {
if !bq.series.Next() {
return false
}

series := bq.series.At()

bq.blooms.Seek(series.Offset)
if !bq.blooms.Next() {
return false
}

bloom := bq.blooms.At()

bq.cur = &SeriesWithBloom{
Series: &series.Series,
Bloom: bloom,
for bq.series.Next() {
series := bq.series.At()
bq.blooms.Seek(series.Offset)
if !bq.blooms.Next() {
// skip blocks that are too large
if errors.Is(bq.blooms.Err(), ErrPageTooLarge) {
// fmt.Printf("skipping bloom page: %s (%d)\n", series.Fingerprint, series.Chunks.Len())
bq.blooms.err = nil
continue
}
return false
}
bloom := bq.blooms.At()
bq.cur = &SeriesWithBloom{
Series: &series.Series,
Bloom: bloom,
}
return true
}
return true

return false
}

func (bq *BlockQuerier) At() *SeriesWithBloom {
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/bloom/v1/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
// Figure out a decent maximum page size that we can process.
// TODO(chaudum): Make max page size configurable
var maxPageSize = 32 << 20 // 32MB
var errPageTooLarge = "bloom page too large to process: N=%d Offset=%d Len=%d DecompressedLen=%d"
var ErrPageTooLarge = errors.Errorf("bloom page too large: size limit is %.1fMiB", float64(maxPageSize)/float64(1<<20))

type Bloom struct {
filter.ScalableBloomFilter
Expand Down Expand Up @@ -253,9 +253,10 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageD
}

page := b.pageHeaders[pageIdx]
// fmt.Printf("pageIdx=%d page=%+v size=%.2fMiB\n", pageIdx, page, float64(page.Len)/float64(1<<20))

if page.Len > maxPageSize {
return nil, fmt.Errorf(errPageTooLarge, page.N, page.Offset, page.Len, page.DecompressedLen)
return nil, ErrPageTooLarge
}

if _, err := r.Seek(int64(page.Offset), io.SeekStart); err != nil {
Expand Down
37 changes: 37 additions & 0 deletions tools/bloom/inspector/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"fmt"
"os"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

func main() {
if len(os.Args) < 2 {
fmt.Println("Usage: go run main.go BLOCK_DIRECTORY")
os.Exit(2)
}

path := os.Args[1]
fmt.Printf("Block directory: %s\n", path)

r := v1.NewDirectoryBlockReader(path)
b := v1.NewBlock(r)
q := v1.NewBlockQuerier(b)

md, err := q.Metadata()
if err != nil {
panic(err)
}

fmt.Printf("Metadata: %+v\n", md)

for q.Next() {
swb := q.At()
fmt.Printf("%s (%d)\n", swb.Series.Fingerprint, swb.Series.Chunks.Len())
}
if q.Err() != nil {
fmt.Printf("error: %s\n", q.Err())
}
}

0 comments on commit 2edbe12

Please sign in to comment.