Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metadata cache): adds max_metadata_cache_freshness #11682

Merged
merged 5 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2830,6 +2830,12 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -frontend.max-cache-freshness
[max_cache_freshness_per_query: <duration> | default = 10m]

# Do not cache metadata request if the end time is within the
# frontend.max-metadata-cache-freshness window. Set this to 0 to apply no such
# limits. Defaults to 24h.
# CLI flag: -frontend.max-metadata-cache-freshness
[max_metadata_cache_freshness: <duration> | default = 1d]

# Do not cache requests with an end time that falls within Now minus this
# duration. 0 disables this feature (default).
# CLI flag: -frontend.max-stats-cache-freshness
Expand Down
15 changes: 14 additions & 1 deletion pkg/querier/queryrange/labels_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/chunk/cache"
Expand Down Expand Up @@ -91,7 +92,19 @@ func NewLabelsCacheMiddleware(
merger,
labelsExtractor{},
cacheGenNumberLoader,
shouldCache,
func(ctx context.Context, r queryrangebase.Request) bool {
if shouldCache != nil && !shouldCache(ctx, r) {
return false
}

cacheReq, err := shouldCacheMetadataReq(ctx, r, limits)
if err != nil {
level.Error(logger).Log("msg", "failed to determine if metadata request should be cached. Won't cache", "err", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit

Suggested change
level.Error(logger).Log("msg", "failed to determine if metadata request should be cached. Won't cache", "err", err)
level.Error(logger).Log("msg", "failed to determine if metadata request should be cached, won't cache", "err", err)

return false
}

return cacheReq
},
parallelismForReq,
retentionEnabled,
metrics,
Expand Down
108 changes: 108 additions & 0 deletions pkg/querier/queryrange/labels_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,111 @@ func TestLabelsCache(t *testing.T) {
})
}
}

func TestLabelCache_freshness(t *testing.T) {
testTime := time.Now().Add(-1 * time.Hour)
from, through := util.RoundToMilliseconds(testTime.Add(-1*time.Hour), testTime)
start, end := from.Time(), through.Time()
nonOverlappingStart, nonOverlappingEnd := from.Add(-24*time.Hour).Time(), through.Add(-24*time.Hour).Time()

for _, tt := range []struct {
name string
req *LabelRequest
shouldCache bool
maxMetadataCacheFreshness time.Duration
}{
{
name: "max metadata freshness not set",
req: &LabelRequest{
LabelRequest: logproto.LabelRequest{
Start: &start,
End: &end,
},
},
shouldCache: true,
},
{
name: "req overlaps with max cache freshness window",
req: &LabelRequest{
LabelRequest: logproto.LabelRequest{
Start: &start,
End: &end,
},
},
maxMetadataCacheFreshness: 24 * time.Hour,
shouldCache: false,
},
{
name: "req does not overlap max cache freshness window",
req: &LabelRequest{
LabelRequest: logproto.LabelRequest{
Start: &nonOverlappingStart,
End: &nonOverlappingEnd,
},
},
maxMetadataCacheFreshness: 24 * time.Hour,
shouldCache: true,
},
} {
t.Run(tt.name, func(t *testing.T) {
cacheMiddleware, err := NewLabelsCacheMiddleware(
log.NewNopLogger(),
fakeLimits{
metadataSplitDuration: map[string]time.Duration{
"fake": 24 * time.Hour,
},
maxMetadataCacheFreshness: tt.maxMetadataCacheFreshness,
},
DefaultCodec,
cache.NewMockCache(),
nil,
nil,
func(_ context.Context, _ []string, _ queryrangebase.Request) int {
return 1
},
false,
nil,
nil,
)
require.NoError(t, err)

labelsResp := &LokiLabelNamesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: []string{"bar", "buzz"},
Statistics: stats.Result{
Summary: stats.Summary{
Splits: 1,
},
},
}

called := 0
handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
called++

// should request the entire length with no partitioning as nothing is cached yet.
require.Equal(t, tt.req.GetStart(), r.GetStart())
require.Equal(t, tt.req.GetEnd(), r.GetEnd())

return labelsResp, nil
}))

ctx := user.InjectOrgID(context.Background(), "fake")
got, err := handler.Do(ctx, tt.req)
require.NoError(t, err)
require.Equal(t, 1, called) // called actual handler, as not cached.
require.Equal(t, labelsResp, got)

called = 0
got, err = handler.Do(ctx, tt.req)
require.NoError(t, err)
if !tt.shouldCache {
require.Equal(t, 1, called)
} else {
require.Equal(t, 0, called)
}
require.Equal(t, labelsResp, got)
})
}
}
1 change: 1 addition & 0 deletions pkg/querier/queryrange/limits/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ type Limits interface {
MaxQueryBytesRead(context.Context, string) int
MaxQuerierBytesRead(context.Context, string) int
MaxStatsCacheFreshness(context.Context, string) time.Duration
MaxMetadataCacheFreshness(context.Context, string) time.Duration
VolumeEnabled(string) bool
}
39 changes: 22 additions & 17 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1237,23 +1237,24 @@ func TestMetricsTripperware_SplitShardStats(t *testing.T) {
}

