Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump frontend results cache version #1631

Merged
merged 3 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- `-alertmanager.alertmanager-client.grpc-max-send-msg-size` now defaults to 100 MiB (previously was not configurable and set to 4 MiB)
- `-alertmanager.max-recv-msg-size` now defaults to 100 MiB (previously was 16 MiB)
* [CHANGE] Ingester: Add `user` label to metrics `cortex_ingester_ingested_samples_total` and `cortex_ingester_ingested_samples_failures_total`. #1533
* [CHANGE] Query-frontend: results cache keys are now versioned, this will cause cache to be re-filled when rolling out this version. #1631
* [FEATURE] Ruler: Allow setting `evaluation_delay` for each rule group via rules group configuration file. #1474
* [FEATURE] Distributor: Added the ability to forward specifics metrics to alternative remote_write API endpoints. #1052
* [FEATURE] Ingester: Active series custom trackers now supports runtime tenant-specific overrides. The configuration has been moved to limit config, the ingester config has been deprecated. #1188
Expand Down
57 changes: 57 additions & 0 deletions pkg/cache/versioned.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// SPDX-License-Identifier: AGPL-3.0-only

package cache

import (
"context"
"fmt"
"strings"
"time"
)

// Versioned cache adds a version prefix to the keys.
// This allows cache keys to be changed in a newer version of the code (after a bugfix or a cached data format change).
type Versioned struct {
cache Cache
versionPrefix string
}

// NewVersioned creates a new Versioned cache.
func NewVersioned(c Cache, version uint) Versioned {
return Versioned{
cache: c,
versionPrefix: fmt.Sprintf("%d@", version),
}
}

func (c Versioned) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
versioned := make(map[string][]byte, len(data))
for k, v := range data {
versioned[c.addVersion(k)] = v
}
c.cache.Store(ctx, versioned, ttl)
}

func (c Versioned) Fetch(ctx context.Context, keys []string) map[string][]byte {
versionedKeys := make([]string, len(keys))
for i, k := range keys {
versionedKeys[i] = c.addVersion(k)
}
versionedRes := c.cache.Fetch(ctx, versionedKeys)
res := make(map[string][]byte, len(versionedRes))
for k, v := range versionedRes {
res[c.removeVersion(k)] = v
}
return res
}

func (c Versioned) Name() string {
return c.cache.Name()
}

func (c Versioned) addVersion(k string) string {
return c.versionPrefix + k
}
func (c Versioned) removeVersion(k string) string {
return strings.TrimPrefix(k, c.versionPrefix)
}
37 changes: 37 additions & 0 deletions pkg/cache/versioned_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// SPDX-License-Identifier: AGPL-3.0-only

package cache

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestVersioned(t *testing.T) {
t.Run("happy case: can store and retrieve", func(t *testing.T) {
cache := NewMockCache()
v1 := NewVersioned(cache, 1)
data := map[string][]byte{"hit": []byte(`data`)}
v1.Store(context.Background(), data, time.Minute)
res := v1.Fetch(context.Background(), []string{"hit", "miss"})
assert.Equal(t, data, res)
})

t.Run("different versions use different datasets", func(t *testing.T) {
cache := NewMockCache()
v1 := NewVersioned(cache, 1)
v1Data := map[string][]byte{"hit": []byte(`first version`)}
v1.Store(context.Background(), v1Data, time.Minute)
v2 := NewVersioned(cache, 2)
v2Data := map[string][]byte{"hit": []byte(`second version`)}
v2.Store(context.Background(), v2Data, time.Minute)

resV1 := v1.Fetch(context.Background(), []string{"hit", "miss"})
assert.Equal(t, v1Data, resV1)
resV2 := v2.Fetch(context.Background(), []string{"hit", "miss"})
assert.Equal(t, v2Data, resV2)
})
}
8 changes: 7 additions & 1 deletion pkg/frontend/querymiddleware/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
)

const (
// resultsCacheVersion should be increased every time cache should be invalidated (after a bugfix or cache format change).
resultsCacheVersion = 1

// cacheControlHeader is the name of the cache control header.
cacheControlHeader = "Cache-Control"

Expand Down Expand Up @@ -86,7 +89,10 @@ func newResultsCache(cfg ResultsCacheConfig, logger log.Logger, reg prometheus.R
return nil, errUnsupportedResultsCacheBackend(cfg.Backend)
}

return cache.NewSpanlessTracingCache(client, logger), nil
return cache.NewVersioned(
cache.NewSpanlessTracingCache(client, logger),
resultsCacheVersion,
), nil
}

// Extractor is used by the cache to extract a subset of a response from a cache entry.
Expand Down