feat(blooms): compute chunks once (#12664)
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
owen-d committed Apr 29, 2024
1 parent c3a3bc3 commit bc78d13
Showing 50 changed files with 1,754 additions and 800 deletions.
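
In short: the index gateway now resolves chunk refs once while planning shards, attaches them to the shards response as `ChunkRefGroup`s when `tsdb_precompute_chunks` is enabled for the tenant, and `GetChunks` gains an extra parameter so callers can hand those precomputed refs back instead of triggering another index lookup. A minimal, self-contained sketch of that plan-once/reuse pattern follows; the `ChunkRef`, `Shard`, and `Store` types are simplified stand-ins for illustration, not Loki's actual types.

```go
package main

import "fmt"

// Illustrative stand-ins; not Loki's real types.
type ChunkRef struct{ Fingerprint uint64 }

type Shard struct {
	MinFP, MaxFP uint64
	Refs         []ChunkRef // precomputed chunk group for this shard
}

// Store is a toy chunk store: if the caller supplies precomputed refs,
// the index lookup is skipped entirely.
type Store struct{ index []ChunkRef }

func (s *Store) GetChunks(precomputed []ChunkRef) []ChunkRef {
	if precomputed != nil {
		return precomputed // reuse the planner's work; no second index call
	}
	return s.index // fall back to a full index lookup
}

func main() {
	store := &Store{index: []ChunkRef{{1}, {5}, {9}}}

	// "Planning" pass: resolve chunk refs once and attach them to shards.
	shards := []Shard{
		{MinFP: 0, MaxFP: 6, Refs: []ChunkRef{{1}, {5}}},
		{MinFP: 7, MaxFP: 10, Refs: []ChunkRef{{9}}},
	}

	// "Execution" pass: each shard query reuses its precomputed group.
	for _, sh := range shards {
		fmt.Println(store.GetChunks(sh.Refs))
	}
}
```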
2 changes: 1 addition & 1 deletion cmd/migrate/main.go
@@ -309,7 +309,7 @@ func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh <
start := time.Now()
var totalBytes uint64
var totalChunks uint64
schemaGroups, fetchers, err := m.source.GetChunks(m.ctx, m.sourceUser, model.TimeFromUnixNano(sr.from), model.TimeFromUnixNano(sr.to), chunk.NewPredicate(m.matchers, nil))
schemaGroups, fetchers, err := m.source.GetChunks(m.ctx, m.sourceUser, model.TimeFromUnixNano(sr.from), model.TimeFromUnixNano(sr.to), chunk.NewPredicate(m.matchers, nil), nil)
if err != nil {
log.Println(threadID, "Error querying index for chunk refs:", err)
errCh <- err
6 changes: 6 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3003,6 +3003,12 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
# CLI flag: -limits.tsdb-sharding-strategy
[tsdb_sharding_strategy: <string> | default = "power_of_two"]

# Precompute chunks for TSDB queries. This can improve query performance at the
# cost of increased memory usage by computing chunks once during planning,
# reducing index calls.
# CLI flag: -querier.tsdb-precompute-chunks
[tsdb_precompute_chunks: <boolean> | default = false]

# Cardinality limit for index queries.
# CLI flag: -store.cardinality-limit
[cardinality_limit: <int> | default = 100000]
42 changes: 1 addition & 41 deletions pkg/indexgateway/client.go
@@ -28,9 +28,7 @@ import (

"github.com/grafana/loki/v3/pkg/distributor/clientpool"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/discovery"
util_math "github.com/grafana/loki/v3/pkg/util/math"
@@ -340,8 +338,7 @@ func (s *GatewayClient) GetShards(
if err != nil {
return errors.WithStack(err)
}
perReplicaResult.Shards = append(perReplicaResult.Shards, resp.Shards...)
perReplicaResult.Statistics.Merge(resp.Statistics)
perReplicaResult.Merge(resp)
}

// Since `poolDo` retries on error, we only want to set the response if we got a successful response.
@@ -355,48 +352,11 @@ func (s *GatewayClient) GetShards(
return errCt <= maxErrs
},
); err != nil {
if isUnimplementedCallError(err) {
return s.getShardsFromStatsFallback(ctx, in)
}
return nil, err
}
return res, nil
}

func (s *GatewayClient) getShardsFromStatsFallback(
ctx context.Context,
in *logproto.ShardsRequest,
) (*logproto.ShardsResponse, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, errors.Wrap(err, "index gateway client get tenant ID")
}

p, err := ExtractShardRequestMatchersAndAST(in.Query)
if err != nil {
return nil, errors.Wrap(err, "failure while falling back to stats for shard calculation")

}

stats, err := s.GetStats(
ctx,
&logproto.IndexStatsRequest{
From: in.From,
Through: in.Through,
Matchers: (&syntax.MatchersExpr{Mts: p.Matchers}).String(),
},
)
if err != nil {
return nil, err
}

var strategy sharding.PowerOfTwoSharding
shards := strategy.ShardsFor(stats.Bytes, uint64(s.limits.TSDBMaxBytesPerShard(userID)))
return &logproto.ShardsResponse{
Shards: shards,
}, nil
}

// TODO(owen-d): this was copied from ingester_querier.go -- move it to a shared pkg
// isUnimplementedCallError tells if the GRPC error is a gRPC error with code Unimplemented.
func isUnimplementedCallError(err error) bool {
90 changes: 67 additions & 23 deletions pkg/indexgateway/gateway.go
@@ -6,6 +6,7 @@ import (
"math"
"sort"
"sync"
"time"

"github.com/c2h5oh/datasize"
"github.com/go-kit/log"
@@ -31,6 +32,7 @@ import (
seriesindex "github.com/grafana/loki/v3/pkg/storage/stores/series/index"
tsdb_index "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

@@ -67,19 +69,21 @@ type Gateway struct {
bloomQuerier BloomQuerier
metrics *Metrics

cfg Config
log log.Logger
cfg Config
limits Limits
log log.Logger
}

// NewIndexGateway instantiates a new Index Gateway and starts its services.
//
// In case it is configured to be in ring mode, a Basic Service wrapping the ring client is started.
// Otherwise, it starts an Idle Service that doesn't have lifecycle hooks.
func NewIndexGateway(cfg Config, log log.Logger, r prometheus.Registerer, indexQuerier IndexQuerier, indexClients []IndexClientWithRange, bloomQuerier BloomQuerier) (*Gateway, error) {
func NewIndexGateway(cfg Config, limits Limits, log log.Logger, r prometheus.Registerer, indexQuerier IndexQuerier, indexClients []IndexClientWithRange, bloomQuerier BloomQuerier) (*Gateway, error) {
g := &Gateway{
indexQuerier: indexQuerier,
bloomQuerier: bloomQuerier,
cfg: cfg,
limits: limits,
log: log,
indexClients: indexClients,
metrics: NewMetrics(r),
@@ -214,7 +218,7 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
}

predicate := chunk.NewPredicate(matchers, &req.Plan)
chunks, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, req.Through, predicate)
chunks, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, req.Through, predicate, nil)
if err != nil {
return nil, err
}
@@ -368,11 +372,12 @@ func (g *Gateway) GetShards(request *logproto.ShardsRequest, server logproto.Ind
return err
}

// Shards were requested, but blooms are not enabled or cannot be used due to lack of filters.
// That's ok; we can still return shard ranges without filtering
// which will be more effective than guessing power-of-2 shard ranges.
forSeries, ok := g.indexQuerier.HasForSeries(request.From, request.Through)
if g.bloomQuerier == nil || len(syntax.ExtractLineFilters(p.Plan().AST)) == 0 || !ok {
if !ok {
sp.LogKV(
"msg", "index does not support forSeries",
"action", "falling back to indexQuerier.GetShards impl",
)
shards, err := g.indexQuerier.GetShards(
ctx,
instanceID,
@@ -388,11 +393,11 @@ func (g *Gateway) GetShards(request *logproto.ShardsRequest, server logproto.Ind
return server.Send(shards)
}

return g.getShardsWithBlooms(ctx, request, server, instanceID, p, forSeries)
return g.boundedShards(ctx, request, server, instanceID, p, forSeries)
}

// getShardsWithBlooms is a helper function to get shards with blooms enabled.
func (g *Gateway) getShardsWithBlooms(
// boundedShards handles bounded shard requests, optionally using blooms and/or returning precomputed chunks.
func (g *Gateway) boundedShards(
ctx context.Context,
req *logproto.ShardsRequest,
server logproto.IndexGateway_GetShardsServer,
@@ -412,12 +417,12 @@ func (g *Gateway) getShardsWithBlooms(
// as getting it _very_ wrong could harm some cache locality benefits on the bloom-gws by
// sending multiple requests to the entire keyspace).

logger := log.With(g.log, "tenant", instanceID)
logger := util_log.WithContext(ctx, g.log)
sp, ctx := opentracing.StartSpanFromContext(ctx, "indexgateway.getShardsWithBlooms")
defer sp.Finish()

// 1) for all bounds, get chunk refs
grps, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, req.Through, p)
grps, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, req.Through, p, nil)
if err != nil {
return err
}
@@ -435,11 +440,16 @@ func (g *Gateway) getShardsWithBlooms(
}
}

// 2) filter via blooms
filtered, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, refs, p.Plan())
if err != nil {
return err
filtered := refs

// 2) filter via blooms if enabled
if g.bloomQuerier != nil && len(syntax.ExtractLineFilters(p.Plan().AST)) > 0 {
filtered, err = g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, refs, p.Plan())
if err != nil {
return err
}
}

g.metrics.preFilterChunks.WithLabelValues(routeShards).Observe(float64(ct))
g.metrics.postFilterChunks.WithLabelValues(routeShards).Observe(float64(len(filtered)))

@@ -462,23 +472,42 @@ func (g *Gateway) getShardsWithBlooms(
Stats: &logproto.IndexStatsResponse{},
},
}

} else {
shards, err := accumulateChunksToShards(ctx, instanceID, forSeries, req, p, filtered)
shards, chunkGrps, err := accumulateChunksToShards(ctx, instanceID, forSeries, req, p, filtered)
if err != nil {
return err
}
resp.Shards = shards

// If the index gateway is configured to precompute chunks, we can return the chunk groups
// alongside the shards, otherwise discarding them
if g.limits.TSDBPrecomputeChunks(instanceID) {
resp.ChunkGroups = chunkGrps
}
}

sp.LogKV("msg", "send shards response", "shards", len(resp.Shards))

var refCt int
for _, grp := range resp.ChunkGroups {
refCt += len(grp.Refs)
}

ms := syntax.MatchersExpr{Mts: p.Matchers}
level.Debug(logger).Log(
"msg", "send shards response",
"total_chunks", statistics.Index.TotalChunks,
"post_filter_chunks", statistics.Index.PostFilterChunks,
"shards", len(resp.Shards),
"query", req.Query,
"target_bytes_per_shard", datasize.ByteSize(req.TargetBytesPerShard).HumanReadable(),
"precomputed_refs", refCt,
"matchers", ms.String(),
"from", req.From.Time().String(),
"through", req.Through.Time().String(),
"length", req.Through.Time().Sub(req.From.Time()).String(),
"end_delta", time.Since(req.Through.Time()).String(),
)

// 3) build shards
@@ -525,7 +554,7 @@ func accumulateChunksToShards(
req *logproto.ShardsRequest,
p chunk.Predicate,
filtered []*logproto.ChunkRef,
) ([]logproto.Shard, error) {
) ([]logproto.Shard, []logproto.ChunkRefGroup, error) {
// map for looking up post-filtered chunks in O(n) while iterating the index again for sizing info
filteredM := make(map[model.Fingerprint][]refWithSizingInfo, 1024)
for _, ref := range filtered {
@@ -541,12 +570,13 @@ func accumulateChunksToShards(
v1.NewBounds(filtered[0].FingerprintModel(), filtered[len(filtered)-1].FingerprintModel()),
req.From, req.Through,
func(l labels.Labels, fp model.Fingerprint, chks []tsdb_index.ChunkMeta) (stop bool) {
mtx.Lock()
defer mtx.Unlock()

// check if this is a fingerprint we need
if _, ok := filteredM[fp]; !ok {
return false
}
mtx.Lock()
defer mtx.Unlock()

filteredChks := filteredM[fp]
var j int
@@ -579,7 +609,7 @@ func accumulateChunksToShards(
},
p.Matchers...,
); err != nil {
return nil, err
return nil, nil, err
}

collectedSeries := sharding.SizedFPs(sharding.SizedFPsPool.Get(len(filteredM)))
@@ -597,7 +627,21 @@ func accumulateChunksToShards(
}
sort.Sort(collectedSeries)

return collectedSeries.ShardsFor(req.TargetBytesPerShard), nil
shards := collectedSeries.ShardsFor(req.TargetBytesPerShard)
chkGrps := make([]logproto.ChunkRefGroup, 0, len(shards))
for _, s := range shards {
from := sort.Search(len(filtered), func(i int) bool {
return filtered[i].Fingerprint >= uint64(s.Bounds.Min)
})
through := sort.Search(len(filtered), func(i int) bool {
return filtered[i].Fingerprint > uint64(s.Bounds.Max)
})
chkGrps = append(chkGrps, logproto.ChunkRefGroup{
Refs: filtered[from:through],
})
}

return shards, chkGrps, nil
}

type refWithSizingInfo struct {
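
The per-shard chunk groups above are carved out of the already-sorted, filtered refs with two binary searches per shard: a lower bound on the shard's minimum fingerprint and an upper bound on its maximum. Below is a standalone sketch of that partitioning step, using simplified types rather than Loki's `logproto` ones.

```go
package main

import (
	"fmt"
	"sort"
)

// Illustrative stand-ins for chunk refs and shard fingerprint bounds.
type ref struct{ fp uint64 }

type bounds struct{ min, max uint64 }

// groupByBounds slices refs (which must be sorted by fingerprint) into one
// group per shard using sort.Search, mirroring how accumulateChunksToShards
// builds its ChunkRefGroups.
func groupByBounds(refs []ref, shards []bounds) [][]ref {
	groups := make([][]ref, 0, len(shards))
	for _, s := range shards {
		from := sort.Search(len(refs), func(i int) bool {
			return refs[i].fp >= s.min
		})
		through := sort.Search(len(refs), func(i int) bool {
			return refs[i].fp > s.max
		})
		groups = append(groups, refs[from:through])
	}
	return groups
}

func main() {
	refs := []ref{{1}, {2}, {4}, {7}, {9}} // sorted by fingerprint
	shards := []bounds{{min: 0, max: 3}, {min: 4, max: 10}}
	for i, g := range groupByBounds(refs, shards) {
		fmt.Printf("shard %d: %v\n", i, g) // shard 0: [{1} {2}], shard 1: [{4} {7} {9}]
	}
}
```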
27 changes: 24 additions & 3 deletions pkg/indexgateway/gateway_test.go
@@ -39,6 +39,18 @@ const (
valuePrefix = "value"
)

type mockLimits struct{}

func (mockLimits) IndexGatewayShardSize(_ string) int {
return 0
}
func (mockLimits) TSDBMaxBytesPerShard(_ string) int {
return sharding.DefaultTSDBMaxBytesPerShard
}
func (mockLimits) TSDBPrecomputeChunks(_ string) bool {
return false
}

type mockBatch struct {
size int
}
@@ -233,7 +245,7 @@ func TestGateway_QueryIndex_multistore(t *testing.T) {
},
},
}}
gateway, err := NewIndexGateway(Config{}, util_log.Logger, nil, nil, indexClients, nil)
gateway, err := NewIndexGateway(Config{}, mockLimits{}, util_log.Logger, nil, nil, indexClients, nil)
require.NoError(t, err)

expectedQueries = append(expectedQueries,
Expand All @@ -258,7 +270,7 @@ func TestVolume(t *testing.T) {
{Name: "bar", Volume: 38},
}}, nil)

gateway, err := NewIndexGateway(Config{}, util_log.Logger, nil, indexQuerier, nil, nil)
gateway, err := NewIndexGateway(Config{}, mockLimits{}, util_log.Logger, nil, indexQuerier, nil, nil)
require.NoError(t, err)

ctx := user.InjectOrgID(context.Background(), "test")
@@ -532,7 +544,7 @@ func TestAccumulateChunksToShards(t *testing.T) {
},
}

shards, err := accumulateChunksToShards(
shards, grps, err := accumulateChunksToShards(
context.Background(),
"",
fsImpl(series),
@@ -543,6 +555,12 @@ func TestAccumulateChunksToShards(t *testing.T) {
filtered,
)

expectedChks := [][]*logproto.ChunkRef{
filtered[0:3],
filtered[3:6],
filtered[6:9],
filtered[9:10],
}
exp := []logproto.Shard{
{
Bounds: logproto.FPBounds{Min: 0, Max: 1},
@@ -586,6 +604,9 @@ func TestAccumulateChunksToShards(t *testing.T) {

for i := range shards {
require.Equal(t, exp[i], shards[i], "invalid shard at index %d", i)
for j := range grps[i].Refs {
require.Equal(t, expectedChks[i][j], grps[i].Refs[j], "invalid chunk in grp %d at index %d", i, j)
}
}
require.Equal(t, len(exp), len(shards))

1 change: 1 addition & 0 deletions pkg/indexgateway/shufflesharding.go
@@ -22,6 +22,7 @@ var (
type Limits interface {
IndexGatewayShardSize(tenantID string) int
TSDBMaxBytesPerShard(string) int
TSDBPrecomputeChunks(string) bool
}

type ShardingStrategy interface {
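
The `Limits` interface gains a per-tenant `TSDBPrecomputeChunks` lookup, which the gateway consults before attaching `ChunkGroups` to its response. A minimal sketch of how such a per-tenant override might resolve against a global default follows; the `tenantLimits` type and its `overrides` map are hypothetical and not Loki's runtime-config implementation.

```go
package main

import "fmt"

// tenantLimits is a hypothetical per-tenant override table with a global default.
type tenantLimits struct {
	defaultPrecompute bool
	overrides         map[string]bool
}

// TSDBPrecomputeChunks returns the tenant's override if present, else the default,
// matching the shape of the Limits interface method added in this commit.
func (l *tenantLimits) TSDBPrecomputeChunks(tenantID string) bool {
	if v, ok := l.overrides[tenantID]; ok {
		return v
	}
	return l.defaultPrecompute
}

func main() {
	limits := &tenantLimits{
		defaultPrecompute: false, // mirrors the documented default
		overrides:         map[string]bool{"tenant-a": true},
	}
	fmt.Println(limits.TSDBPrecomputeChunks("tenant-a")) // true
	fmt.Println(limits.TSDBPrecomputeChunks("tenant-b")) // false
}
```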
2 changes: 1 addition & 1 deletion pkg/ingester/flush_test.go
@@ -361,7 +361,7 @@ func (s *testStore) SelectSeries(_ context.Context, _ logql.SelectLogParams) ([]
return nil, nil
}

func (s *testStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ chunk.Predicate) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
func (s *testStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ chunk.Predicate, _ *logproto.ChunkRefGroup) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
return nil, nil, nil
}