type fakeLimits struct {
maxQueryLength time.Duration
maxQueryParallelism int
tsdbMaxQueryParallelism int
maxQueryLookback time.Duration
maxEntriesLimitPerQuery int
maxSeries int
splitDuration map[string]time.Duration
metadataSplitDuration map[string]time.Duration
ingesterSplitDuration map[string]time.Duration
minShardingLookback time.Duration
queryTimeout time.Duration
requiredLabels []string
requiredNumberLabels int
maxQueryBytesRead int
maxQuerierBytesRead int
maxStatsCacheFreshness time.Duration
volumeEnabled bool
maxQueryLength time.Duration
maxQueryParallelism int
tsdbMaxQueryParallelism int
maxQueryLookback time.Duration
maxEntriesLimitPerQuery int
maxSeries int
splitDuration map[string]time.Duration
metadataSplitDuration map[string]time.Duration
ingesterSplitDuration map[string]time.Duration
minShardingLookback time.Duration
queryTimeout time.Duration
requiredLabels []string
requiredNumberLabels int
maxQueryBytesRead int
maxQuerierBytesRead int
maxStatsCacheFreshness time.Duration
maxMetadataCacheFreshness time.Duration
volumeEnabled bool
}

func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
Expand Down Expand Up @@ -1344,6 +1345,10 @@ func (f fakeLimits) MaxStatsCacheFreshness(_ context.Context, _ string) time.Dur
return f.maxStatsCacheFreshness
}

func (f fakeLimits) MaxMetadataCacheFreshness(_ context.Context, _ string) time.Duration {
return f.maxMetadataCacheFreshness
}

func (f fakeLimits) VolumeEnabled(_ string) bool {
return f.volumeEnabled
}
Expand Down
31 changes: 30 additions & 1 deletion pkg/querier/queryrange/series_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ import (
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"

"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/pkg/util/validation"
)

type cacheKeySeries struct {
Expand Down Expand Up @@ -92,9 +97,33 @@ func NewSeriesCacheMiddleware(
merger,
seriesExtractor{},
cacheGenNumberLoader,
shouldCache,
func(ctx context.Context, r queryrangebase.Request) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we extract this out to a function for reuse?

if shouldCache != nil && !shouldCache(ctx, r) {
return false
}

cacheReq, err := shouldCacheMetadataReq(ctx, r, limits)
if err != nil {
level.Error(logger).Log("msg", "failed to determine if metadata request should be cached. Won't cache", "err", err)
return false
}

return cacheReq
},
parallelismForReq,
retentionEnabled,
metrics,
)
}

func shouldCacheMetadataReq(ctx context.Context, req queryrangebase.Request, l Limits) (bool, error) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return false, err
}

cacheFreshnessCapture := func(id string) time.Duration { return l.MaxMetadataCacheFreshness(ctx, id) }
maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture)

return maxCacheFreshness == 0 || model.Time(req.GetEnd().UnixMilli()).Before(model.Now().Add(-maxCacheFreshness)), nil
}
110 changes: 110 additions & 0 deletions pkg/querier/queryrange/series_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,113 @@ func TestSeriesCache(t *testing.T) {
}
})
}

func TestSeriesCache_freshness(t *testing.T) {
testTime := time.Now().Add(-1 * time.Hour)
from, through := util.RoundToMilliseconds(testTime.Add(-1*time.Hour), testTime)

for _, tt := range []struct {
name string
req *LokiSeriesRequest
shouldCache bool
maxMetadataCacheFreshness time.Duration
}{
{
name: "max metadata freshness not set",
req: &LokiSeriesRequest{
StartTs: from.Time(),
EndTs: through.Time(),
Match: []string{`{namespace=~".*"}`},
Path: seriesAPIPath,
},
shouldCache: true,
},
{
name: "req overlaps with max cache freshness window",
req: &LokiSeriesRequest{
StartTs: from.Time(),
EndTs: through.Time(),
Match: []string{`{namespace=~".*"}`},
Path: seriesAPIPath,
},
maxMetadataCacheFreshness: 24 * time.Hour,
shouldCache: false,
},
{
name: "req does not overlap max cache freshness window",
req: &LokiSeriesRequest{
StartTs: from.Add(-24 * time.Hour).Time(),
EndTs: through.Add(-24 * time.Hour).Time(),
Match: []string{`{namespace=~".*"}`},
Path: seriesAPIPath,
},
maxMetadataCacheFreshness: 24 * time.Hour,
shouldCache: true,
},
} {
t.Run(tt.name, func(t *testing.T) {
cacheMiddleware, err := NewSeriesCacheMiddleware(
log.NewNopLogger(),
fakeLimits{
metadataSplitDuration: map[string]time.Duration{
"fake": 24 * time.Hour,
},
maxMetadataCacheFreshness: tt.maxMetadataCacheFreshness,
},
DefaultCodec,
cache.NewMockCache(),
nil,
nil,
func(_ context.Context, _ []string, _ queryrangebase.Request) int {
return 1
},
false,
nil,
nil,
)
require.NoError(t, err)

seriesResp := &LokiSeriesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: []logproto.SeriesIdentifier{
{
Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}},
},
},
Statistics: stats.Result{
Summary: stats.Summary{
Splits: 1,
},
},
}

called := 0
handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
called++

// should request the entire length with no partitioning as nothing is cached yet.
require.Equal(t, tt.req.GetStart(), r.GetStart())
require.Equal(t, tt.req.GetEnd(), r.GetEnd())

return seriesResp, nil
}))

ctx := user.InjectOrgID(context.Background(), "fake")
got, err := handler.Do(ctx, tt.req)
require.NoError(t, err)
require.Equal(t, 1, called) // called actual handler, as not cached.
require.Equal(t, seriesResp, got)

called = 0
got, err = handler.Do(ctx, tt.req)
require.NoError(t, err)
if !tt.shouldCache {
require.Equal(t, 1, called)
} else {
require.Equal(t, 0, called)
}
require.Equal(t, seriesResp, got)
})
}
}