feat(blooms): compute chunks once #12664

Merged · 27 commits · Apr 29, 2024
Changes from 18 commits

Commits
4807f48
remove unnecessary fallback
owen-d Mar 27, 2024
48402d2
[wip] wiring up support to pass store chunks to queriers
owen-d Mar 28, 2024
bb283f1
Merge remote-tracking branch 'upstream/main' into blooms/compute-chun…
owen-d Mar 29, 2024
7a05032
[wip] threading through store overrides for chunkrefs
owen-d Mar 29, 2024
97bc139
multi-tenant querier partitions store overrides by tenant id
owen-d Mar 29, 2024
3858893
metrics & ifc alignment
owen-d Mar 31, 2024
ea68788
Merge remote-tracking branch 'upstream/main' into blooms/compute-chun…
owen-d Apr 17, 2024
46d85f9
remove unused fn
owen-d Apr 17, 2024
db62dd8
send chunks in shards resp
owen-d Apr 17, 2024
ad855e6
type alignment
owen-d Apr 17, 2024
93a9ce4
Merge remote-tracking branch 'upstream/main' into blooms/compute-chun…
owen-d Apr 17, 2024
6ce86b4
type alignment
owen-d Apr 17, 2024
294261d
ShardsResponse.Merge extension
owen-d Apr 17, 2024
d7f2af9
fix unrelated codec test err msg
owen-d Apr 17, 2024
e8f58f5
tidy
owen-d Apr 17, 2024
4639cfd
binding shard to chunk refs
owen-d Apr 18, 2024
a764c11
simplify+pointer for shard chunks
owen-d Apr 18, 2024
3ba9330
fix signature
owen-d Apr 18, 2024
b2990bf
precomputed chunk logging
owen-d Apr 19, 2024
141c4f7
log matchers & always use mutex while accumulating chunks to shards
owen-d Apr 19, 2024
8db855d
more logging
owen-d Apr 19, 2024
8bdb823
better logging for gateway.go
owen-d Apr 22, 2024
56eabcc
independent handling for precomputed chunks vs bloom enablement optio…
owen-d Apr 23, 2024
33c8e82
Merge remote-tracking branch 'upstream/main' into blooms/compute-chun…
owen-d Apr 23, 2024
a3bd99c
make doc
owen-d Apr 23, 2024
82923e7
pr feedback
owen-d Apr 29, 2024
7b2f72e
pr feedback: only dispatch to bloom querier when line filters exist
owen-d Apr 29, 2024
2 changes: 1 addition & 1 deletion cmd/migrate/main.go
@@ -309,7 +309,7 @@ func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh <
start := time.Now()
var totalBytes uint64
var totalChunks uint64
schemaGroups, fetchers, err := m.source.GetChunks(m.ctx, m.sourceUser, model.TimeFromUnixNano(sr.from), model.TimeFromUnixNano(sr.to), chunk.NewPredicate(m.matchers, nil))
schemaGroups, fetchers, err := m.source.GetChunks(m.ctx, m.sourceUser, model.TimeFromUnixNano(sr.from), model.TimeFromUnixNano(sr.to), chunk.NewPredicate(m.matchers, nil), nil)
if err != nil {
log.Println(threadID, "Error querying index for chunk refs:", err)
errCh <- err
42 changes: 1 addition & 41 deletions pkg/indexgateway/client.go
@@ -28,9 +28,7 @@ import (

"github.com/grafana/loki/v3/pkg/distributor/clientpool"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/discovery"
util_math "github.com/grafana/loki/v3/pkg/util/math"
@@ -340,8 +338,7 @@ func (s *GatewayClient) GetShards(
if err != nil {
return errors.WithStack(err)
}
perReplicaResult.Shards = append(perReplicaResult.Shards, resp.Shards...)
perReplicaResult.Statistics.Merge(resp.Statistics)
perReplicaResult.Merge(resp)
}

// Since `poolDo` retries on error, we only want to set the response if we got a successful response.
@@ -355,48 +352,11 @@
return errCt <= maxErrs
},
); err != nil {
if isUnimplementedCallError(err) {
salvacorts (Contributor) commented on Apr 29, 2024:

IIUC we are removing this since we already rolled out index-gws that support the get shards method, right?

owen-d (Member, Author) replied:

No (the OSS repo is independent from our internal deployments at Grafana Labs). This is because the new bounded shards implementation is an intentional choice that needs to be turned on (default=power_of_two), so I removed the fallback for simplicity's sake.

return s.getShardsFromStatsFallback(ctx, in)
}
return nil, err
}
return res, nil
}

func (s *GatewayClient) getShardsFromStatsFallback(
ctx context.Context,
in *logproto.ShardsRequest,
) (*logproto.ShardsResponse, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, errors.Wrap(err, "index gateway client get tenant ID")
}

p, err := ExtractShardRequestMatchersAndAST(in.Query)
if err != nil {
return nil, errors.Wrap(err, "failure while falling back to stats for shard calculation")

}

stats, err := s.GetStats(
ctx,
&logproto.IndexStatsRequest{
From: in.From,
Through: in.Through,
Matchers: (&syntax.MatchersExpr{Mts: p.Matchers}).String(),
},
)
if err != nil {
return nil, err
}

var strategy sharding.PowerOfTwoSharding
shards := strategy.ShardsFor(stats.Bytes, uint64(s.limits.TSDBMaxBytesPerShard(userID)))
return &logproto.ShardsResponse{
Shards: shards,
}, nil
}

// TODO(owen-d): this was copied from ingester_querier.go -- move it to a shared pkg
// isUnimplementedCallError tells if the GRPC error is a gRPC error with code Unimplemented.
func isUnimplementedCallError(err error) bool {
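For context on the `power_of_two` default mentioned in the exchange above: the removed fallback sized shards via `PowerOfTwoSharding.ShardsFor(stats.Bytes, maxBytesPerShard)`. Below is a minimal, self-contained sketch of that idea; it is an illustrative re-derivation (the function name and example numbers are made up for this note), not Loki's exact implementation.

```go
package main

import "fmt"

// powerOfTwoShardCount mirrors the idea behind the removed fallback:
// pick the smallest power-of-two shard count whose per-shard byte size
// stays at or under the target. Illustrative only.
func powerOfTwoShardCount(totalBytes, maxBytesPerShard uint64) int {
	shards := 1
	for uint64(shards)*maxBytesPerShard < totalBytes {
		shards *= 2
	}
	return shards
}

func main() {
	// e.g. ~10 GiB of chunk data with a ~600 MiB per-shard target -> 32 shards
	fmt.Println(powerOfTwoShardCount(10<<30, 600<<20))
}
```

Note the removal above only drops the silent fallback that was taken when the GetShards RPC errored as unimplemented; per the author's comment, power_of_two remains the default strategy and the bounded-shards path has to be enabled explicitly.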
27 changes: 21 additions & 6 deletions pkg/indexgateway/gateway.go
@@ -214,7 +214,7 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
}

predicate := chunk.NewPredicate(matchers, &req.Plan)
chunks, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, req.Through, predicate)
chunks, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, req.Through, predicate, nil)
if err != nil {
return nil, err
}
@@ -417,7 +417,7 @@ func (g *Gateway) getShardsWithBlooms(
defer sp.Finish()

// 1) for all bounds, get chunk refs
grps, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, req.Through, p)
grps, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, req.Through, p, nil)
if err != nil {
return err
}
@@ -463,11 +463,12 @@
},
}
} else {
shards, err := accumulateChunksToShards(ctx, instanceID, forSeries, req, p, filtered)
shards, chunkGrps, err := accumulateChunksToShards(ctx, instanceID, forSeries, req, p, filtered)
if err != nil {
return err
}
resp.Shards = shards
resp.ChunkGroups = chunkGrps
}

sp.LogKV("msg", "send shards response", "shards", len(resp.Shards))
@@ -525,7 +526,7 @@ func accumulateChunksToShards(
req *logproto.ShardsRequest,
p chunk.Predicate,
filtered []*logproto.ChunkRef,
) ([]logproto.Shard, error) {
) ([]logproto.Shard, []logproto.ChunkRefGroup, error) {
// map for looking up post-filtered chunks in O(n) while iterating the index again for sizing info
filteredM := make(map[model.Fingerprint][]refWithSizingInfo, 1024)
for _, ref := range filtered {
@@ -579,7 +580,7 @@
},
p.Matchers...,
); err != nil {
return nil, err
return nil, nil, err
}

collectedSeries := sharding.SizedFPs(sharding.SizedFPsPool.Get(len(filteredM)))
@@ -597,7 +598,21 @@
}
sort.Sort(collectedSeries)

return collectedSeries.ShardsFor(req.TargetBytesPerShard), nil
shards := collectedSeries.ShardsFor(req.TargetBytesPerShard)
chkGrps := make([]logproto.ChunkRefGroup, 0, len(shards))
for _, s := range shards {
from := sort.Search(len(filtered), func(i int) bool {
return filtered[i].Fingerprint >= uint64(s.Bounds.Min)
})
through := sort.Search(len(filtered), func(i int) bool {
return filtered[i].Fingerprint > uint64(s.Bounds.Max)
})
chkGrps = append(chkGrps, logproto.ChunkRefGroup{
Refs: filtered[from:through],
})
}

return shards, chkGrps, nil
}

type refWithSizingInfo struct {
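The chunk-group computation added above relies on `filtered` being sorted by fingerprint and on each shard covering a contiguous fingerprint range, so every group is a contiguous sub-slice located with two binary searches. A minimal, self-contained sketch of that technique (simplified stand-in types rather than the `logproto` ones):

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for logproto.ChunkRef and a shard's FPBounds,
// just enough to show the slicing technique.
type chunkRef struct{ Fingerprint uint64 }
type bounds struct{ Min, Max uint64 }

// groupRefsByShard assumes refs are sorted by fingerprint; for each shard
// it locates the half-open range [from, through) with two binary searches
// and returns the corresponding sub-slice, avoiding any copying.
func groupRefsByShard(refs []chunkRef, shards []bounds) [][]chunkRef {
	groups := make([][]chunkRef, 0, len(shards))
	for _, s := range shards {
		from := sort.Search(len(refs), func(i int) bool {
			return refs[i].Fingerprint >= s.Min
		})
		through := sort.Search(len(refs), func(i int) bool {
			return refs[i].Fingerprint > s.Max
		})
		groups = append(groups, refs[from:through])
	}
	return groups
}

func main() {
	refs := []chunkRef{{1}, {2}, {2}, {5}, {7}, {9}}
	shards := []bounds{{Min: 0, Max: 2}, {Min: 3, Max: 9}}
	for i, g := range groupRefsByShard(refs, shards) {
		fmt.Printf("shard %d -> %v\n", i, g) // shard 0 -> [{1} {2} {2}], shard 1 -> [{5} {7} {9}]
	}
}
```

Returning these groups in the ShardsResponse is what lets queriers reuse the already-resolved chunk refs instead of hitting the index a second time, which is the "compute chunks once" goal of this PR.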
11 changes: 10 additions & 1 deletion pkg/indexgateway/gateway_test.go
@@ -532,7 +532,7 @@ func TestAccumulateChunksToShards(t *testing.T) {
},
}

shards, err := accumulateChunksToShards(
shards, grps, err := accumulateChunksToShards(
context.Background(),
"",
fsImpl(series),
@@ -543,6 +543,12 @@
filtered,
)

expectedChks := [][]*logproto.ChunkRef{
filtered[0:3],
filtered[3:6],
filtered[6:9],
filtered[9:10],
}
exp := []logproto.Shard{
{
Bounds: logproto.FPBounds{Min: 0, Max: 1},
@@ -586,6 +592,9 @@

for i := range shards {
require.Equal(t, exp[i], shards[i], "invalid shard at index %d", i)
for j := range grps[i].Refs {
require.Equal(t, expectedChks[i][j], grps[i].Refs[j], "invalid chunk in grp %d at index %d", i, j)
}
}
require.Equal(t, len(exp), len(shards))

2 changes: 1 addition & 1 deletion pkg/ingester/flush_test.go
@@ -361,7 +361,7 @@ func (s *testStore) SelectSeries(_ context.Context, _ logql.SelectLogParams) ([]
return nil, nil
}

func (s *testStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ chunk.Predicate) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
func (s *testStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ chunk.Predicate, _ *logproto.ChunkRefGroup) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
return nil, nil, nil
}

2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -1036,7 +1036,7 @@ func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsReq
}

// get chunk references
chunksGroups, _, err := i.store.GetChunks(ctx, orgID, start, end, chunk.NewPredicate(matchers, nil))
chunksGroups, _, err := i.store.GetChunks(ctx, orgID, start, end, chunk.NewPredicate(matchers, nil), nil)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/ingester/ingester_test.go
@@ -457,7 +457,7 @@ func (s *mockStore) PutOne(_ context.Context, _, _ model.Time, _ chunk.Chunk) er
return nil
}

func (s *mockStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ chunk.Predicate) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
func (s *mockStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ chunk.Predicate, _ *logproto.ChunkRefGroup) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
return nil, nil, nil
}

2 changes: 2 additions & 0 deletions pkg/logcli/client/file.go
@@ -77,6 +77,7 @@ func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.
direction,
uint32(limit),
nil,
nil,
)
if err != nil {
return nil, fmt.Errorf("failed to parse query: %w", err)
@@ -118,6 +119,7 @@ func (f *FileClient) QueryRange(queryStr string, limit int, start, end time.Time
direction,
uint32(limit),
nil,
nil,
)
if err != nil {
return nil, err
2 changes: 2 additions & 0 deletions pkg/logcli/query/query.go
@@ -490,6 +490,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
q.resultsDirection(),
uint32(q.Limit),
nil,
nil,
)
if err != nil {
return err
@@ -506,6 +507,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
q.resultsDirection(),
uint32(q.Limit),
nil,
nil,
)
if err != nil {
return err
2 changes: 1 addition & 1 deletion pkg/logcli/query/query_test.go
@@ -426,7 +426,7 @@ func (t *testQueryClient) Query(_ string, _ int, _ time.Time, _ logproto.Directi
func (t *testQueryClient) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, _ bool) (*loghttp.QueryResponse, error) {
ctx := user.InjectOrgID(context.Background(), "fake")

params, err := logql.NewLiteralParams(queryStr, from, through, step, interval, direction, uint32(limit), nil)
params, err := logql.NewLiteralParams(queryStr, from, through, step, interval, direction, uint32(limit), nil, nil)
if err != nil {
return nil, err
}
6 changes: 6 additions & 0 deletions pkg/logproto/extensions.go
@@ -182,3 +182,9 @@ func (r *QueryPatternsResponse) UnmarshalJSON(data []byte) error {
func (d DetectedFieldType) String() string {
return string(d)
}

func (m *ShardsResponse) Merge(other *ShardsResponse) {
m.Shards = append(m.Shards, other.Shards...)
m.ChunkGroups = append(m.ChunkGroups, other.ChunkGroups...)
m.Statistics.Merge(other.Statistics)
}
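A quick sketch of why Merge appends Shards and ChunkGroups in lockstep: in gateway.go above, the i-th chunk group is built for the i-th shard, so appending both slices together keeps that pairing intact when per-replica responses are combined in the gateway client. The analogue types below are simplified (statistics merging omitted); this is an illustration, not the `logproto` code.

```go
package main

import "fmt"

// shardsResponse is a toy analogue of logproto.ShardsResponse with the
// two slices that must stay index-aligned.
type shardsResponse struct {
	Shards      []string
	ChunkGroups []string
}

// Merge appends both slices together so acc.ChunkGroups[i] keeps
// describing acc.Shards[i] after combining responses.
func (m *shardsResponse) Merge(other *shardsResponse) {
	m.Shards = append(m.Shards, other.Shards...)
	m.ChunkGroups = append(m.ChunkGroups, other.ChunkGroups...)
}

func main() {
	var acc shardsResponse
	acc.Merge(&shardsResponse{Shards: []string{"shard-0"}, ChunkGroups: []string{"chunks for shard-0"}})
	acc.Merge(&shardsResponse{Shards: []string{"shard-1"}, ChunkGroups: []string{"chunks for shard-1"}})
	for i := range acc.Shards {
		fmt.Println(acc.Shards[i], "->", acc.ChunkGroups[i]) // pairing preserved
	}
}
```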