Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming store-gateway: make room for tenant ID in postings cache key #3839

Merged
merged 8 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
### Grafana Mimir

* [CHANGE] Store-gateway: Remove experimental `-blocks-storage.bucket-store.max-concurrent-reject-over-limit` flag. #3706
* [FEATURE] Store-gateway: streaming of series. The store-gateway can now stream results back to the querier instead of buffering them. This is expected to greatly reduce peak memory consumption while keeping latency the same. You can enable this feature by setting `-blocks-storage.bucket-store.batch-series-size` to a value in the high thousands (5000-10000). This is still an experimental feature and is subject to a changing API and instability. #3540 #3546 #3587 #3606 #3611 #3620 #3645 #3355 #3697 #3666 #3687 #3728 #3739 #3751 #3779
* [FEATURE] Store-gateway: streaming of series. The store-gateway can now stream results back to the querier instead of buffering them. This is expected to greatly reduce peak memory consumption while keeping latency the same. You can enable this feature by setting `-blocks-storage.bucket-store.batch-series-size` to a value in the high thousands (5000-10000). This is still an experimental feature and is subject to a changing API and instability. #3540 #3546 #3587 #3606 #3611 #3620 #3645 #3355 #3697 #3666 #3687 #3728 #3739 #3751 #3779 #3839
* [ENHANCEMENT] Added new metric `thanos_shipper_last_successful_upload_time`: Unix timestamp (in seconds) of the last successful TSDB block uploaded to the bucket. #3627
* [ENHANCEMENT] Ruler: Added `-ruler.alertmanager-client.tls-enabled` configuration for alertmanager client. #3432 #3597
* [ENHANCEMENT] Activity tracker logs now have `component=activity-tracker` label. #3556
Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func (noopCache) FetchSeries(_ context.Context, _ string, _ ulid.ULID, _ indexca
return nil, false
}

func (noopCache) StoreSeriesForPostings(_ context.Context, _ string, _ ulid.ULID, _ indexcache.LabelMatchersKey, _ *sharding.ShardSelector, _ indexcache.PostingsKey, _ []byte) {
func (noopCache) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey indexcache.PostingsKey, v []byte) {
}
func (noopCache) FetchSeriesForPostings(_ context.Context, _ string, _ ulid.ULID, _ indexcache.LabelMatchersKey, _ *sharding.ShardSelector, _ indexcache.PostingsKey) ([]byte, bool) {
func (noopCache) FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey indexcache.PostingsKey) ([]byte, bool) {
return nil, false
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2343,7 +2343,7 @@ func (c cacheNotExpectingToStoreSeries) StoreSeries(ctx context.Context, userID
c.t.Fatalf("StoreSeries should not be called")
}

func (c cacheNotExpectingToStoreSeries) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, matchersKey indexcache.LabelMatchersKey, shard *sharding.ShardSelector, postingsKey indexcache.PostingsKey, v []byte) {
func (c cacheNotExpectingToStoreSeries) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey indexcache.PostingsKey, v []byte) {
c.t.Fatalf("StoreSeriesForPostings should not be called")
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/indexcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ type IndexCache interface {
FetchSeries(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector) ([]byte, bool)

// StoreSeriesForPostings stores a series set for the provided postings.
StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte)
StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte)
// FetchSeriesForPostings fetches a series set for the provided postings.
FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool)
FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool)

// StoreLabelNames stores the result of a LabelNames() call.
StoreLabelNames(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, v []byte)
Expand Down
11 changes: 5 additions & 6 deletions pkg/storegateway/indexcache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,13 @@ func (c *InMemoryIndexCache) FetchSeries(_ context.Context, userID string, block
}

// StoreSeriesForPostings stores a series set for the provided postings.
func (c *InMemoryIndexCache) StoreSeriesForPostings(_ context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte) {
c.set(cacheKeySeriesForPostings{userID, blockID, matchersKey, shardKey(shard), postingsKey}, v)
func (c *InMemoryIndexCache) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte) {
c.set(cacheKeySeriesForPostings{userID, blockID, shardKey(shard), postingsKey}, v)
}

