feat(blooms): limit bloom size during creation (#12796)
owen-d committed Apr 26, 2024
1 parent c0113db commit eac5622
Showing 14 changed files with 179 additions and 96 deletions.
6 changes: 6 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3333,6 +3333,12 @@ shard_streams:
# CLI flag: -bloom-compactor.max-block-size
[bloom_compactor_max_block_size: <int> | default = 200MB]

# Experimental. The maximum bloom size per log stream. A log stream whose
# generated bloom filter exceeds this size will be discarded. A value of 0 sets
# an unlimited size. Default is 128MB.
# CLI flag: -bloom-compactor.max-bloom-size
[bloom_compactor_max_bloom_size: <int> | default = 128MB]

# Experimental. Length of the n-grams created when computing blooms from log
# lines.
# CLI flag: -bloom-compactor.ngram-length
4 changes: 4 additions & 0 deletions pkg/bloomcompactor/bloomcompactor_test.go
@@ -197,6 +197,10 @@ func (m mockLimits) BloomCompactorMaxBlockSize(_ string) int {
panic("implement me")
}

func (m mockLimits) BloomCompactorMaxBloomSize(_ string) int {
panic("implement me")
}

func TestTokenRangesForInstance(t *testing.T) {
desc := func(id int, tokens ...uint32) ring.InstanceDesc {
return ring.InstanceDesc{Id: fmt.Sprintf("%d", id), Tokens: tokens}
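If a test needs to exercise the new limit rather than panic, one option is a small wrapper around the existing mock. This is a hypothetical helper, not part of this commit; it only assumes the `BloomCompactorMaxBloomSize(string) int` signature shown above:

```go
// Hypothetical test helper (not in this commit): embeds mockLimits and
// overrides only the new method with a configurable value.
type sizedMockLimits struct {
	mockLimits
	maxBloomSize int
}

func (m sizedMockLimits) BloomCompactorMaxBloomSize(_ string) int {
	return m.maxBloomSize
}
```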
1 change: 1 addition & 0 deletions pkg/bloomcompactor/config.go
@@ -93,5 +93,6 @@ type Limits interface {
BloomNGramSkip(tenantID string) int
BloomFalsePositiveRate(tenantID string) float64
BloomCompactorMaxBlockSize(tenantID string) int
BloomCompactorMaxBloomSize(tenantID string) int
BloomBlockEncoding(tenantID string) string
}
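In production this interface is typically satisfied by per-tenant overrides. A minimal sketch of such a getter, assuming a hypothetical overrides store rather than Loki's actual validation.Overrides type (which is not part of this diff):

```go
// Illustrative per-tenant limit resolution; the type and field names here are
// assumptions, not Loki's real overrides API.
type tenantLimits struct {
	defaultMaxBloomSize int            // e.g. 128 << 20 for the documented 128MB default
	perTenant           map[string]int // tenantID -> max bloom size in bytes
}

func (l *tenantLimits) BloomCompactorMaxBloomSize(tenantID string) int {
	if v, ok := l.perTenant[tenantID]; ok {
		return v
	}
	return l.defaultMaxBloomSize
}
```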
3 changes: 2 additions & 1 deletion pkg/bloomcompactor/controller.go
@@ -352,7 +352,8 @@ func (s *SimpleBloomController) buildGaps(
nGramSize = uint64(s.limits.BloomNGramLength(tenant))
nGramSkip = uint64(s.limits.BloomNGramSkip(tenant))
maxBlockSize = uint64(s.limits.BloomCompactorMaxBlockSize(tenant))
blockOpts = v1.NewBlockOptions(blockEnc, nGramSize, nGramSkip, maxBlockSize)
maxBloomSize = uint64(s.limits.BloomCompactorMaxBloomSize(tenant))
blockOpts = v1.NewBlockOptions(blockEnc, nGramSize, nGramSkip, maxBlockSize, maxBloomSize)
created []bloomshipper.Meta
totalSeries int
bytesAdded int
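For reference, the extended constructor now threads both size limits into the block options. The concrete numbers below are illustrative only; per the configuration docs, a final argument of 0 leaves the per-stream bloom size unbounded:

```go
// Illustrative calls; blockEnc, nGramSize and nGramSkip come from tenant
// limits as in buildGaps above. 200MB block cap, 128MB per-stream bloom cap.
capped := v1.NewBlockOptions(blockEnc, nGramSize, nGramSkip, 200<<20, 128<<20)

// A maxBloomSize of 0 disables the per-stream limit, as the updated tests do.
uncapped := v1.NewBlockOptions(blockEnc, nGramSize, nGramSkip, 200<<20, 0)
_, _ = capped, uncapped
```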
21 changes: 13 additions & 8 deletions pkg/bloomcompactor/spec.go
@@ -89,12 +89,17 @@ func NewSimpleBloomGenerator(
metrics: metrics,
reporter: reporter,

tokenizer: v1.NewBloomTokenizer(opts.Schema.NGramLen(), opts.Schema.NGramSkip(), metrics.bloomMetrics),
tokenizer: v1.NewBloomTokenizer(
opts.Schema.NGramLen(),
opts.Schema.NGramSkip(),
int(opts.UnencodedBlockOptions.MaxBloomSizeBytes),
metrics.bloomMetrics,
),
}
}

func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Series, bloom *v1.Bloom) (int, error) {
return func(series *v1.Series, bloom *v1.Bloom) (int, error) {
func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Series, bloom *v1.Bloom) (int, bool, error) {
return func(series *v1.Series, bloom *v1.Bloom) (int, bool, error) {
start := time.Now()
level.Debug(s.logger).Log(
"msg", "populating bloom filter",
@@ -104,10 +109,10 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se
)
chunkItersWithFP, err := s.chunkLoader.Load(ctx, s.userID, series)
if err != nil {
return 0, errors.Wrapf(err, "failed to load chunks for series: %+v", series)
return 0, false, errors.Wrapf(err, "failed to load chunks for series: %+v", series)
}

bytesAdded, err := s.tokenizer.Populate(
bytesAdded, skip, err := s.tokenizer.Populate(
&v1.SeriesWithBloom{
Series: series,
Bloom: bloom,
@@ -128,7 +133,7 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se
if s.reporter != nil {
s.reporter(series.Fingerprint)
}
return bytesAdded, err
return bytesAdded, skip, err
}

}
@@ -174,7 +179,7 @@ type LazyBlockBuilderIterator struct {
ctx context.Context
opts v1.BlockOptions
metrics *Metrics
populate func(*v1.Series, *v1.Bloom) (int, error)
populate func(*v1.Series, *v1.Bloom) (int, bool, error)
readWriterFn func() (v1.BlockWriter, v1.BlockReader)
series v1.PeekingIterator[*v1.Series]
blocks v1.ResettableIterator[*v1.SeriesWithBloom]
@@ -188,7 +193,7 @@ func NewLazyBlockBuilderIterator(
ctx context.Context,
opts v1.BlockOptions,
metrics *Metrics,
populate func(*v1.Series, *v1.Bloom) (int, error),
populate func(*v1.Series, *v1.Bloom) (int, bool, error),
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
series v1.PeekingIterator[*v1.Series],
blocks v1.ResettableIterator[*v1.SeriesWithBloom],
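The populate callback now reports a third value: whether the series should be skipped because its bloom outgrew the limit. A sketch of how a builder loop might consume the `(bytesAdded, skip, err)` contract — names here are illustrative, and the real LazyBlockBuilderIterator is only partially shown in this diff:

```go
// buildWithLimit sketches consumption of the three-value populate contract:
// accumulate bytes, propagate errors, and drop series whose blooms were
// flagged as oversized instead of writing them into the block.
func buildWithLimit(
	series []*v1.Series,
	newBloom func() *v1.Bloom, // assumed factory for an empty bloom
	populate func(*v1.Series, *v1.Bloom) (int, bool, error),
) ([]v1.SeriesWithBloom, int, error) {
	var (
		out        []v1.SeriesWithBloom
		bytesAdded int
	)
	for _, s := range series {
		bloom := newBloom()
		n, skip, err := populate(s, bloom)
		bytesAdded += n
		if err != nil {
			return out, bytesAdded, err
		}
		if skip {
			// The bloom grew past the per-stream limit: discard it rather
			// than writing an oversized filter.
			continue
		}
		out = append(out, v1.SeriesWithBloom{Series: s, Bloom: bloom})
	}
	return out, bytesAdded, nil
}
```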
8 changes: 4 additions & 4 deletions pkg/bloomcompactor/spec_test.go
@@ -121,13 +121,13 @@ func TestSimpleBloomGenerator(t *testing.T) {
}{
{
desc: "SkipsIncompatibleSchemas",
fromSchema: v1.NewBlockOptions(enc, 3, 0, maxBlockSize),
toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize),
fromSchema: v1.NewBlockOptions(enc, 3, 0, maxBlockSize, 0),
toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0),
},
{
desc: "CombinesBlocks",
fromSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize),
toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize),
fromSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0),
toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0),
},
} {
t.Run(fmt.Sprintf("%s/%s", tc.desc, enc), func(t *testing.T) {
116 changes: 67 additions & 49 deletions pkg/storage/bloom/v1/bloom_tokenizer.go
@@ -5,6 +5,7 @@ import (
"math"
"time"

"github.com/c2h5oh/datasize"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

@@ -25,6 +26,7 @@ Bloom filters are utilized for faster lookups of log lines.
type BloomTokenizer struct {
metrics *Metrics

maxBloomSize int
lineTokenizer *NGramTokenizer
cache map[string]interface{}
}
@@ -38,13 +40,14 @@ const eightBits = 8
// 1) The token slices generated must not be mutated externally
// 2) The token slice must not be used after the next call to `Tokens()` as it will repopulate the slice.
// 3) This is not thread safe.
func NewBloomTokenizer(nGramLen, nGramSkip int, metrics *Metrics) *BloomTokenizer {
func NewBloomTokenizer(nGramLen, nGramSkip int, maxBloomSize int, metrics *Metrics) *BloomTokenizer {
// TODO(chaudum): Replace logger
level.Info(util_log.Logger).Log("msg", "create new bloom tokenizer", "ngram length", nGramLen, "ngram skip", nGramSkip)
return &BloomTokenizer{
metrics: metrics,
cache: make(map[string]interface{}, cacheSize),
lineTokenizer: NewNGramTokenizer(nGramLen, nGramSkip),
maxBloomSize: maxBloomSize,
}
}

@@ -89,7 +92,9 @@ type ChunkRefWithIter struct {
}

// Populate adds the tokens from the given chunks to the given seriesWithBloom.
func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefWithIter]) (int, error) {
// The `skip` return value indicates whether this series should be discarded and is used to short-circuit
// bloom generation for series that are too large. We will undoubtedly improve this in the future.
func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefWithIter]) (bytesAdded int, skip bool, err error) {
startTime := time.Now().UnixMilli()

clearCache(bt.cache)
@@ -119,61 +124,53 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW
tokenBuf, prefixLn = prefixedToken(bt.lineTokenizer.N(), chk.Ref, tokenBuf)

// Iterate over lines in the chunk
entries:
for itr.Next() && itr.Error() == nil {
// TODO(owen-d): rather than iterate over the line twice, once for prefixed tokenizer & once for
// raw tokenizer, we could iterate once and just return (prefix, token) pairs from the tokenizer.
// Double points for them being different-ln references to the same data.
line := itr.Entry().Line
chunkBytes += len(line)
chunkTokenizer := NewPrefixedTokenIter(tokenBuf, prefixLn, bt.lineTokenizer.Tokens(line))
for chunkTokenizer.Next() {
tok := chunkTokenizer.At()
tokens++
// TODO(owen-d): [n]byte this
str := string(tok)
_, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters
if found {
cachedInserts++
continue
}

bt.cache[str] = nil
collision := swb.Bloom.ScalableBloomFilter.TestAndAdd(tok)
if collision {
collisionInserts++
} else {
successfulInserts++
}

if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
clearCache(bt.cache)
}
tokenItrs := []Iterator[[]byte]{
// two iterators, one for the raw tokens and one for the chunk prefixed tokens.
// Warning: the underlying line tokenizer (used in both iterators) uses the same buffer for tokens.
// They are NOT SAFE for concurrent use.
NewPrefixedTokenIter(tokenBuf, prefixLn, bt.lineTokenizer.Tokens(line)),
bt.lineTokenizer.Tokens(line),
}

lineTokenizer := bt.lineTokenizer.Tokens(line)
for lineTokenizer.Next() {
tok := lineTokenizer.At()
tokens++
str := string(tok)
_, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters
if found {
chunkCachedInserts++
continue
for _, itr := range tokenItrs {
for itr.Next() {
tok := itr.At()
tokens++
// TODO(owen-d): [n]byte this
str := string(tok)
_, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters
if found {
cachedInserts++
continue
}

bt.cache[str] = nil
collision, sz := swb.Bloom.ScalableBloomFilter.HeavyAdd(tok)
if collision {
collisionInserts++
} else {
successfulInserts++
}

if bt.maxBloomSize > 0 && sz > bt.maxBloomSize {
skip = true
break entries
}

if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
clearCache(bt.cache)
}
}
bt.cache[str] = nil

collision := swb.Bloom.ScalableBloomFilter.TestAndAdd(tok)
if collision {
chunkCollisionInserts++
} else {
chunkSuccessfulInserts++
}

if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
clearCache(bt.cache)
}
}

}

// add the recorded chunkbytes to the sourcebytes counter in case we return early via error
@@ -187,7 +184,7 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW
es.Add(errors.Wrapf(err, "error iterating chunk: %#v", chk.Ref))
}
if combined := es.Err(); combined != nil {
return sourceBytes, combined
return sourceBytes, skip, combined
}
swb.Series.Chunks = append(swb.Series.Chunks, chk.Ref)

@@ -200,13 +197,27 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW
bt.metrics.insertsTotal.WithLabelValues(tokenTypeChunkPrefixed, collisionTypeCache).Add(float64(chunkCachedInserts))
bt.metrics.insertsTotal.WithLabelValues(tokenTypeChunkPrefixed, collisionTypeTrue).Add(float64(chunkCollisionInserts))
bt.metrics.sourceBytesAdded.Add(float64(chunkBytes))

// Exit early if the series is too large
if skip {
break
}
}

if err := chks.Err(); err != nil {
level.Error(util_log.Logger).Log("msg", "error downloading chunks batch", "err", err)
return sourceBytes, fmt.Errorf("error downloading chunks batch: %w", err)
return sourceBytes, skip, fmt.Errorf("error downloading chunks batch: %w", err)
}

level.Debug(util_log.Logger).Log(
"msg", "bloom filter populated",
"chunks", len(swb.Series.Chunks),
"fp", swb.Series.Fingerprint,
"sourceBytes", datasize.ByteSize(sourceBytes).HumanReadable(),
"bloomSize", datasize.ByteSize(swb.Bloom.Capacity()/8).HumanReadable(),
"skipped", skip,
)

endTime := time.Now().UnixMilli()

fillRatio := swb.Bloom.ScalableBloomFilter.FillRatio()
@@ -215,8 +226,15 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW
float64(estimatedCount(swb.Bloom.ScalableBloomFilter.Capacity(), fillRatio)),
)
bt.metrics.bloomSize.Observe(float64(swb.Bloom.ScalableBloomFilter.Capacity() / eightBits))
bt.metrics.sbfCreationTime.Add(float64(endTime - startTime))
return sourceBytes, nil

ty := bloomCreationTypeIndexed
if skip {
ty = bloomCreationTypeSkipped
}
bt.metrics.sbfCreationTime.WithLabelValues(ty).Add(float64(endTime - startTime))
bt.metrics.bloomsTotal.WithLabelValues(ty).Inc()

return sourceBytes, skip, nil
}

// n ≈ −m ln(1 − p).
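The metrics above derive an element-count estimate from the filter's capacity and fill ratio via the formula in the trailing comment, n ≈ −m·ln(1 − p), where m is the filter size in bits and p its observed fill ratio. The body of estimatedCount is cut off in this view, so the following is an illustration of that formula rather than a copy of the truncated function (it relies on the file's existing "math" import):

```go
// Illustration of n ≈ -m*ln(1-p). Not the real estimatedCount, whose body is
// truncated in this diff.
func estimatedCountSketch(m uint, p float64) uint {
	if p >= 1 {
		return m // saturated filter: the estimate degenerates, cap at m
	}
	return uint(-float64(m) * math.Log(1-p))
}
```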
14 changes: 7 additions & 7 deletions pkg/storage/bloom/v1/bloom_tokenizer_test.go
@@ -79,7 +79,7 @@ func TestPrefixedKeyCreation(t *testing.T) {

func TestSetLineTokenizer(t *testing.T) {
t.Parallel()
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)

// Validate defaults
require.Equal(t, bt.lineTokenizer.N(), DefaultNGramLength)
@@ -94,7 +94,7 @@ func TestTokenizerPopulate(t *testing.T) {
func TestTokenizerPopulate(t *testing.T) {
t.Parallel()
var testLine = "this is a log line"
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)

sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
var lbsList []labels.Labels
@@ -125,7 +125,7 @@ func TestTokenizerPopulate(t *testing.T) {
Series: &series,
}

_, err = bt.Populate(&swb, NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, Itr: itr}}))
_, _, err = bt.Populate(&swb, NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, Itr: itr}}))
require.NoError(t, err)
tokenizer := NewNGramTokenizer(DefaultNGramLength, DefaultNGramSkip)
toks := tokenizer.Tokens(testLine)
@@ -138,7 +138,7 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
for i := 0; i < b.N; i++ {
var testLine = lorem + lorem + lorem
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)

sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
var lbsList []labels.Labels
@@ -169,13 +169,13 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
Series: &series,
}

_, err = bt.Populate(&swb, NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, Itr: itr}}))
_, _, err = bt.Populate(&swb, NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, Itr: itr}}))
require.NoError(b, err)
}
}

func BenchmarkMapClear(b *testing.B) {
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
for i := 0; i < b.N; i++ {
for k := 0; k < cacheSize; k++ {
bt.cache[fmt.Sprint(k)] = k
@@ -186,7 +186,7 @@ func BenchmarkMapClear(b *testing.B) {
}

func BenchmarkNewMap(b *testing.B) {
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
for i := 0; i < b.N; i++ {
for k := 0; k < cacheSize; k++ {
bt.cache[fmt.Sprint(k)] = k
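The updated tests all pass 0 (unlimited) for the new maxBloomSize argument. A sketch of a test that exercises the skip path instead; the tiny limit and the fixture helper are assumptions, not part of this commit:

```go
// Hypothetical test (not in this commit): with a deliberately tiny per-stream
// limit, Populate should report skip=true without returning an error.
func TestTokenizerPopulateSkipsOversizedBlooms(t *testing.T) {
	t.Parallel()
	// 1 byte is far below any realistic bloom size, so the limit should trip
	// on the first insertion whose size HeavyAdd reports.
	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 1, metrics)

	// Assumed helper: buildTestSwbAndChunkIter mirrors the fixture setup used
	// by TestTokenizerPopulate above (a SeriesWithBloom plus a chunk iterator
	// over a few log lines); it does not exist in this package.
	swb, itr := buildTestSwbAndChunkIter(t, "this is a log line", "and another")

	_, skip, err := bt.Populate(&swb, NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, Itr: itr}}))
	require.NoError(t, err)
	require.True(t, skip, "expected the bloom to exceed the 1 byte limit")
}
```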