diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go
index 84bc71a6b203..c9eef5fa3302 100644
--- a/pkg/storage/bloom/v1/block.go
+++ b/pkg/storage/bloom/v1/block.go
@@ -148,25 +148,30 @@ func (bq *BlockQuerier) Seek(fp model.Fingerprint) error {
 }
 
+// Next advances the querier to the next series whose bloom can be read and
+// reports whether such a series was found. Series for which the bloom iterator
+// reports ErrPageTooLarge are skipped; any other bloom failure, or running out
+// of series, makes Next return false.
 func (bq *BlockQuerier) Next() bool {
-	if !bq.series.Next() {
-		return false
-	}
-
-	series := bq.series.At()
-
-	bq.blooms.Seek(series.Offset)
-	if !bq.blooms.Next() {
-		return false
-	}
-
-	bloom := bq.blooms.At()
-
-	bq.cur = &SeriesWithBloom{
-		Series: &series.Series,
-		Bloom:  bloom,
-	}
-	return true
-
+	for bq.series.Next() {
+		series := bq.series.At()
+		bq.blooms.Seek(series.Offset)
+
+		if bq.blooms.Next() {
+			bq.cur = &SeriesWithBloom{
+				Series: &series.Series,
+				Bloom:  bq.blooms.At(),
+			}
+			return true
+		}
+
+		// Anything other than an oversized bloom page ends iteration.
+		if !errors.Is(bq.blooms.Err(), ErrPageTooLarge) {
+			return false
+		}
+		// Skip this series: reset the bloom iterator's error and carry on.
+		bq.blooms.err = nil
+	}
+	return false
 }
 
 func (bq *BlockQuerier) At() *SeriesWithBloom {
diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go
index 6a6c2610e82e..da0a770fb257 100644
--- a/pkg/storage/bloom/v1/bloom.go
+++ b/pkg/storage/bloom/v1/bloom.go
@@ -18,7 +18,10 @@ import (
 // Figure out a decent maximum page size that we can process.
 // TODO(chaudum): Make max page size configurable
 var maxPageSize = 32 << 20 // 32MB
-var errPageTooLarge = "bloom page too large to process: N=%d Offset=%d Len=%d DecompressedLen=%d"
+
+// ErrPageTooLarge is the sentinel error for bloom pages larger than
+// maxPageSize. It may be returned wrapped, so match it with errors.Is.
+var ErrPageTooLarge = errors.Errorf("bloom page too large: size limit is %.1fMiB", float64(maxPageSize)/float64(1<<20))
 
 type Bloom struct {
 	filter.ScalableBloomFilter
@@ -253,9 +256,11 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageD
 	}
 
 	page := b.pageHeaders[pageIdx]
 
 	if page.Len > maxPageSize {
-		return nil, fmt.Errorf(errPageTooLarge, page.N, page.Offset, page.Len, page.DecompressedLen)
+		// Wrap the sentinel so callers can match it with errors.Is while the
+		// message still identifies the offending page.
+		return nil, fmt.Errorf("%w: N=%d Offset=%d Len=%d DecompressedLen=%d", ErrPageTooLarge, page.N, page.Offset, page.Len, page.DecompressedLen)
 	}
 
 	if _, err := r.Seek(int64(page.Offset), io.SeekStart); err != nil {
diff --git a/tools/bloom/inspector/main.go b/tools/bloom/inspector/main.go
new file mode 100644
index 000000000000..bb81d02b260b
--- /dev/null
+++ b/tools/bloom/inspector/main.go
@@ -0,0 +1,44 @@
+package main
+
+import (
+	"fmt"
+	"os"
+
+	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
+)
+
+// main prints the metadata of the bloom block in the directory named by the
+// first command-line argument, then one line per series the block querier
+// yields. Failures are reported on stderr with a non-zero exit status.
+func main() {
+	if len(os.Args) < 2 {
+		fmt.Fprintln(os.Stderr, "Usage: go run main.go BLOCK_DIRECTORY")
+		os.Exit(2)
+	}
+
+	path := os.Args[1]
+	fmt.Printf("Block directory: %s\n", path)
+
+	r := v1.NewDirectoryBlockReader(path)
+	b := v1.NewBlock(r)
+	q := v1.NewBlockQuerier(b)
+
+	md, err := q.Metadata()
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "error: reading block metadata: %s\n", err)
+		os.Exit(1)
+	}
+
+	fmt.Printf("Metadata: %+v\n", md)
+
+	for q.Next() {
+		swb := q.At()
+		fmt.Printf("%s (%d)\n", swb.Series.Fingerprint, swb.Series.Chunks.Len())
+	}
+	// Report iteration failures on stderr and through the exit status so a
+	// failed iteration is distinguishable from a complete listing.
+	if err := q.Err(); err != nil {
+		fmt.Fprintf(os.Stderr, "error: %s\n", err)
+		os.Exit(1)
+	}
+}