// FetchSeriesForPostings fetches a series set for the provided postings.
func (c *InMemoryIndexCache) FetchSeriesForPostings(_ context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool) {
return c.get(cacheKeySeriesForPostings{userID, blockID, matchersKey, shardKey(shard), postingsKey})
func (c *InMemoryIndexCache) FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool) {
return c.get(cacheKeySeriesForPostings{userID, blockID, shardKey(shard), postingsKey})
}

// StoreLabelNames stores the result of a LabelNames() call.
Expand Down Expand Up @@ -451,7 +451,6 @@ func (c cacheKeySeries) size() uint64 {
type cacheKeySeriesForPostings struct {
userID string
block ulid.ULID
matchersKey LabelMatchersKey
shard string
postingsKey PostingsKey
}
Expand All @@ -461,7 +460,7 @@ func (c cacheKeySeriesForPostings) typ() string {
}

func (c cacheKeySeriesForPostings) size() uint64 {
return stringSize(c.userID) + ulidSize + stringSize(string(c.matchersKey)) + stringSize(c.shard) + stringSize(string(c.postingsKey))
return stringSize(c.userID) + ulidSize + stringSize(c.shard) + stringSize(string(c.postingsKey))
}

type cacheKeyLabelNames struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/indexcache/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) {
{
typ: cacheTypeSeriesForPostings,
set: func(id uint64, b []byte) {
cache.StoreSeriesForPostings(ctx, user, uid(id), CanonicalLabelMatchersKey(matchers), shard, CanonicalPostingsKey([]storage.SeriesRef{1}), b)
cache.StoreSeriesForPostings(ctx, user, uid(id), shard, CanonicalPostingsKey([]storage.SeriesRef{1}), b)
},
get: func(id uint64) ([]byte, bool) {
return cache.FetchSeriesForPostings(ctx, user, uid(id), CanonicalLabelMatchersKey(matchers), shard, CanonicalPostingsKey([]storage.SeriesRef{1}))
return cache.FetchSeriesForPostings(ctx, user, uid(id), shard, CanonicalPostingsKey([]storage.SeriesRef{1}))
},
},
{
Expand Down
16 changes: 9 additions & 7 deletions pkg/storegateway/indexcache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,22 +237,24 @@ func seriesCacheKey(userID string, blockID ulid.ULID, matchersKey LabelMatchersK
}

// StoreSeriesForPostings stores a series set for the provided postings.
func (c *MemcachedIndexCache) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte) {
c.set(ctx, cacheTypeSeriesForPostings, seriesForPostingsCacheKey(userID, blockID, matchersKey, shard, postingsKey), v)
func (c *MemcachedIndexCache) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte) {
c.set(ctx, cacheTypeSeriesForPostings, seriesForPostingsCacheKey(userID, blockID, shard, postingsKey), v)
}

// FetchSeriesForPostings fetches a series set for the provided postings.
func (c *MemcachedIndexCache) FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool) {
return c.get(ctx, cacheTypeSeriesForPostings, seriesForPostingsCacheKey(userID, blockID, matchersKey, shard, postingsKey))
func (c *MemcachedIndexCache) FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool) {
return c.get(ctx, cacheTypeSeriesForPostings, seriesForPostingsCacheKey(userID, blockID, shard, postingsKey))
}

