Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ type processor struct {

func (p *processor) run(ctx context.Context, tasks []Task) error {
for ts, tasks := range group(tasks, func(t Task) model.Time { return t.day }) {
interval := bloomshipper.Interval{
Start: ts,
End: ts.Add(Day),
}
interval := bloomshipper.NewInterval(ts, ts.Add(Day))
tenant := tasks[0].Tenant
err := p.processTasks(ctx, tenant, interval, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks)
if err != nil {
Expand Down
6 changes: 1 addition & 5 deletions pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,7 @@ func (w *worker) running(_ context.Context) error {
}

// interval is [Start, End)
interval := bloomshipper.Interval{
Start: day, // inclusive
End: day.Add(Day), // non-inclusive
}

interval := bloomshipper.NewInterval(day, day.Add(Day))
logger := log.With(w.logger, "day", day.Time(), "tenant", tasks[0].Tenant)
level.Debug(logger).Log("msg", "process tasks", "tasks", len(tasks))

Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/bloom/v1/bounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (b FingerprintBounds) Hash(h hash.Hash32) error {
enc.PutBE64(uint64(b.Min))
enc.PutBE64(uint64(b.Max))
_, err := h.Write(enc.Get())
return errors.Wrap(err, "writing OwnershipRange")
return errors.Wrap(err, "writing FingerprintBounds")
}

func (b FingerprintBounds) String() string {
Expand All @@ -54,6 +54,7 @@ func (b FingerprintBounds) Cmp(fp model.Fingerprint) BoundsCheck {
return Overlap
}

// Overlaps returns whether the bounds (partially) overlap with the target bounds
func (b FingerprintBounds) Overlaps(target FingerprintBounds) bool {
return b.Cmp(target.Min) != After && b.Cmp(target.Max) != Before
}
Expand All @@ -63,7 +64,7 @@ func (b FingerprintBounds) Slice(min, max model.Fingerprint) *FingerprintBounds
return b.Intersection(FingerprintBounds{Min: min, Max: max})
}

// Returns whether the fingerprint is fully within the target bounds
// Within returns whether the fingerprint is fully within the target bounds
func (b FingerprintBounds) Within(target FingerprintBounds) bool {
return b.Min >= target.Min && b.Max <= target.Max
}
Expand Down
28 changes: 13 additions & 15 deletions pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,20 @@ func (r Ref) Cmp(fp uint64) v1.BoundsCheck {
return v1.Overlap
}

func (r Ref) Bounds() v1.FingerprintBounds {
return v1.NewBounds(model.Fingerprint(r.MinFingerprint), model.Fingerprint(r.MaxFingerprint))
}

func (r Ref) Interval() Interval {
return NewInterval(r.StartTimestamp, r.EndTimestamp)
}

type BlockRef struct {
Ref
IndexPath string
BlockPath string
}

func (b *BlockRef) Bounds() v1.FingerprintBounds {
return v1.NewBounds(model.Fingerprint(b.MinFingerprint), model.Fingerprint(b.MaxFingerprint))
}

type MetaRef struct {
Ref
FilePath string
Expand Down Expand Up @@ -282,19 +286,13 @@ func createMetaRef(objectKey string, tenantID string, tableName string) (MetaRef
}, nil
}

func tablesForRange(periodConfig config.PeriodConfig, from, to model.Time) []string {
interval := periodConfig.IndexTables.Period
step := int64(interval.Seconds())
lower := from.Unix() / step
upper := to.Unix() / step
func tablesForRange(periodConfig config.PeriodConfig, interval Interval) []string {
step := int64(periodConfig.IndexTables.Period.Seconds())
lower := interval.Start.Unix() / step
upper := interval.End.Unix() / step
tables := make([]string, 0, 1+upper-lower)
prefix := periodConfig.IndexTables.Prefix
for i := lower; i <= upper; i++ {
tables = append(tables, joinTableName(prefix, i))
tables = append(tables, fmt.Sprintf("%s%d", periodConfig.IndexTables.Prefix, i))
}
return tables
}

func joinTableName(prefix string, tableNumber int64) string {
return fmt.Sprintf("%s%d", prefix, tableNumber)
}
3 changes: 2 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ func Test_BloomClient_FetchMetas(t *testing.T) {

searchParams := MetaSearchParams{
TenantID: "tenantA",

Keyspace: v1.NewBounds(50, 150),
Interval: Interval{Start: fixedDay.Add(-6 * day), End: fixedDay.Add(-1*day - 1*time.Hour)},
Interval: NewInterval(fixedDay.Add(-6*day), fixedDay.Add(-1*day-1*time.Hour)),
}

fetched, err := store.FetchMetas(context.Background(), searchParams)
Expand Down
59 changes: 59 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/interval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package bloomshipper

import (
"fmt"
"hash"

"github.com/pkg/errors"
"github.com/prometheus/common/model"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/util/encoding"
)

// Interval defines a time range with start end end time
// where the start is inclusive, the end is non-inclusive.
type Interval struct {
Start, End model.Time
}

func NewInterval(start, end model.Time) Interval {
return Interval{Start: start, End: end}
}

func (i Interval) Hash(h hash.Hash32) error {
var enc encoding.Encbuf
enc.PutBE64(uint64(i.Start))
enc.PutBE64(uint64(i.End))
_, err := h.Write(enc.Get())
return errors.Wrap(err, "writing Interval")
}

func (i Interval) String() string {
// 13 digits are enough until Sat Nov 20 2286 17:46:39 UTC
return fmt.Sprintf("%013d-%013d", i.Start, i.End)
}

func (i Interval) Repr() string {
return fmt.Sprintf("[%s, %s)", i.Start.Time().UTC(), i.End.Time().UTC())
}

// Cmp returns the position of a time relative to the interval
func (i Interval) Cmp(ts model.Time) v1.BoundsCheck {
if ts.Before(i.Start) {
return v1.Before
} else if ts.After(i.End) || ts.Equal(i.End) {
return v1.After
}
return v1.Overlap
}

// Overlaps returns whether the interval overlaps (partially) with the target interval
func (i Interval) Overlaps(target Interval) bool {
return i.Cmp(target.Start) != v1.After && i.Cmp(target.End) != v1.Before
}

// Within returns whether the interval is fully within the target interval
func (i Interval) Within(target Interval) bool {
return i.Start >= target.Start && i.End <= target.End
}
50 changes: 50 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/interval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package bloomshipper

import (
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

func Test_Interval_String(t *testing.T) {
start := model.Time(0)
end := model.TimeFromUnix(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).Unix())
interval := NewInterval(start, end)
assert.Equal(t, "0000000000000-1704067200000", interval.String())
assert.Equal(t, "[1970-01-01 00:00:00 +0000 UTC, 2024-01-01 00:00:00 +0000 UTC)", interval.Repr())
}

func Test_Interval_Cmp(t *testing.T) {
interval := NewInterval(10, 20)
assert.Equal(t, v1.Before, interval.Cmp(0))
assert.Equal(t, v1.Overlap, interval.Cmp(10))
assert.Equal(t, v1.Overlap, interval.Cmp(15))
assert.Equal(t, v1.After, interval.Cmp(20)) // End is not inclusive
assert.Equal(t, v1.After, interval.Cmp(21))
}

func Test_Interval_Overlap(t *testing.T) {
interval := NewInterval(10, 20)
assert.True(t, interval.Overlaps(Interval{Start: 5, End: 15}))
assert.True(t, interval.Overlaps(Interval{Start: 15, End: 25}))
assert.True(t, interval.Overlaps(Interval{Start: 10, End: 20}))
assert.True(t, interval.Overlaps(Interval{Start: 5, End: 25}))
assert.False(t, interval.Overlaps(Interval{Start: 1, End: 9}))
assert.False(t, interval.Overlaps(Interval{Start: 20, End: 30})) // End is not inclusive
assert.False(t, interval.Overlaps(Interval{Start: 25, End: 30}))
}

func Test_Interval_Within(t *testing.T) {
target := NewInterval(10, 20)
assert.False(t, NewInterval(1, 9).Within(target))
assert.False(t, NewInterval(21, 30).Within(target))
assert.True(t, NewInterval(10, 20).Within(target))
assert.True(t, NewInterval(14, 15).Within(target))
assert.False(t, NewInterval(5, 15).Within(target))
assert.False(t, NewInterval(15, 25).Within(target))
assert.False(t, NewInterval(5, 25).Within(target))
}
24 changes: 3 additions & 21 deletions pkg/storage/stores/shipper/bloomshipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,12 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"golang.org/x/exp/slices"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
)

type Interval struct {
Start, End model.Time
}

func (i Interval) String() string {
return fmt.Sprintf("[%s, %s)", i.Start.Time(), i.End.Time())
}

func (i Interval) Cmp(other model.Time) v1.BoundsCheck {
if other.Before(i.Start) {
return v1.Before
} else if other.After(i.End) || other.Equal(i.End) {
return v1.After
}
return v1.Overlap
}

type BlockQuerierWithFingerprintRange struct {
*v1.BlockQuerier
v1.FingerprintBounds
Expand Down Expand Up @@ -203,14 +185,14 @@ func BlocksForMetas(metas []Meta, interval Interval, keyspaces []v1.FingerprintB
// isOutsideRange tests if a given BlockRef b is outside of search boundaries
// defined by min/max timestamp and min/max fingerprint.
// Fingerprint ranges must be sorted in ascending order.
func isOutsideRange(b BlockRef, interval Interval, keyspaces []v1.FingerprintBounds) bool {
func isOutsideRange(b BlockRef, interval Interval, bounds []v1.FingerprintBounds) bool {
// check time interval
if interval.Cmp(b.EndTimestamp) == v1.Before || interval.Cmp(b.StartTimestamp) == v1.After {
if !interval.Overlaps(b.Interval()) {
return true
}

// check fingerprint ranges
for _, keyspace := range keyspaces {
for _, keyspace := range bounds {
if keyspace.Within(b.Bounds()) || keyspace.Overlaps(b.Bounds()) {
return false
}
Expand Down
36 changes: 16 additions & 20 deletions pkg/storage/stores/shipper/bloomshipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ import (
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

func interval(start, end model.Time) Interval {
return Interval{Start: start, End: end}
}

func Test_Shipper_findBlocks(t *testing.T) {
t.Run("expected block that are specified in tombstones to be filtered out", func(t *testing.T) {
metas := []Meta{
Expand Down Expand Up @@ -46,10 +42,10 @@ func Test_Shipper_findBlocks(t *testing.T) {

ts := model.Now()

interval := Interval{
Start: ts.Add(-2 * time.Hour),
End: ts.Add(-1 * time.Hour),
}
interval := NewInterval(
ts.Add(-2*time.Hour),
ts.Add(-1*time.Hour),
)
blocks := BlocksForMetas(metas, interval, []v1.FingerprintBounds{{Min: 100, Max: 200}})

expectedBlockRefs := []BlockRef{
Expand Down Expand Up @@ -103,7 +99,7 @@ func Test_Shipper_findBlocks(t *testing.T) {
for name, data := range tests {
t.Run(name, func(t *testing.T) {
ref := createBlockRef("fake-block", data.minFingerprint, data.maxFingerprint, data.startTimestamp, data.endTimestamp)
blocks := BlocksForMetas([]Meta{{Blocks: []BlockRef{ref}}}, interval(300, 400), []v1.FingerprintBounds{{Min: 100, Max: 200}})
blocks := BlocksForMetas([]Meta{{Blocks: []BlockRef{ref}}}, NewInterval(300, 400), []v1.FingerprintBounds{{Min: 100, Max: 200}})
if data.filtered {
require.Empty(t, blocks)
return
Expand All @@ -120,67 +116,67 @@ func TestIsOutsideRange(t *testing.T) {

t.Run("is outside if startTs > through", func(t *testing.T) {
b := createBlockRef("block", 0, math.MaxUint64, startTs, endTs)
isOutside := isOutsideRange(b, interval(0, 900), []v1.FingerprintBounds{})
isOutside := isOutsideRange(b, NewInterval(0, 900), []v1.FingerprintBounds{})
require.True(t, isOutside)
})

t.Run("is outside if startTs == through ", func(t *testing.T) {
b := createBlockRef("block", 0, math.MaxUint64, startTs, endTs)
isOutside := isOutsideRange(b, interval(900, 1000), []v1.FingerprintBounds{})
isOutside := isOutsideRange(b, NewInterval(900, 1000), []v1.FingerprintBounds{})
require.True(t, isOutside)
})

t.Run("is outside if endTs < from", func(t *testing.T) {
b := createBlockRef("block", 0, math.MaxUint64, startTs, endTs)
isOutside := isOutsideRange(b, interval(2100, 3000), []v1.FingerprintBounds{})
isOutside := isOutsideRange(b, NewInterval(2100, 3000), []v1.FingerprintBounds{})
require.True(t, isOutside)
})

t.Run("is outside if endFp < first fingerprint", func(t *testing.T) {
b := createBlockRef("block", 0, 90, startTs, endTs)
isOutside := isOutsideRange(b, interval(startTs, endTs), []v1.FingerprintBounds{{Min: 100, Max: 199}})
isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 100, Max: 199}})
require.True(t, isOutside)
})

t.Run("is outside if startFp > last fingerprint", func(t *testing.T) {
b := createBlockRef("block", 200, math.MaxUint64, startTs, endTs)
isOutside := isOutsideRange(b, interval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 49}, {Min: 100, Max: 149}})
isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 49}, {Min: 100, Max: 149}})
require.True(t, isOutside)
})

t.Run("is outside if within gaps in fingerprints", func(t *testing.T) {
b := createBlockRef("block", 100, 199, startTs, endTs)
isOutside := isOutsideRange(b, interval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
require.True(t, isOutside)
})

t.Run("is not outside if within fingerprints 1", func(t *testing.T) {
b := createBlockRef("block", 10, 90, startTs, endTs)
isOutside := isOutsideRange(b, interval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
require.False(t, isOutside)
})

t.Run("is not outside if within fingerprints 2", func(t *testing.T) {
b := createBlockRef("block", 210, 290, startTs, endTs)
isOutside := isOutsideRange(b, interval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
require.False(t, isOutside)
})

t.Run("is not outside if spans across multiple fingerprint ranges", func(t *testing.T) {
b := createBlockRef("block", 50, 250, startTs, endTs)
isOutside := isOutsideRange(b, interval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
require.False(t, isOutside)
})

t.Run("is not outside if fingerprint range and time range are larger than block", func(t *testing.T) {
b := createBlockRef("block", math.MaxUint64/3, math.MaxUint64/3*2, startTs, endTs)
isOutside := isOutsideRange(b, interval(0, 3000), []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}})
isOutside := isOutsideRange(b, NewInterval(0, 3000), []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}})
require.False(t, isOutside)
})

t.Run("is not outside if block fingerprint range is bigger that search keyspace", func(t *testing.T) {
b := createBlockRef("block", 0x0000, 0xffff, model.Earliest, model.Latest)
isOutside := isOutsideRange(b, interval(startTs, endTs), []v1.FingerprintBounds{{Min: 0x0100, Max: 0xff00}})
isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0x0100, Max: 0xff00}})
require.False(t, isOutside)
})
}
Expand Down
Loading