perf(blooms): Resolve bloom blocks on index gateway and shard by block address (#12720)

This pull request changes how data is sharded across bloom gateways.

Currently, chunks are grouped by series and sharded across the available bloom gateways by the fingerprint of the series, using the jumphash algorithm. This, however, leads to over-querying of bloom blocks, because bloom blocks cover consecutive fingerprint ranges, whereas the sharding keys are evenly distributed across the keyspace.

This PR changes the sharding so that the bloom blocks for a set of series are already resolved on the index gateways, and the block address is used as the sharding key. This has the advantage that the grouped series can be mapped to the correct bloom blocks on the client side. Sending the block refs along with the grouped series to the bloom gateways allows for efficient querying, because each bloom gateway then owns exactly 1/nth of the blocks.
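For intuition, here is a minimal, self-contained sketch (not Loki's actual client code) of what sharding by block address with jump consistent hashing looks like. The gateway addresses, block keys, and the `addrForBlock` helper below are made up for illustration; in Loki the equivalent lookup goes through the client pool's `Addr(blockRef.String())` call shown in the `pkg/bloomgateway/client.go` diff further down.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// jumpHash is the Jump Consistent Hash algorithm (Lamping & Veach, 2014),
// mapping a 64-bit key to one of numBuckets buckets with minimal
// reshuffling when the bucket count changes.
func jumpHash(key uint64, numBuckets int) int {
	var b, j int64 = -1, 0
	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int(b)
}

// addrForBlock hashes the block's address (its storage key) and uses jump
// hashing to pick a gateway. Because the key is the block address rather
// than a series fingerprint, all series resolved to the same block land on
// the same gateway, and each gateway owns roughly 1/nth of the blocks.
func addrForBlock(blockKey string, gateways []string) string {
	h := fnv.New64a()
	_, _ = h.Write([]byte(blockKey))
	return gateways[jumpHash(h.Sum64(), len(gateways))]
}

func main() {
	// Placeholder addresses and block keys, purely for illustration.
	gateways := []string{"bloom-gw-0:9095", "bloom-gw-1:9095", "bloom-gw-2:9095"}
	blocks := []string{"block-0000-0fff", "block-1000-1fff", "block-2000-2fff"}
	for _, b := range blocks {
		fmt.Printf("%s -> %s\n", b, addrForBlock(b, gateways))
	}
}
```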

---

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
chaudum authored Apr 23, 2024
1 parent a00f1f1 commit 5540c92
Showing 21 changed files with 747 additions and 220 deletions.
32 changes: 25 additions & 7 deletions pkg/bloomgateway/bloomgateway.go
@@ -238,31 +238,49 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}, nil
}

seriesByDay := partitionRequest(req)
stats.NumTasks = len(seriesByDay)
blocks := make([]bloomshipper.BlockRef, 0, len(req.Blocks))
for _, key := range req.Blocks {
block, err := bloomshipper.BlockRefFromKey(key)
if err != nil {
stats.Status = labelFailure
return nil, errors.New("could not parse block key")
}
blocks = append(blocks, block)
}

// no tasks --> empty response
if len(seriesByDay) == 0 {
// Shortcut if request does not contain blocks
if len(blocks) == 0 {
stats.Status = labelSuccess
return &logproto.FilterChunkRefResponse{
ChunkRefs: []*logproto.GroupedChunkRefs{},
ChunkRefs: req.Refs,
}, nil
}

// TODO(chaudum): I intentionally keep the logic for handling multiple tasks,
// so that the PR does not explode in size. This should be cleaned up at some point.

seriesByDay := partitionRequest(req)
stats.NumTasks = len(seriesByDay)

sp.LogKV(
"filters", len(filters),
"days", len(seriesByDay),
"blocks", len(req.Blocks),
"series_requested", len(req.Refs),
)

if len(seriesByDay) != 1 {
stats.Status = labelFailure
return nil, errors.New("request time range must span exactly one day")
}

tasks := make([]Task, 0, len(seriesByDay))
responses := make([][]v1.Output, 0, len(seriesByDay))
for _, seriesForDay := range seriesByDay {
task, err := NewTask(ctx, tenantID, seriesForDay, filters)
task, err := NewTask(ctx, tenantID, seriesForDay, filters, blocks)
if err != nil {
return nil, err
}

// TODO(owen-d): include capacity in constructor?
task.responses = responsesPool.Get(len(seriesForDay.series))
tasks = append(tasks, task)
73 changes: 66 additions & 7 deletions pkg/bloomgateway/bloomgateway_test.go
@@ -30,16 +30,24 @@ import (
"github.com/grafana/loki/v3/pkg/validation"
)

func stringSlice[T fmt.Stringer](s []T) []string {
res := make([]string, len(s))
for i := range res {
res[i] = s[i].String()
}
return res
}

func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.GroupedChunkRefs {
t.Helper()
grouped := make([]*logproto.GroupedChunkRefs, 0, len(chunkRefs))
return groupChunkRefs(chunkRefs, grouped)
return groupChunkRefs(chunkRefs, nil)
}

func newLimits() *validation.Overrides {
limits := validation.Limits{}
flagext.DefaultValues(&limits)
limits.BloomGatewayEnabled = true
limits.BloomGatewayShardSize = 1

overrides, _ := validation.NewOverrides(limits, nil)
return overrides
@@ -129,11 +137,46 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
MaxOutstandingPerTenant: 1024,
}

t.Run("shipper error is propagated", func(t *testing.T) {
t.Run("request fails when providing invalid block", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

_, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)

reg := prometheus.NewRegistry()
gw, err := New(cfg, mockStore, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
require.NoError(t, err)
t.Cleanup(func() {
err = services.StopAndAwaitTerminated(context.Background(), gw)
require.NoError(t, err)
})

chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100)

expr, err := syntax.ParseExpr(`{foo="bar"} |= "does not match"`)
require.NoError(t, err)

req := &logproto.FilterChunkRefRequest{
From: now.Add(-24 * time.Hour),
Through: now,
Refs: groupRefs(t, chunkRefs),
Plan: plan.QueryPlan{AST: expr},
Blocks: []string{"bloom/invalid/block.tar.gz"},
}

ctx := user.InjectOrgID(context.Background(), tenantID)
res, err := gw.FilterChunkRefs(ctx, req)
require.ErrorContainsf(t, err, "could not parse block key", "%+v", res)
})

t.Run("shipper error is propagated", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
mockStore.err = errors.New("request failed")

reg := prometheus.NewRegistry()
@@ -160,6 +203,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
Through: now,
Refs: groupRefs(t, chunkRefs),
Plan: plan.QueryPlan{AST: expr},
Blocks: stringSlice(refs),
}

ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
@@ -175,7 +219,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2024-01-25 10:00")

// replace store implementation and re-initialize workers and sub-services
_, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
mockStore.delay = 2000 * time.Millisecond

@@ -203,6 +247,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
Through: now,
Refs: groupRefs(t, chunkRefs),
Plan: plan.QueryPlan{AST: expr},
Blocks: stringSlice(refs),
}

ctx, cancelFn := context.WithTimeout(context.Background(), 500*time.Millisecond)
@@ -228,11 +273,12 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
require.NoError(t, err)
})

// input chunks need to be sorted by their fingerprint
chunkRefs := []*logproto.ChunkRef{
{Fingerprint: 3000, UserID: tenantID, From: now.Add(-24 * time.Hour), Through: now.Add(-23 * time.Hour), Checksum: 1},
{Fingerprint: 1000, UserID: tenantID, From: now.Add(-22 * time.Hour), Through: now.Add(-21 * time.Hour), Checksum: 2},
{Fingerprint: 2000, UserID: tenantID, From: now.Add(-20 * time.Hour), Through: now.Add(-19 * time.Hour), Checksum: 3},
{Fingerprint: 1000, UserID: tenantID, From: now.Add(-23 * time.Hour), Through: now.Add(-22 * time.Hour), Checksum: 4},
{Fingerprint: 2000, UserID: tenantID, From: now.Add(-20 * time.Hour), Through: now.Add(-19 * time.Hour), Checksum: 3},
{Fingerprint: 3000, UserID: tenantID, From: now.Add(-24 * time.Hour), Through: now.Add(-23 * time.Hour), Checksum: 1},
}
req := &logproto.FilterChunkRefRequest{
From: now.Add(-24 * time.Hour),
@@ -284,13 +330,24 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
Checksum: uint32(idx),
},
}
ref := bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
TenantID: tenantID,
TableName: "table_1",
Bounds: v1.NewBounds(0, 10000),
StartTimestamp: now.Add(-24 * time.Hour),
EndTimestamp: now,
Checksum: uint32(idx),
},
}
expr, err := syntax.ParseExpr(`{foo="bar"} |= "foo"`)
require.NoError(t, err)
req := &logproto.FilterChunkRefRequest{
From: now.Add(-24 * time.Hour),
Through: now,
Refs: groupRefs(t, chunkRefs),
Plan: plan.QueryPlan{AST: expr},
Blocks: stringSlice([]bloomshipper.BlockRef{ref}),
}
ctx := user.InjectOrgID(context.Background(), tenantID)
_, err = gw.FilterChunkRefs(ctx, req)
@@ -303,7 +360,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

// replace store implementation and re-initialize workers and sub-services
_, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

reg := prometheus.NewRegistry()
store := newMockBloomStore(queriers, metas)
@@ -329,6 +386,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
Through: now,
Refs: inputChunkRefs,
Plan: plan.QueryPlan{AST: expr},
Blocks: stringSlice(refs),
}
ctx := user.InjectOrgID(context.Background(), tenantID)
res, err := gw.FilterChunkRefs(ctx, req)
@@ -361,6 +419,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
Through: now,
Refs: inputChunkRefs,
Plan: plan.QueryPlan{AST: expr},
Blocks: stringSlice(refs),
}
ctx := user.InjectOrgID(context.Background(), tenantID)
res, err := gw.FilterChunkRefs(ctx, req)
4 changes: 2 additions & 2 deletions pkg/bloomgateway/cache_test.go
@@ -450,14 +450,14 @@ func TestCache(t *testing.T) {
res, err = cacheMiddleware.FilterChunkRefs(ctx, req)
require.NoError(t, err)
require.Equal(t, 2, *calls)
require.Equal(t, expectedRes, res)
require.ElementsMatch(t, expectedRes.ChunkRefs, res.ChunkRefs)

// Doing a request again should only hit the cache
*calls = 0
res, err = cacheMiddleware.FilterChunkRefs(ctx, req)
require.NoError(t, err)
require.Equal(t, 0, *calls)
require.Equal(t, expectedRes, res)
require.ElementsMatch(t, expectedRes.ChunkRefs, res.ChunkRefs)
}

type mockServer struct {
75 changes: 47 additions & 28 deletions pkg/bloomgateway/client.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"math"
"sort"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -14,7 +15,6 @@ import (
ringclient "github.com/grafana/dskit/ring/client"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

Expand All @@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/v3/pkg/queue"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/discovery"
)
@@ -111,12 +112,11 @@ func (i *ClientConfig) Validate() error {
}

type Client interface {
FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
FilterChunks(ctx context.Context, tenant string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
}

type GatewayClient struct {
cfg ClientConfig
limits Limits
logger log.Logger
metrics *clientMetrics
pool *JumpHashClientPool
@@ -188,7 +188,6 @@ func NewClient(
return &GatewayClient{
cfg: cfg,
logger: logger,
limits: limits,
metrics: metrics,
pool: pool,
dnsProvider: dnsProvider, // keep reference so we can stop it when the client is closed
@@ -201,26 +200,41 @@ func (c *GatewayClient) Close() {
}

// FilterChunkRefs implements Client
func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error) {
if !c.limits.BloomGatewayEnabled(tenant) || len(groups) == 0 {
return groups, nil
func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error) {
// no block and therefore no series with chunks
if len(blocks) == 0 {
return nil, nil
}

clients := make(map[string][]*logproto.GroupedChunkRefs)
for _, g := range groups {
addr, err := c.pool.AddrForFingerprint(g.Fingerprint)
firstFp, lastFp := uint64(math.MaxUint64), uint64(0)
pos := make(map[string]int)
servers := make([]addrWithGroups, 0, len(blocks))
for _, blockWithSeries := range blocks {
addr, err := c.pool.Addr(blockWithSeries.block.String())
if err != nil {
return nil, errors.Wrap(err, "server address for fingerprint")
return nil, errors.Wrapf(err, "server address for block: %s", blockWithSeries.block)
}
clients[addr] = append(clients[addr], g)
}

servers := make([]addrWithGroups, 0, len(clients))
for k, v := range clients {
servers = append(servers, addrWithGroups{
groups: v,
addr: k,
})
// min/max fingerprint needed for the cache locality score
first, last := getFirstLast(blockWithSeries.series)
if first.Fingerprint < firstFp {
firstFp = first.Fingerprint
}
if last.Fingerprint > lastFp {
lastFp = last.Fingerprint
}

if idx, found := pos[addr]; found {
servers[idx].groups = append(servers[idx].groups, blockWithSeries.series...)
servers[idx].blocks = append(servers[idx].blocks, blockWithSeries.block.String())
} else {
pos[addr] = len(servers)
servers = append(servers, addrWithGroups{
addr: addr,
blocks: []string{blockWithSeries.block.String()},
groups: blockWithSeries.series,
})
}
}

if len(servers) > 0 {
@@ -229,7 +243,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t
// but can be less if the keyspace is not evenly distributed across instances. Ideal operation will see the range of
// `1-2/num_instances` -> `1`, where the former represents slight
// overlap on instances to the left and right of the range.
firstFp, lastFp := groups[0].Fingerprint, groups[len(groups)-1].Fingerprint
pctKeyspace := float64(lastFp-firstFp) / float64(math.MaxUint64)
pctInstances := float64(len(servers)) / float64(max(1, len(c.pool.Addrs())))
cacheLocalityScore := pctKeyspace / pctInstances
@@ -241,22 +254,27 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t
err := concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error {
rs := servers[i]

sort.Slice(rs.groups, func(i, j int) bool {
return rs.groups[i].Fingerprint < rs.groups[j].Fingerprint
})

level.Info(c.logger).Log(
"msg", "do FilterChunkRefs for addresses",
"progress", fmt.Sprintf("%d/%d", i+1, len(servers)),
"part", fmt.Sprintf("%d/%d", i+1, len(servers)),
"addr", rs.addr,
"from", from.Time(),
"through", through.Time(),
"num_refs", len(rs.groups),
"plan", plan.String(),
"plan_hash", plan.Hash(),
"from", interval.Start.Time(),
"through", interval.End.Time(),
"series", len(rs.groups),
"blocks", len(rs.blocks),
"tenant", tenant,
)

return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error {
req := &logproto.FilterChunkRefRequest{
From: from,
Through: through,
From: interval.Start,
Through: interval.End,
Refs: rs.groups,
Blocks: rs.blocks,
Plan: plan,
}
resp, err := client.FilterChunkRefs(ctx, req)
@@ -308,5 +326,6 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway

type addrWithGroups struct {
addr string
blocks []string
groups []*logproto.GroupedChunkRefs
}
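
As a quick illustration of the cache locality heuristic logged in `FilterChunks` above: it divides the fraction of the fingerprint keyspace a request spans by the fraction of gateway instances the request fans out to, so a value near 1 means each instance handles roughly the contiguous slice it owns. The following sketch is illustrative only; the helper name and the numbers are made up and are not part of the Loki codebase.

```go
package main

import (
	"fmt"
	"math"
)

// cacheLocalityScore mirrors the heuristic in the diff above:
// (keyspace fraction covered by the request) / (fraction of instances queried).
// Requires Go 1.21+ for the built-in max.
func cacheLocalityScore(firstFp, lastFp uint64, servers, instances int) float64 {
	pctKeyspace := float64(lastFp-firstFp) / float64(math.MaxUint64)
	pctInstances := float64(servers) / float64(max(1, instances))
	return pctKeyspace / pctInstances
}

func main() {
	// Hypothetical request: it spans 1/16 of the keyspace and is answered by
	// 1 of 16 gateways -- the ideal case, which scores ~1.0.
	first, last := uint64(0), uint64(math.MaxUint64/16)
	fmt.Printf("cache locality score: %.2f\n", cacheLocalityScore(first, last, 1, 16))
}
```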