Skip to content

Commit

Permalink
feat(bloom): Skip attempts to filter chunks for which blooms have not…
Browse files Browse the repository at this point in the history
… been built (#12961)

Bloom filters are built from `today - MaxTableOffset` to `today - MinTableOffset`, this means that blooms are not available for the most recent period, between `now` and at least `today - MinTableOffset`.
To avoid resolving chunks and filtering out no-matches, we can skip chunks for this period completely and return them as-is without filtering.


Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum authored May 14, 2024
1 parent 00bdd2f commit a1b1eeb
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 7 deletions.
39 changes: 37 additions & 2 deletions pkg/bloomgateway/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomgateway

import (
"context"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/querier/plan"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
Expand Down Expand Up @@ -61,19 +63,26 @@ func newQuerierMetrics(registerer prometheus.Registerer, namespace, subsystem st
}
}

type QuerierConfig struct {
// MinTableOffset is derived from the compactor's MinTableOffset
MinTableOffset int
}

// BloomQuerier is a store-level abstraction on top of Client
// It is used by the index gateway to filter ChunkRefs based on given line fiter expression.
type BloomQuerier struct {
c Client
cfg QuerierConfig
logger log.Logger
metrics *querierMetrics
limits Limits
blockResolver BlockResolver
}

func NewQuerier(c Client, limits Limits, resolver BlockResolver, r prometheus.Registerer, logger log.Logger) *BloomQuerier {
func NewQuerier(c Client, cfg QuerierConfig, limits Limits, resolver BlockResolver, r prometheus.Registerer, logger log.Logger) *BloomQuerier {
return &BloomQuerier{
c: c,
cfg: cfg,
logger: logger,
metrics: newQuerierMetrics(r, constants.Loki, querierMetricsSubsystem),
limits: limits,
Expand Down Expand Up @@ -101,6 +110,33 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
preFilterChunks := len(chunkRefs)
preFilterSeries := len(grouped)

// Do not attempt to filter chunks for which there are no blooms
if bq.cfg.MinTableOffset > 0 {
minAge := truncateDay(model.Now()).Add(-1 * config.ObjectStorageIndexRequiredPeriod * time.Duration(bq.cfg.MinTableOffset-1))
if through.After(minAge) {
level.Debug(logger).Log(
"msg", "skip too recent chunks",
"tenant", tenant,
"from", from.Time(),
"through", through.Time(),
"responses", 0,
"preFilterChunks", preFilterChunks,
"postFilterChunks", preFilterChunks,
"filteredChunks", 0,
"preFilterSeries", preFilterSeries,
"postFilterSeries", preFilterSeries,
"filteredSeries", 0,
)

bq.metrics.chunksTotal.Add(float64(preFilterChunks))
bq.metrics.chunksFiltered.Add(0)
bq.metrics.seriesTotal.Add(float64(preFilterSeries))
bq.metrics.seriesFiltered.Add(0)

return chunkRefs, nil
}
}

responses := make([][]*logproto.GroupedChunkRefs, 0, 2)
// We can perform requests sequentially, because most of the time the request
// only covers a single day, and if not, it's at most two days.
Expand Down Expand Up @@ -153,7 +189,6 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
"preFilterSeries", preFilterSeries,
"postFilterSeries", postFilterSeries,
"filteredSeries", preFilterSeries-postFilterSeries,
"operation", "bloomquerier.FilterChunkRefs",
)

bq.metrics.chunksTotal.Add(float64(preFilterChunks))
Expand Down
9 changes: 5 additions & 4 deletions pkg/bloomgateway/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@ var _ BlockResolver = &mockBlockResolver{}
func TestBloomQuerier(t *testing.T) {
logger := log.NewNopLogger()
limits := newLimits()
cfg := QuerierConfig{}
resolver := &mockBlockResolver{}
tenant := "fake"

t.Run("client not called when filters are empty", func(t *testing.T) {
c := &noopClient{}
bq := NewQuerier(c, limits, resolver, nil, logger)
bq := NewQuerier(c, cfg, limits, resolver, nil, logger)

ctx := context.Background()
through := model.Now()
Expand All @@ -86,7 +87,7 @@ func TestBloomQuerier(t *testing.T) {

t.Run("client not called when chunkRefs are empty", func(t *testing.T) {
c := &noopClient{}
bq := NewQuerier(c, limits, resolver, nil, logger)
bq := NewQuerier(c, cfg, limits, resolver, nil, logger)

ctx := context.Background()
through := model.Now()
Expand All @@ -102,7 +103,7 @@ func TestBloomQuerier(t *testing.T) {

t.Run("querier propagates error from client", func(t *testing.T) {
c := &noopClient{err: errors.New("something went wrong")}
bq := NewQuerier(c, limits, resolver, nil, logger)
bq := NewQuerier(c, cfg, limits, resolver, nil, logger)

ctx := context.Background()
through := model.Now()
Expand All @@ -121,7 +122,7 @@ func TestBloomQuerier(t *testing.T) {

t.Run("client called once for each day of the interval", func(t *testing.T) {
c := &noopClient{}
bq := NewQuerier(c, limits, resolver, nil, logger)
bq := NewQuerier(c, cfg, limits, resolver, nil, logger)

ctx := context.Background()
from := mktime("2024-04-16 22:00")
Expand Down
5 changes: 4 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,10 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
return nil, err
}
resolver := bloomgateway.NewBlockResolver(t.BloomStore, logger)
bloomQuerier = bloomgateway.NewQuerier(bloomGatewayClient, t.Overrides, resolver, prometheus.DefaultRegisterer, logger)
querierCfg := bloomgateway.QuerierConfig{
MinTableOffset: t.Cfg.BloomCompactor.MinTableOffset,
}
bloomQuerier = bloomgateway.NewQuerier(bloomGatewayClient, querierCfg, t.Overrides, resolver, prometheus.DefaultRegisterer, logger)
}

gateway, err := indexgateway.NewIndexGateway(t.Cfg.IndexGateway, t.Overrides, logger, prometheus.DefaultRegisterer, t.Store, indexClients, bloomQuerier)
Expand Down

0 comments on commit a1b1eeb

Please sign in to comment.