79 changes: 31 additions & 48 deletions pkg/bloomgateway/multiplexing.go
@@ -84,64 +84,47 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
     }
 }
 
-// taskMergeIterator implements v1.Iterator
-type taskMergeIterator struct {
-    curr      v1.Request
-    heap      *v1.HeapIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]]
-    tasks     []Task
-    day       model.Time
-    tokenizer *v1.NGramTokenizer
-    err       error
+func (t Task) RequestIter(tokenizer *v1.NGramTokenizer) v1.Iterator[v1.Request] {
+    return &requestIterator{
+        series:   v1.NewSliceIter(t.series),
+        searches: convertToSearches(t.filters, tokenizer),
+        channel:  t.ResCh,
+        curr:     v1.Request{},
+    }
 }
 
-func newTaskMergeIterator(day model.Time, tokenizer *v1.NGramTokenizer, tasks ...Task) v1.PeekingIterator[v1.Request] {
-    it := &taskMergeIterator{
-        tasks:     tasks,
-        curr:      v1.Request{},
-        day:       day,
-        tokenizer: tokenizer,
-    }
-    it.init()
-    return v1.NewPeekingIter[v1.Request](it)
+var _ v1.Iterator[v1.Request] = &requestIterator{}
+
+type requestIterator struct {
+    series   v1.Iterator[*logproto.GroupedChunkRefs]
+    searches [][]byte
+    channel  chan<- v1.Output
+    curr     v1.Request
 }
 
-func (it *taskMergeIterator) init() {
-    sequences := make([]v1.PeekingIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks))
-    for i := range it.tasks {
-        iter := v1.NewSliceIterWithIndex(it.tasks[i].series, i)
-        sequences = append(sequences, iter)
-    }
-    it.heap = v1.NewHeapIterator(
-        func(i, j v1.IndexedValue[*logproto.GroupedChunkRefs]) bool {
-            return i.Value().Fingerprint < j.Value().Fingerprint
-        },
-        sequences...,
-    )
-    it.err = nil
+// At implements v1.Iterator.
+func (it *requestIterator) At() v1.Request {
+
+    return it.curr
 }
 
+// Err implements v1.Iterator.
+func (it *requestIterator) Err() error {
+    return nil
+}
+
-func (it *taskMergeIterator) Next() bool {
-    ok := it.heap.Next()
+// Next implements v1.Iterator.
+func (it *requestIterator) Next() bool {
+    ok := it.series.Next()
     if !ok {
         return false
     }
-
-    group := it.heap.At()
-    task := it.tasks[group.Index()]
-
+    group := it.series.At()
     it.curr = v1.Request{
-        Fp:       model.Fingerprint(group.Value().Fingerprint),
-        Chks:     convertToChunkRefs(group.Value().Refs),
-        Searches: convertToSearches(task.filters, it.tokenizer),
-        Response: task.ResCh,
+        Fp:       model.Fingerprint(group.Fingerprint),
+        Chks:     convertToChunkRefs(group.Refs),
+        Searches: it.searches,
+        Response: it.channel,
     }
     return true
 }
-
-func (it *taskMergeIterator) At() v1.Request {
-    return it.curr
-}
-
-func (it *taskMergeIterator) Err() error {
-    return it.err
-}
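
The refactor above replaces the multi-task `taskMergeIterator` with a small per-task `requestIterator` that lazily maps each series group to a `v1.Request`, precomputing the searches once per task instead of recomputing them on every `Next()`. For readers who want the pattern in isolation, here is a self-contained sketch of that map-an-inner-iterator shape; `Iterator`, `sliceIter`, and `mapIter` are simplified stand-ins for the `v1` package types, not Loki's actual implementations.

```go
package main

import "fmt"

// Iterator mirrors the shape of v1.Iterator: Next advances, At reads the
// current element, Err reports a terminal error.
type Iterator[T any] interface {
	Next() bool
	At() T
	Err() error
}

// sliceIter is a stand-in for v1.NewSliceIter: it walks a slice in order.
type sliceIter[T any] struct {
	items []T
	pos   int
}

func newSliceIter[T any](items []T) *sliceIter[T] {
	return &sliceIter[T]{items: items, pos: -1}
}

func (it *sliceIter[T]) Next() bool { it.pos++; return it.pos < len(it.items) }
func (it *sliceIter[T]) At() T      { return it.items[it.pos] }
func (it *sliceIter[T]) Err() error { return nil }

// mapIter wraps an inner iterator and converts each element as it is
// consumed: the same shape as requestIterator, which turns each
// *logproto.GroupedChunkRefs from the series iterator into a v1.Request
// stamped with the task's precomputed searches and response channel.
type mapIter[A, B any] struct {
	inner Iterator[A]
	fn    func(A) B
	curr  B
}

func (it *mapIter[A, B]) Next() bool {
	if !it.inner.Next() {
		return false
	}
	it.curr = it.fn(it.inner.At())
	return true
}

func (it *mapIter[A, B]) At() B      { return it.curr }
func (it *mapIter[A, B]) Err() error { return it.inner.Err() }

func main() {
	series := newSliceIter([]uint64{100, 200, 300}) // pretend fingerprints
	reqs := &mapIter[uint64, string]{
		inner: series,
		fn:    func(fp uint64) string { return fmt.Sprintf("request(fp=%d)", fp) },
	}
	for reqs.Next() {
		fmt.Println(reqs.At())
	}
}
```

The design win is separation of concerns: a task only knows how to turn its own series into requests, while ordering across tasks becomes the caller's job, handled by the heap iterator shown in the test changes below.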
24 changes: 18 additions & 6 deletions pkg/bloomgateway/multiplexing_test.go
@@ -1,13 +1,15 @@
 package bloomgateway
 
 import (
+    "math"
     "testing"
     "time"
 
     "github.com/prometheus/common/model"
     "github.com/stretchr/testify/require"
 
     "github.com/grafana/loki/pkg/logproto"
+    "github.com/grafana/loki/pkg/logql/syntax"
     v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
 )
 
@@ -32,7 +34,6 @@ func TestTask(t *testing.T) {
         from, through := task.Bounds()
         require.Equal(t, ts.Add(-1*time.Hour), from)
         require.Equal(t, ts, through)
-        require.Equal(t, truncateDay(ts), task.day)
     })
 }
 
@@ -50,14 +51,18 @@ func createTasksForRequests(t *testing.T, tenant string, requests ...*logproto.F
     return tasks
 }
 
-func TestTaskMergeIterator(t *testing.T) {
+func TestTask_RequestIterator(t *testing.T) {
     ts := mktime("2024-01-24 12:00")
-    day := truncateDay(ts)
     tenant := "fake"
     tokenizer := v1.NewNGramTokenizer(4, 0)
 
-    t.Run("empty requests result in empty iterator", func(t *testing.T) {
-        it := newTaskMergeIterator(day, tokenizer)
+    t.Run("empty request yields empty iterator", func(t *testing.T) {
+        swb := seriesWithBounds{
+            bounds: model.Interval{Start: 0, End: math.MaxInt64},
+            series: []*logproto.GroupedChunkRefs{},
+        }
+        task, _ := NewTask(tenant, swb, []syntax.LineFilter{})
+        it := task.RequestIter(tokenizer)
         // nothing to iterate over
         require.False(t, it.Next())
     })
@@ -97,7 +102,14 @@ func TestTaskMergeIterator(t *testing.T) {
     }
 
     tasks := createTasksForRequests(t, tenant, r1, r2, r3)
-    it := newTaskMergeIterator(day, tokenizer, tasks...)
+
+    iters := make([]v1.PeekingIterator[v1.Request], 0, len(tasks))
+    for _, task := range tasks {
+        iters = append(iters, v1.NewPeekingIter(task.RequestIter(tokenizer)))
+    }
+
+    // merge the request iterators using the heap sort iterator
+    it := v1.NewHeapIterator[v1.Request](func(r1, r2 v1.Request) bool { return r1.Fp < r2.Fp }, iters...)
 
     // first item
     require.True(t, it.Next())
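
The test now performs the cross-task merge explicitly: wrap each task's request iterator in `v1.NewPeekingIter`, then hand them all to `v1.NewHeapIterator` with a less function on the fingerprint. The sketch below shows the underlying k-way merge idea as a runnable, dependency-free program built on `container/heap`; `cursor` and `cursorHeap` are illustrative names, and this is an assumption about how such a heap iterator works internally, not a copy of the `v1` implementation.

```go
package main

import (
	"container/heap"
	"fmt"
)

// cursor is a minimal stand-in for a peeking request iterator: it exposes
// the current value of a sorted sequence and can advance past it.
type cursor struct {
	vals []uint64 // ascending fingerprints from one task
	pos  int
}

func (c *cursor) next() bool { c.pos++; return c.pos < len(c.vals) }
func (c *cursor) at() uint64 { return c.vals[c.pos] }

// cursorHeap orders cursors by their current value, playing the role of
// v1.NewHeapIterator's less function (r1.Fp < r2.Fp).
type cursorHeap []*cursor

func (h cursorHeap) Len() int           { return len(h) }
func (h cursorHeap) Less(i, j int) bool { return h[i].at() < h[j].at() }
func (h cursorHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *cursorHeap) Push(x any)        { *h = append(*h, x.(*cursor)) }
func (h *cursorHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	// Three "tasks", each yielding fingerprints in ascending order.
	h := &cursorHeap{}
	for _, vals := range [][]uint64{{1, 4, 7}, {2, 5}, {3, 6, 8}} {
		c := &cursor{vals: vals, pos: -1}
		if c.next() { // prime the cursor so its head can be compared
			heap.Push(h, c)
		}
	}
	// Repeatedly take the smallest head, then advance that cursor.
	for h.Len() > 0 {
		c := (*h)[0]
		fmt.Println(c.at()) // prints 1 2 3 4 5 6 7 8, one per line
		if c.next() {
			heap.Fix(h, 0)
		} else {
			heap.Pop(h)
		}
	}
}
```

Because every per-task sequence is already sorted by fingerprint, the heap only ever compares the current head of each cursor, so the merged stream comes out globally sorted at a cost of O(n log k) for n total requests across k tasks.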
16 changes: 10 additions & 6 deletions pkg/bloomgateway/worker.go
@@ -192,7 +192,7 @@ func (w *worker) running(ctx context.Context) error {
                 blockRefs = append(blockRefs, b.blockRef)
             }
 
-            err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, day, blockRefs, tasksForBlocks)
+            err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, blockRefs, tasksForBlocks)
             if err != nil {
                 for _, t := range tasks {
                     t.ErrCh <- err
@@ -215,26 +215,30 @@ func (w *worker) stopping(err error) error {
     return nil
 }
 
-func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, day model.Time, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
+func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
     return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error {
         for _, b := range boundedRefs {
             if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp {
-                return w.processBlock(bq, day, b.tasks)
+                return w.processBlock(bq, b.tasks)
             }
         }
         return nil
     })
 }
 
-func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day model.Time, tasks []Task) error {
+func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, tasks []Task) error {
     schema, err := blockQuerier.Schema()
     if err != nil {
         return err
     }
 
     tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0)
-    it := newTaskMergeIterator(day, tokenizer, tasks...)
-    fq := blockQuerier.Fuse([]v1.PeekingIterator[v1.Request]{it})
+    iters := make([]v1.PeekingIterator[v1.Request], 0, len(tasks))
+    for _, task := range tasks {
+        it := v1.NewPeekingIter(task.RequestIter(tokenizer))
+        iters = append(iters, it)
+    }
+    fq := blockQuerier.Fuse(iters)
 
     start := time.Now()
     err = fq.Run()
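
`processBlock` can drop the `day` parameter because the time filtering has evidently moved upstream: tasks are now created from `seriesWithBounds` (see the test above), so by the time a worker sees a task its series are already restricted to the right range. What the worker does keep is one peeking iterator per task, all fused into a single block scan, with each request carrying its task's own response channel. Below is a self-contained sketch of that fan-in/fan-out shape: many tasks funnel requests into one processor, and each result is answered on the channel stamped into the request. All names here (`request`, `merged`, `respCh`) are hypothetical, not Loki's.

```go
package main

import (
	"fmt"
	"sync"
)

// request mirrors the role of v1.Request: a unit of work that carries its
// own reply channel, the way requestIterator sets Response: it.channel.
type request struct {
	fp   uint64
	resp chan<- string
}

func main() {
	merged := make(chan request)
	var wg sync.WaitGroup

	// Two "tasks", each with a private response channel (like Task.ResCh).
	for _, fps := range [][]uint64{{1, 3}, {2, 4}} {
		respCh := make(chan string, len(fps)) // buffered so the worker never blocks
		wg.Add(1)
		go func(fps []uint64, respCh chan string) {
			defer wg.Done()
			for _, fp := range fps {
				merged <- request{fp: fp, resp: respCh}
			}
			for range fps {
				fmt.Println(<-respCh) // each task reads only its own results
			}
		}(fps, respCh)
	}

	// A single worker drains the merged stream and replies on whichever
	// channel each request carries, the way one fused block scan serves
	// requests from many tasks at once.
	go func() {
		for r := range merged {
			r.resp <- fmt.Sprintf("checked fp=%d", r.fp)
		}
	}()

	wg.Wait()
}
```

Keeping the reply channel inside the request is what lets the merge stay anonymous: the worker never needs to know which task a request came from, yet every result still lands in the right place.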