feat(bloom-gw): Make num multiplexing tasks configurable, and other adjustments (#12169)

* We want to be able to change the number of dequeued items per iteration to test different configurations (see the dequeue-loop sketch below).
* Ignore block pages that are orders of magnitude bigger than the target page size.
* Increase the download queue capacity to avoid blocking when enqueueing.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
chaudum committed Mar 12, 2024
1 parent e95cc6c commit a0fce39
Showing 10 changed files with 84 additions and 11 deletions.
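For the first bullet, a toy Go sketch of a per-iteration dequeue cap (the slice-backed queue and flag wiring below are hypothetical illustrations, not Loki's actual worker code):

```go
package main

import (
	"flag"
	"fmt"
)

// Mirrors the new -bloom-gateway.num-multiplex-tasks flag: an upper bound on
// how many queued tasks a worker pulls and multiplexes per iteration.
var numMultiplexTasks = flag.Int("num-multiplex-tasks", 512, "How many tasks are multiplexed at once.")

func main() {
	flag.Parse()

	// A toy backlog of 1200 pending tasks.
	queue := make([]int, 1200)
	for i := range queue {
		queue[i] = i
	}

	// Each iteration drains at most *numMultiplexTasks items, so the batch
	// size (and the memory held per iteration) is tunable at deploy time.
	for len(queue) > 0 {
		n := *numMultiplexTasks
		if n > len(queue) {
			n = len(queue)
		}
		batch := queue[:n]
		queue = queue[n:]
		fmt.Printf("dequeued %d tasks, %d remaining\n", len(batch), len(queue))
	}
}
```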
4 changes: 4 additions & 0 deletions docs/sources/configure/_index.md
@@ -1937,6 +1937,10 @@ client:
# Maximum number of outstanding tasks per tenant.
# CLI flag: -bloom-gateway.max-outstanding-per-tenant
[max_outstanding_per_tenant: <int> | default = 1024]

+ # How many tasks are multiplexed at once.
+ # CLI flag: -bloom-gateway.num-multiplex-tasks
+ [num_multiplex_tasks: <int> | default = 512]
```

### storage_config
2 changes: 1 addition & 1 deletion pkg/bloomgateway/bloomgateway.go
@@ -113,7 +113,7 @@ func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus
logger: logger,
metrics: newMetrics(reg, constants.Loki, metricsSubsystem),
workerConfig: workerConfig{
- maxItems: 100,
+ maxItems: cfg.NumMultiplexItems,
},
pendingTasks: &atomic.Int64{},

2 changes: 2 additions & 0 deletions pkg/bloomgateway/config.go
@@ -19,6 +19,7 @@ type Config struct {

WorkerConcurrency int `yaml:"worker_concurrency"`
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
+ NumMultiplexItems int `yaml:"num_multiplex_tasks"`
}

// RegisterFlags registers flags for the Bloom Gateway configuration.
@@ -32,6 +33,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the bloom gateway component globally.")
f.IntVar(&cfg.WorkerConcurrency, prefix+"worker-concurrency", 4, "Number of workers to use for filtering chunks concurrently.")
f.IntVar(&cfg.MaxOutstandingPerTenant, prefix+"max-outstanding-per-tenant", 1024, "Maximum number of outstanding tasks per tenant.")
+ f.IntVar(&cfg.NumMultiplexItems, prefix+"num-multiplex-tasks", 512, "How many tasks are multiplexed at once.")
// TODO(chaudum): Figure out what the better place is for registering flags
// -bloom-gateway.client.* or -bloom-gateway-client.*
cfg.Client.RegisterFlags(f)
9 changes: 7 additions & 2 deletions pkg/bloomgateway/processor.go
@@ -37,7 +37,12 @@ func (p *processor) run(ctx context.Context, tasks []Task) error {

func (p *processor) runWithBounds(ctx context.Context, tasks []Task, bounds v1.MultiFingerprintBounds) error {
tenant := tasks[0].Tenant
- level.Info(p.logger).Log("msg", "process tasks with bounds", "tenant", tenant, "tasks", len(tasks), "bounds", bounds)
+ level.Info(p.logger).Log(
+ "msg", "process tasks with bounds",
+ "tenant", tenant,
+ "tasks", len(tasks),
+ "bounds", JoinFunc(bounds, ",", func(e v1.FingerprintBounds) string { return e.String() }),
+ )

for ts, tasks := range group(tasks, func(t Task) config.DayTime { return t.table }) {
err := p.processTasks(ctx, tenant, ts, bounds, tasks)
@@ -145,7 +150,7 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie
iters = append(iters, it)
}

- fq := blockQuerier.Fuse(iters)
+ fq := blockQuerier.Fuse(iters, p.logger)

start := time.Now()
err = fq.Run()
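The processor.go hunk above calls a JoinFunc helper that isn't shown in this diff. A plausible generic implementation matching that call shape (an assumption, not the actual Loki source):

```go
package main

import (
	"fmt"
	"strings"
)

// JoinFunc, as assumed from the call site above: stringify each element with
// f and join the results with sep.
func JoinFunc[E any](elems []E, sep string, f func(E) string) string {
	var sb strings.Builder
	for i, e := range elems {
		if i > 0 {
			sb.WriteString(sep)
		}
		sb.WriteString(f(e))
	}
	return sb.String()
}

func main() {
	// Stand-in for v1.MultiFingerprintBounds: pairs of fingerprint bounds.
	bounds := [][2]uint64{{0, 100}, {101, 200}}
	fmt.Println(JoinFunc(bounds, ",", func(b [2]uint64) string {
		return fmt.Sprintf("%04x-%04x", b[0], b[1])
	}))
	// Output: 0000-0064,0065-00c8
}
```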
2 changes: 2 additions & 0 deletions pkg/bloomgateway/worker.go
@@ -86,6 +86,8 @@ func (w *worker) running(_ context.Context) error {
w.queue.ReleaseRequests(items)
continue
}

+ level.Debug(w.logger).Log("msg", "dequeued tasks", "count", len(items))
w.metrics.tasksDequeued.WithLabelValues(w.id, labelSuccess).Add(float64(len(items)))

tasks := make([]Task, 0, len(items))
12 changes: 12 additions & 0 deletions pkg/storage/bloom/v1/bloom.go
@@ -12,6 +12,14 @@ import (
"github.com/grafana/loki/pkg/util/encoding"
)

+ // NB(chaudum): Some block pages are way bigger than others (400MiB and
+ // bigger), and loading multiple pages into memory in parallel can cause the
+ // gateways to OOM.
+ // Figure out a decent maximum page size that we can process.
+ // TODO(chaudum): Make max page size configurable
+ var maxPageSize = 32 << 20 // 32MB
+ var errPageTooLarge = "bloom page too large to process: N=%d Offset=%d Len=%d DecompressedLen=%d"

type Bloom struct {
filter.ScalableBloomFilter
}
@@ -239,6 +247,10 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageD

page := b.pageHeaders[pageIdx]

+ if page.Len > maxPageSize {
+ return nil, fmt.Errorf(errPageTooLarge, page.N, page.Offset, page.Len, page.DecompressedLen)
+ }

if _, err := r.Seek(int64(page.Offset), io.SeekStart); err != nil {
return nil, errors.Wrap(err, "seeking to bloom page")
}
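A self-contained sketch of the size guard added above; the pageHeader struct here is a hypothetical stand-in for the real page header type:

```go
package main

import "fmt"

const maxPageSize = 32 << 20 // 32MiB cap, as in the diff above

// pageHeader stands in for the header fields named in errPageTooLarge.
type pageHeader struct {
	N, Offset, Len, DecompressedLen int
}

// checkPage rejects oversized pages before any seek or decompression, so a
// single 400MiB outlier page can't push the gateway toward OOM.
func checkPage(h pageHeader) error {
	if h.Len > maxPageSize {
		return fmt.Errorf("bloom page too large to process: N=%d Offset=%d Len=%d DecompressedLen=%d",
			h.N, h.Offset, h.Len, h.DecompressedLen)
	}
	return nil
}

func main() {
	fmt.Println(checkPage(pageHeader{N: 10, Offset: 0, Len: 1 << 20, DecompressedLen: 4 << 20}))   // <nil>
	fmt.Println(checkPage(pageHeader{N: 99, Offset: 1 << 20, Len: 400 << 20, DecompressedLen: 0})) // error
}
```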
12 changes: 9 additions & 3 deletions pkg/storage/bloom/v1/fuse.go
@@ -2,6 +2,8 @@ package v1

import (
"github.com/efficientgo/core/errors"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
)

@@ -22,16 +24,17 @@ type Output struct {
// Fuse combines multiple requests into a single loop iteration
// over the data set and returns the corresponding outputs
// TODO(owen-d): better async control
- func (bq *BlockQuerier) Fuse(inputs []PeekingIterator[Request]) *FusedQuerier {
- return NewFusedQuerier(bq, inputs)
+ func (bq *BlockQuerier) Fuse(inputs []PeekingIterator[Request], logger log.Logger) *FusedQuerier {
+ return NewFusedQuerier(bq, inputs, logger)
}

type FusedQuerier struct {
bq *BlockQuerier
inputs Iterator[[]Request]
+ logger log.Logger
}

- func NewFusedQuerier(bq *BlockQuerier, inputs []PeekingIterator[Request]) *FusedQuerier {
+ func NewFusedQuerier(bq *BlockQuerier, inputs []PeekingIterator[Request], logger log.Logger) *FusedQuerier {
heap := NewHeapIterator[Request](
func(a, b Request) bool {
return a.Fp < b.Fp
@@ -52,6 +55,7 @@ func NewFusedQuerier(bq *BlockQuerier, inputs []PeekingIterator[Request]) *Fused
return &FusedQuerier{
bq: bq,
inputs: merging,
+ logger: logger,
}
}

@@ -80,6 +84,7 @@ func (fq *FusedQuerier) Run() error {
series := fq.bq.series.At()
if series.Fingerprint != fp {
// fingerprint not found, can't remove chunks
+ level.Debug(fq.logger).Log("msg", "fingerprint not found", "fp", series.Fingerprint, "err", fq.bq.series.Err())
for _, input := range nextBatch {
input.Response <- Output{
Fp: fp,
@@ -92,6 +97,7 @@
fq.bq.blooms.Seek(series.Offset)
if !fq.bq.blooms.Next() {
// fingerprint not found, can't remove chunks
+ level.Debug(fq.logger).Log("msg", "fingerprint not found", "fp", series.Fingerprint, "err", fq.bq.blooms.Err())
for _, input := range nextBatch {
input.Response <- Output{
Fp: fp,
5 changes: 3 additions & 2 deletions pkg/storage/bloom/v1/fuse_test.go
@@ -6,6 +6,7 @@ import (
"sync"
"testing"

"github.com/go-kit/log"
"github.com/grafana/dskit/concurrency"
"github.com/stretchr/testify/require"

@@ -94,7 +95,7 @@ func TestFusedQuerier(t *testing.T) {
g.Done()
}()

- fused := querier.Fuse(itrs)
+ fused := querier.Fuse(itrs, log.NewNopLogger())

require.Nil(t, fused.Run())
for _, input := range inputs {
@@ -211,7 +212,7 @@ func BenchmarkBlockQuerying(b *testing.B) {
for _, reqs := range requestChains {
itrs = append(itrs, NewPeekingIter[Request](NewSliceIter[Request](reqs)))
}
- fused := querier.Fuse(itrs)
+ fused := querier.Fuse(itrs, log.NewNopLogger())
_ = fused.Run()
}
})
16 changes: 13 additions & 3 deletions pkg/storage/stores/shipper/bloomshipper/fetcher.go
@@ -7,6 +7,7 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -19,6 +20,8 @@ import (
"github.com/grafana/loki/pkg/util/constants"
)

+ var downloadQueueCapacity = 100000

type options struct {
ignoreNotFound bool // ignore 404s from object storage; default=true
fetchAsync bool // dispatch downloading of block and return immediately; default=false
@@ -77,7 +80,7 @@ func NewFetcher(cfg bloomStoreConfig, client Client, metasCache cache.Cache, blo
metrics: newFetcherMetrics(reg, constants.Loki, "bloom_store"),
logger: logger,
}
- q, err := newDownloadQueue[BlockRef, BlockDirectory](1000, cfg.numWorkers, fetcher.processTask, logger)
+ q, err := newDownloadQueue[BlockRef, BlockDirectory](downloadQueueCapacity, cfg.numWorkers, fetcher.processTask, logger)
if err != nil {
return nil, errors.Wrap(err, "creating download queue for fetcher")
}
@@ -187,13 +190,16 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc
// by fetching all keys at once.
// The problem is keeping the order of the responses.

+ var enqueueTime time.Duration
for i := 0; i < n; i++ {
key := f.client.Block(refs[i]).Addr()
dir, isFound, err := f.getBlockDir(ctx, key)
if err != nil {
return results, err
}
if !isFound {
+ f.metrics.downloadQueueSize.Observe(float64(len(f.q.queue)))
+ start := time.Now()
f.q.enqueue(downloadRequest[BlockRef, BlockDirectory]{
ctx: ctx,
item: refs[i],
@@ -203,21 +209,25 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc
errors: errors,
})
missing++
+ f.metrics.blocksMissing.Inc()
+ enqueueTime += time.Since(start)
+ f.metrics.downloadQueueEnqueueTime.Observe(time.Since(start).Seconds())
continue
}
found++
+ f.metrics.blocksFound.Inc()
results[i] = dir.BlockQuerier()
}

// fetchAsync defines whether the function may return early or whether it
// should wait for responses from the download queue
if cfg.fetchAsync {
f.metrics.blocksFetched.Observe(float64(found))
- level.Debug(f.logger).Log("msg", "request unavailable blocks in the background", "missing", missing, "found", found)
+ level.Debug(f.logger).Log("msg", "request unavailable blocks in the background", "missing", missing, "found", found, "enqueue_time", enqueueTime)
return results, nil
}

- level.Debug(f.logger).Log("msg", "wait for unavailable blocks", "missing", missing, "found", found)
+ level.Debug(f.logger).Log("msg", "wait for unavailable blocks", "missing", missing, "found", found, "enqueue_time", enqueueTime)
// second, wait for missing blocks to be fetched and append them to the
// results
for i := 0; i < missing; i++ {
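The new enqueue_time measurement matters because sending on a full buffered channel blocks. A standalone sketch of that effect, using the real Prometheus client API but a toy channel and worker (the capacity and delays are illustrative, not Loki's values):

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	enqueueTime := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "download_queue_enqueue_time_seconds",
		Buckets: prometheus.ExponentialBuckets(0.0001, 5, 8),
	})

	// A tiny buffer fills immediately: once full, enqueue blocks until the
	// worker drains an item -- exactly what the larger capacity avoids.
	queue := make(chan int, 2)
	go func() {
		for range queue { // deliberately slow worker
			time.Sleep(10 * time.Millisecond)
		}
	}()

	for i := 0; i < 5; i++ {
		start := time.Now()
		queue <- i // blocks whenever the buffer is full
		elapsed := time.Since(start)
		enqueueTime.Observe(elapsed.Seconds())
		fmt.Printf("enqueued item %d after %v\n", i, elapsed)
	}
	close(queue)
}
```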
31 changes: 31 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/metrics.go
@@ -23,6 +23,11 @@ type fetcherMetrics struct {
blocksFetched prometheus.Histogram
metasFetchedSize *prometheus.HistogramVec
blocksFetchedSize *prometheus.HistogramVec

+ downloadQueueEnqueueTime prometheus.Histogram
+ downloadQueueSize prometheus.Histogram
+ blocksFound prometheus.Counter
+ blocksMissing prometheus.Counter
}

func newFetcherMetrics(registerer prometheus.Registerer, namespace, subsystem string) *fetcherMetrics {
@@ -56,5 +61,31 @@ func newFetcherMetrics(registerer prometheus.Registerer, namespace, subsystem st
Buckets: prometheus.ExponentialBuckets((5 << 20), 1.75, 10), // [5M, 8.75M, 15.3M, ... 769.7M]
Help: "Decompressed size of blocks fetched from storage/filesystem/cache",
}, []string{"source"}),
+ downloadQueueEnqueueTime: r.NewHistogram(prometheus.HistogramOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "download_queue_enqueue_time_seconds",
+ Buckets: prometheus.ExponentialBuckets(0.0001, 5, 8), // [0.0001, 0.0005, ... 7.8125]
+ Help: "Time in seconds it took to enqueue item to download queue",
+ }),
+ downloadQueueSize: r.NewHistogram(prometheus.HistogramOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "download_queue_size",
+ Buckets: prometheus.ExponentialBuckets(1, 2, 20), // [1, 2, 4, ... 524288]
+ Help: "Number of enqueued items in download queue",
+ }),
+ blocksFound: r.NewCounter(prometheus.CounterOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "fetcher_blocks_found_total",
Help: "tdb",
+ }),
+ blocksMissing: r.NewCounter(prometheus.CounterOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "fetcher_blocks_missing_total",
Help: "tbd",
+ }),
}
}
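The bucket layouts in the comments above can be sanity-checked against the Prometheus client directly (nothing assumed here beyond the public ExponentialBuckets helper):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// ExponentialBuckets(start, factor, count): count buckets, each the
	// previous multiplied by factor.
	fmt.Println(prometheus.ExponentialBuckets(0.0001, 5, 8))
	// [0.0001 0.0005 0.0025 0.0125 0.0625 0.3125 1.5625 7.8125]
	fmt.Println(prometheus.ExponentialBuckets(1, 2, 20)[19])
	// 524288
}
```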