func seriesForPostingsCacheKey(userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey) string {
hash := blake2b.Sum256([]byte(matchersKey))
func seriesForPostingsCacheKey(userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey) string {
// We use SP2: as
// * S: is already used for SeriesForRef
// * SS: is already used for Series
// * SP: was in use when using gob encoding
return "SP2:" + userID + ":" + blockID.String() + ":" + shardKey(shard) + ":" + string(postingsKey) + ":" + base64.RawURLEncoding.EncodeToString(hash[0:])
//
// "SP*" (3) + userID (150) + blockID (26) + shard (10 with up to 1000 shards) + ":" (4) = 193
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved
// Memcached limits key length to 250, so we're left with 57 bytes for the postings key.
return "SP2:" + userID + ":" + blockID.String() + ":" + shardKey(shard) + ":" + string(postingsKey)
}

// StoreLabelNames stores the result of a LabelNames() call.
Expand Down
15 changes: 7 additions & 8 deletions pkg/storegateway/indexcache/memcached_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,11 @@ func TestMemcachedIndexCache_FetchSeriesForPostings(t *testing.T) {
},
"should return no miss on hit": {
setup: []mockedSeries{
{userID: user1, block: block1, matchers: matchers1, shard: shard1, postings: postings1, value: value1},
{userID: user2, block: block1, matchers: matchers1, shard: shard1, postings: postings1, value: value2}, // different user
{userID: user1, block: block1, matchers: matchers1, shard: shard2, postings: postings1, value: value2}, // different shard
{userID: user1, block: block1, matchers: matchers2, shard: shard1, postings: postings1, value: value2}, // different matchers
{userID: user1, block: block2, matchers: matchers1, shard: shard1, postings: postings1, value: value3}, // different block
{userID: user1, block: block2, matchers: matchers1, shard: shard1, postings: postings2, value: value3}, // different postings
{userID: user1, block: block1, shard: shard1, postings: postings1, value: value1},
{userID: user2, block: block1, shard: shard1, postings: postings1, value: value2}, // different user
{userID: user1, block: block1, shard: shard2, postings: postings1, value: value2}, // different shard
{userID: user1, block: block2, shard: shard1, postings: postings1, value: value3}, // different block
{userID: user1, block: block2, shard: shard1, postings: postings2, value: value3}, // different postings
},
fetchUserID: user1,
fetchBlockID: block1,
Expand Down Expand Up @@ -400,11 +399,11 @@ func TestMemcachedIndexCache_FetchSeriesForPostings(t *testing.T) {
// Store the postings expected before running the test.
ctx := context.Background()
for _, p := range testData.setup {
c.StoreSeriesForPostings(ctx, p.userID, p.block, CanonicalLabelMatchersKey(p.matchers), p.shard, CanonicalPostingsKey(p.postings), p.value)
c.StoreSeriesForPostings(ctx, p.userID, p.block, p.shard, CanonicalPostingsKey(p.postings), p.value)
}

// Fetch postings from cached and assert on it.
data, ok := c.FetchSeriesForPostings(ctx, testData.fetchUserID, testData.fetchBlockID, testData.fetchKey, testData.fetchShard, CanonicalPostingsKey(testData.postings))
data, ok := c.FetchSeriesForPostings(ctx, testData.fetchUserID, testData.fetchBlockID, testData.fetchShard, CanonicalPostingsKey(testData.postings))
assert.Equal(t, testData.expectedData, data)
assert.Equal(t, testData.expectedOk, ok)

Expand Down
9 changes: 4 additions & 5 deletions pkg/storegateway/indexcache/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,17 @@ func (t *TracingIndexCache) FetchSeries(ctx context.Context, userID string, bloc
return data, found
}

func (t *TracingIndexCache) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte) {
t.c.StoreSeriesForPostings(ctx, userID, blockID, matchersKey, shard, postingsKey, v)
func (t *TracingIndexCache) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte) {
t.c.StoreSeriesForPostings(ctx, userID, blockID, shard, postingsKey, v)
}

func (t *TracingIndexCache) FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool) {
func (t *TracingIndexCache) FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool) {
t0 := time.Now()
data, found := t.c.FetchSeriesForPostings(ctx, userID, blockID, matchersKey, shard, postingsKey)
data, found := t.c.FetchSeriesForPostings(ctx, userID, blockID, shard, postingsKey)

spanLogger := spanlogger.FromContext(ctx, t.logger)
level.Debug(spanLogger).Log(
"msg", "IndexCache.FetchSeriesForPostings",
"matchers_key", matchersKey,
"shard", shardKey(shard),
"found", found,
"time_elapsed", time.Since(t0),
Expand Down
Loading