feat(blooms): compute chunks once #12664

Merged Apr 29, 2024 · 27 commits (changes shown from 25 commits)

Commits:
4807f48  remove unnecessary fallback (owen-d, Mar 27, 2024)
48402d2  [wip] wiring up support to pass store chunks to queriers (owen-d, Mar 28, 2024)
bb283f1  Merge remote-tracking branch 'upstream/main' into blooms/compute-chun… (owen-d, Mar 29, 2024)
7a05032  [wip] threading through store overrides for chunkrefs (owen-d, Mar 29, 2024)
97bc139  multi-tenant querier partitions store overrides by tenant id (owen-d, Mar 29, 2024)
3858893  metrics & ifc alignment (owen-d, Mar 31, 2024)
ea68788  Merge remote-tracking branch 'upstream/main' into blooms/compute-chun… (owen-d, Apr 17, 2024)
46d85f9  remove unused fn (owen-d, Apr 17, 2024)
db62dd8  send chunks in shards resp (owen-d, Apr 17, 2024)
ad855e6  type alignment (owen-d, Apr 17, 2024)
93a9ce4  Merge remote-tracking branch 'upstream/main' into blooms/compute-chun… (owen-d, Apr 17, 2024)
6ce86b4  type alignment (owen-d, Apr 17, 2024)
294261d  ShardsResponse.Merge extension (owen-d, Apr 17, 2024)
d7f2af9  fix unrelated codec test err msg (owen-d, Apr 17, 2024)
e8f58f5  tidy (owen-d, Apr 17, 2024)
4639cfd  binding shard to chunk refs (owen-d, Apr 18, 2024)
a764c11  simplify+pointer for shard chunks (owen-d, Apr 18, 2024)
3ba9330  fix signature (owen-d, Apr 18, 2024)
b2990bf  precomputed chunk logging (owen-d, Apr 19, 2024)
141c4f7  log matchers & always use mutex while accumulating chunks to shards (owen-d, Apr 19, 2024)
8db855d  more logging (owen-d, Apr 19, 2024)
8bdb823  better logging for gateway.go (owen-d, Apr 22, 2024)
56eabcc  independent handling for precomputed chunks vs bloom enablement optio… (owen-d, Apr 23, 2024)
33c8e82  Merge remote-tracking branch 'upstream/main' into blooms/compute-chun… (owen-d, Apr 23, 2024)
a3bd99c  make doc (owen-d, Apr 23, 2024)
82923e7  pr feedback (owen-d, Apr 29, 2024)
7b2f72e  pr feedback: only dispatch to bloom querier when line filters exist (owen-d, Apr 29, 2024)
2 changes: 1 addition & 1 deletion cmd/migrate/main.go
@@ -309,7 +309,7 @@ func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh <
start := time.Now()
var totalBytes uint64
var totalChunks uint64
schemaGroups, fetchers, err := m.source.GetChunks(m.ctx, m.sourceUser, model.TimeFromUnixNano(sr.from), model.TimeFromUnixNano(sr.to), chunk.NewPredicate(m.matchers, nil))
schemaGroups, fetchers, err := m.source.GetChunks(m.ctx, m.sourceUser, model.TimeFromUnixNano(sr.from), model.TimeFromUnixNano(sr.to), chunk.NewPredicate(m.matchers, nil), nil)
if err != nil {
log.Println(threadID, "Error querying index for chunk refs:", err)
errCh <- err
6 changes: 6 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3139,6 +3139,12 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
# CLI flag: -limits.tsdb-sharding-strategy
[tsdb_sharding_strategy: <string> | default = "power_of_two"]

# Precompute chunks for TSDB queries. This can improve query performance at the
# cost of increased memory usage by computing chunks once during planning,
# reducing index calls.
# CLI flag: -querier.tsdb-precompute-chunks
[tsdb_precompute_chunks: <boolean> | default = false]

# Cardinality limit for index queries.
# CLI flag: -store.cardinality-limit
[cardinality_limit: <int> | default = 100000]
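As an aside, wiring up a per-tenant boolean limit like this usually amounts to a struct field plus a flag registration. The sketch below is illustrative only (the package, struct, and method names are assumptions, not Loki's actual validation code); the flag name and default match the documentation above.

```go
package validation

import "flag"

// Limits is a minimal stand-in for a per-tenant limits struct; only the new
// field relevant to this PR is shown.
type Limits struct {
	TSDBPrecomputeChunks bool `yaml:"tsdb_precompute_chunks" json:"tsdb_precompute_chunks"`
}

// RegisterFlags binds the limit to its CLI flag, using the flag name and
// default documented above.
func (l *Limits) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&l.TSDBPrecomputeChunks, "querier.tsdb-precompute-chunks", false,
		"Precompute chunks for TSDB queries during shard planning to reduce index calls.")
}
```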
42 changes: 1 addition & 41 deletions pkg/indexgateway/client.go
@@ -28,9 +28,7 @@ import (

"github.com/grafana/loki/v3/pkg/distributor/clientpool"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/discovery"
util_math "github.com/grafana/loki/v3/pkg/util/math"
@@ -340,8 +338,7 @@ func (s *GatewayClient) GetShards(
if err != nil {
return errors.WithStack(err)
}
perReplicaResult.Shards = append(perReplicaResult.Shards, resp.Shards...)
perReplicaResult.Statistics.Merge(resp.Statistics)
perReplicaResult.Merge(resp)
}

// Since `poolDo` retries on error, we only want to set the response if we got a successful response.
@@ -355,48 +352,11 @@
return errCt <= maxErrs
},
); err != nil {
if isUnimplementedCallError(err) {
salvacorts (Contributor) commented Apr 29, 2024:

IIUC we are removing this since we already rolled out index-gws that support the get shards method, right?

owen-d (Member, Author) replied:

No (the OSS repo is independent from our internal deployments at Grafana Labs). This is because the new bounded shards implementation is an intentional choice that needs to be turned on (default=power_of_two), so I removed the fallback for simplicity's sake.

return s.getShardsFromStatsFallback(ctx, in)
}
return nil, err
}
return res, nil
}
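For reference, the `perReplicaResult.Merge(resp)` call above collapses the previous two-line merge of shards and statistics, and the commit list mentions a `ShardsResponse.Merge` extension that now also carries chunk groups. A self-contained sketch of that kind of merge, using simplified stand-in types rather than the generated logproto structs:

```go
package main

// Simplified stand-ins for the logproto types; the real ShardsResponse is
// protobuf-generated and additionally carries query statistics.
type Shard struct{ MinFP, MaxFP uint64 }
type ChunkRefGroup struct{ Refs []string }

type ShardsResponse struct {
	Shards      []Shard
	ChunkGroups []ChunkRefGroup
}

// Merge folds another replica's response into the receiver. The real method
// would also merge the Statistics field.
func (r *ShardsResponse) Merge(other *ShardsResponse) {
	if other == nil {
		return
	}
	r.Shards = append(r.Shards, other.Shards...)
	r.ChunkGroups = append(r.ChunkGroups, other.ChunkGroups...)
}

func main() {
	a := &ShardsResponse{Shards: []Shard{{0, 10}}}
	b := &ShardsResponse{
		Shards:      []Shard{{11, 20}},
		ChunkGroups: []ChunkRefGroup{{Refs: []string{"c1"}}},
	}
	a.Merge(b) // a now holds both replicas' shards and chunk groups
}
```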

func (s *GatewayClient) getShardsFromStatsFallback(
ctx context.Context,
in *logproto.ShardsRequest,
) (*logproto.ShardsResponse, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, errors.Wrap(err, "index gateway client get tenant ID")
}

p, err := ExtractShardRequestMatchersAndAST(in.Query)
if err != nil {
return nil, errors.Wrap(err, "failure while falling back to stats for shard calculation")

}

stats, err := s.GetStats(
ctx,
&logproto.IndexStatsRequest{
From: in.From,
Through: in.Through,
Matchers: (&syntax.MatchersExpr{Mts: p.Matchers}).String(),
},
)
if err != nil {
return nil, err
}

var strategy sharding.PowerOfTwoSharding
shards := strategy.ShardsFor(stats.Bytes, uint64(s.limits.TSDBMaxBytesPerShard(userID)))
return &logproto.ShardsResponse{
Shards: shards,
}, nil
}
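The removed getShardsFromStatsFallback above estimated shards purely from index stats using power-of-two sharding. As a rough illustration of that idea only (the real sharding.PowerOfTwoSharding also derives fingerprint bounds for each shard and may differ in detail), the shard-count selection looks roughly like this:

```go
package shardingsketch

// powerOfTwoShardCount picks the smallest power-of-two number of shards that
// keeps the estimated bytes per shard at or under the target. Illustrative
// reconstruction only, not the actual Loki implementation.
func powerOfTwoShardCount(totalBytes, maxBytesPerShard uint64) int {
	if maxBytesPerShard == 0 {
		return 1
	}
	shards := 1
	for uint64(shards)*maxBytesPerShard < totalBytes {
		shards *= 2
	}
	return shards
}
```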

// TODO(owen-d): this was copied from ingester_querier.go -- move it to a shared pkg
// isUnimplementedCallError tells if the GRPC error is a gRPC error with code Unimplemented.
func isUnimplementedCallError(err error) bool {
90 changes: 67 additions & 23 deletions pkg/indexgateway/gateway.go
@@ -6,6 +6,7 @@ import (
"math"
"sort"
"sync"
"time"

"github.com/c2h5oh/datasize"
"github.com/go-kit/log"
@@ -31,6 +32,7 @@ import (
seriesindex "github.com/grafana/loki/v3/pkg/storage/stores/series/index"
tsdb_index "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

@@ -67,19 +69,21 @@ type Gateway struct {
bloomQuerier BloomQuerier
metrics *Metrics

cfg Config
log log.Logger
cfg Config
limits Limits
log log.Logger
}

// NewIndexGateway instantiates a new Index Gateway and starts its services.
//
// In case it is configured to be in ring mode, a Basic Service wrapping the ring client is started.
// Otherwise, it starts an Idle Service that doesn't have lifecycle hooks.
func NewIndexGateway(cfg Config, log log.Logger, r prometheus.Registerer, indexQuerier IndexQuerier, indexClients []IndexClientWithRange, bloomQuerier BloomQuerier) (*Gateway, error) {
func NewIndexGateway(cfg Config, limits Limits, log log.Logger, r prometheus.Registerer, indexQuerier IndexQuerier, indexClients []IndexClientWithRange, bloomQuerier BloomQuerier) (*Gateway, error) {
g := &Gateway{
indexQuerier: indexQuerier,
bloomQuerier: bloomQuerier,
cfg: cfg,
limits: limits,
log: log,
indexClients: indexClients,
metrics: NewMetrics(r),
@@ -214,7 +218,7 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
}

predicate := chunk.NewPredicate(matchers, &req.Plan)
chunks, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, req.Through, predicate)
chunks, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, req.Through, predicate, nil)
if err != nil {
return nil, err
}
@@ -368,11 +372,12 @@ func (g *Gateway) GetShards(request *logproto.ShardsRequest, server logproto.Ind
return err
}

// Shards were requested, but blooms are not enabled or cannot be used due to lack of filters.
// That's ok; we can still return shard ranges without filtering
// which will be more effective than guessing power-of-2 shard ranges.
forSeries, ok := g.indexQuerier.HasForSeries(request.From, request.Through)
if g.bloomQuerier == nil || len(syntax.ExtractLineFilters(p.Plan().AST)) == 0 || !ok {
if !ok {
sp.LogKV(
"msg", "index does not support forSeries",
"action", "falling back to indexQuerier.GetShards impl",
)
shards, err := g.indexQuerier.GetShards(
ctx,
instanceID,
Expand All @@ -388,11 +393,11 @@ func (g *Gateway) GetShards(request *logproto.ShardsRequest, server logproto.Ind
return server.Send(shards)
}

return g.getShardsWithBlooms(ctx, request, server, instanceID, p, forSeries)
return g.boundedShards(ctx, request, server, instanceID, p, forSeries)
}

// getShardsWithBlooms is a helper function to get shards with blooms enabled.
func (g *Gateway) getShardsWithBlooms(
// boundedShards handles bounded shard requests, optionally using blooms and/or returning precomputed chunks.
func (g *Gateway) boundedShards(
ctx context.Context,
req *logproto.ShardsRequest,
server logproto.IndexGateway_GetShardsServer,
@@ -412,12 +417,12 @@
// as getting it _very_ wrong could harm some cache locality benefits on the bloom-gws by
// sending multiple requests to the entire keyspace).

logger := log.With(g.log, "tenant", instanceID)
logger := util_log.WithContext(ctx, g.log)
sp, ctx := opentracing.StartSpanFromContext(ctx, "indexgateway.getShardsWithBlooms")
defer sp.Finish()

// 1) for all bounds, get chunk refs
grps, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, req.Through, p)
grps, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, req.Through, p, nil)
if err != nil {
return err
}
@@ -435,11 +440,16 @@
}
}

// 2) filter via blooms
filtered, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, refs, p.Plan())
if err != nil {
return err
filtered := refs

// 2) filter via blooms if enabled
if g.bloomQuerier != nil {
Reviewer comment (Contributor):

I think we should check here if the AST has any filtering expression:

if g.bloomQuerier != nil && len(syntax.ExtractLineFilters(p.Plan().AST)) != 0 {

filtered, err = g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, refs, p.Plan())
if err != nil {
return err
}
}

g.metrics.preFilterChunks.WithLabelValues(routeShards).Observe(float64(ct))
g.metrics.postFilterChunks.WithLabelValues(routeShards).Observe(float64(len(filtered)))

@@ -462,23 +472,42 @@
Stats: &logproto.IndexStatsResponse{},
},
}

} else {
shards, err := accumulateChunksToShards(ctx, instanceID, forSeries, req, p, filtered)
shards, chunkGrps, err := accumulateChunksToShards(ctx, instanceID, forSeries, req, p, filtered)
if err != nil {
return err
}
resp.Shards = shards

// If the index gateway is configured to precompute chunks, we can return the chunk groups
// alongside the shards, otherwise discarding them
if g.limits.TSDBPrecomputeChunks(instanceID) {
resp.ChunkGroups = chunkGrps
}
}

sp.LogKV("msg", "send shards response", "shards", len(resp.Shards))

var refCt int
for _, grp := range resp.ChunkGroups {
refCt += len(grp.Refs)
}

ms := syntax.MatchersExpr{Mts: p.Matchers}
level.Debug(logger).Log(
"msg", "send shards response",
"total_chunks", statistics.Index.TotalChunks,
"post_filter_chunks", statistics.Index.PostFilterChunks,
"shards", len(resp.Shards),
"query", req.Query,
"target_bytes_per_shard", datasize.ByteSize(req.TargetBytesPerShard).HumanReadable(),
"precomputed_refs", refCt,
"matchers", ms.String(),
"from", req.From.Time().String(),
"through", req.Through.Time().String(),
"length", req.Through.Time().Sub(req.From.Time()).String(),
"end_delta", time.Since(req.Through.Time()).String(),
)

// 3) build shards
@@ -525,7 +554,7 @@ func accumulateChunksToShards(
req *logproto.ShardsRequest,
p chunk.Predicate,
filtered []*logproto.ChunkRef,
) ([]logproto.Shard, error) {
) ([]logproto.Shard, []logproto.ChunkRefGroup, error) {
// map for looking up post-filtered chunks in O(n) while iterating the index again for sizing info
filteredM := make(map[model.Fingerprint][]refWithSizingInfo, 1024)
for _, ref := range filtered {
@@ -541,12 +570,13 @@
v1.NewBounds(filtered[0].FingerprintModel(), filtered[len(filtered)-1].FingerprintModel()),
req.From, req.Through,
func(l labels.Labels, fp model.Fingerprint, chks []tsdb_index.ChunkMeta) (stop bool) {
mtx.Lock()
defer mtx.Unlock()

// check if this is a fingerprint we need
if _, ok := filteredM[fp]; !ok {
return false
}
mtx.Lock()
defer mtx.Unlock()

filteredChks := filteredM[fp]
var j int
@@ -579,7 +609,7 @@
},
p.Matchers...,
); err != nil {
return nil, err
return nil, nil, err
}

collectedSeries := sharding.SizedFPs(sharding.SizedFPsPool.Get(len(filteredM)))
@@ -597,7 +627,21 @@
}
sort.Sort(collectedSeries)

return collectedSeries.ShardsFor(req.TargetBytesPerShard), nil
shards := collectedSeries.ShardsFor(req.TargetBytesPerShard)
chkGrps := make([]logproto.ChunkRefGroup, 0, len(shards))
for _, s := range shards {
from := sort.Search(len(filtered), func(i int) bool {
return filtered[i].Fingerprint >= uint64(s.Bounds.Min)
})
through := sort.Search(len(filtered), func(i int) bool {
return filtered[i].Fingerprint > uint64(s.Bounds.Max)
})
chkGrps = append(chkGrps, logproto.ChunkRefGroup{
Refs: filtered[from:through],
})
}

return shards, chkGrps, nil
}

type refWithSizingInfo struct {
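The chunk-group construction above relies on `filtered` being sorted by fingerprint, so each shard's refs can be carved out with two binary searches per shard. A standalone sketch of that technique with simplified stand-in types (not the actual logproto/sharding structs):

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins: a chunk ref carries the fingerprint of its series,
// and a shard covers an inclusive fingerprint range [Min, Max].
type chunkRef struct{ Fingerprint uint64 }
type bounds struct{ Min, Max uint64 }

// groupByShard partitions fingerprint-sorted refs into one group per shard
// using two binary searches per shard, mirroring the gateway code above.
func groupByShard(refs []chunkRef, shards []bounds) [][]chunkRef {
	groups := make([][]chunkRef, 0, len(shards))
	for _, s := range shards {
		from := sort.Search(len(refs), func(i int) bool {
			return refs[i].Fingerprint >= s.Min
		})
		through := sort.Search(len(refs), func(i int) bool {
			return refs[i].Fingerprint > s.Max
		})
		groups = append(groups, refs[from:through])
	}
	return groups
}

func main() {
	refs := []chunkRef{{1}, {2}, {5}, {7}, {9}}
	shards := []bounds{{0, 4}, {5, 9}}
	fmt.Println(groupByShard(refs, shards)) // [[{1} {2}] [{5} {7} {9}]]
}
```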
27 changes: 24 additions & 3 deletions pkg/indexgateway/gateway_test.go
@@ -39,6 +39,18 @@ (
valuePrefix = "value"
)

type mockLimits struct{}

func (mockLimits) IndexGatewayShardSize(_ string) int {
return 0
}
func (mockLimits) TSDBMaxBytesPerShard(_ string) int {
return sharding.DefaultTSDBMaxBytesPerShard
}
func (mockLimits) TSDBPrecomputeChunks(_ string) bool {
return false
}

type mockBatch struct {
size int
}
@@ -233,7 +245,7 @@ func TestGateway_QueryIndex_multistore(t *testing.T) {
},
},
}}
gateway, err := NewIndexGateway(Config{}, util_log.Logger, nil, nil, indexClients, nil)
gateway, err := NewIndexGateway(Config{}, mockLimits{}, util_log.Logger, nil, nil, indexClients, nil)
require.NoError(t, err)

expectedQueries = append(expectedQueries,
@@ -258,7 +270,7 @@ func TestVolume(t *testing.T) {
{Name: "bar", Volume: 38},
}}, nil)

gateway, err := NewIndexGateway(Config{}, util_log.Logger, nil, indexQuerier, nil, nil)
gateway, err := NewIndexGateway(Config{}, mockLimits{}, util_log.Logger, nil, indexQuerier, nil, nil)
require.NoError(t, err)

ctx := user.InjectOrgID(context.Background(), "test")
@@ -532,7 +544,7 @@ func TestAccumulateChunksToShards(t *testing.T) {
},
}

shards, err := accumulateChunksToShards(
shards, grps, err := accumulateChunksToShards(
context.Background(),
"",
fsImpl(series),
@@ -543,6 +555,12 @@
filtered,
)

expectedChks := [][]*logproto.ChunkRef{
filtered[0:3],
filtered[3:6],
filtered[6:9],
filtered[9:10],
}
exp := []logproto.Shard{
{
Bounds: logproto.FPBounds{Min: 0, Max: 1},
@@ -586,6 +604,9 @@

for i := range shards {
require.Equal(t, exp[i], shards[i], "invalid shard at index %d", i)
for j := range grps[i].Refs {
require.Equal(t, expectedChks[i][j], grps[i].Refs[j], "invalid chunk in grp %d at index %d", i, j)
}
}
require.Equal(t, len(exp), len(shards))

1 change: 1 addition & 0 deletions pkg/indexgateway/shufflesharding.go
@@ -22,6 +22,7 @@ var (
type Limits interface {
IndexGatewayShardSize(tenantID string) int
TSDBMaxBytesPerShard(string) int
TSDBPrecomputeChunks(string) bool
}

type ShardingStrategy interface {
2 changes: 1 addition & 1 deletion pkg/ingester/flush_test.go
@@ -361,7 +361,7 @@ func (s *testStore) SelectSeries(_ context.Context, _ logql.SelectLogParams) ([]
return nil, nil
}

func (s *testStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ chunk.Predicate) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
func (s *testStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ chunk.Predicate, _ *logproto.ChunkRefGroup) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
return nil, nil, nil
}
