Skip to content

Commit

Permalink
Streaming store-gateway: make room for tenant ID in postings cache key (
Browse files Browse the repository at this point in the history
#3839)

* Add postings to cache entry

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Remove shard from cache entry

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Remove increased hash size

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Remove commented debug line

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Remove extra comment in memcached.go

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Run BenchmarkFetchCachedSeriesForPostings with postings

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Change file mode

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add CHANGELOG.md entry

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
  • Loading branch information
dimitarvdimitrov committed Jan 4, 2023
1 parent c50d360 commit 4a80c31
Show file tree
Hide file tree
Showing 13 changed files with 202 additions and 319 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

* [CHANGE] Querier: Introduce `-querier.max-partial-query-length` to limit the time range for partial queries at the querier level and deprecate `-store.max-query-length`. #3825
* [CHANGE] Store-gateway: Remove experimental `-blocks-storage.bucket-store.max-concurrent-reject-over-limit` flag. #3706
* [FEATURE] Store-gateway: streaming of series. The store-gateway can now stream results back to the querier instead of buffering them. This is expected to greatly reduce peak memory consumption while keeping latency the same. You can enable this feature by setting `-blocks-storage.bucket-store.batch-series-size` to a value in the high thousands (5000-10000). This is still an experimental feature and is subject to a changing API and instability. #3540 #3546 #3587 #3606 #3611 #3620 #3645 #3355 #3697 #3666 #3687 #3728 #3739 #3751 #3779
* [FEATURE] Store-gateway: streaming of series. The store-gateway can now stream results back to the querier instead of buffering them. This is expected to greatly reduce peak memory consumption while keeping latency the same. You can enable this feature by setting `-blocks-storage.bucket-store.batch-series-size` to a value in the high thousands (5000-10000). This is still an experimental feature and is subject to a changing API and instability. #3540 #3546 #3587 #3606 #3611 #3620 #3645 #3355 #3697 #3666 #3687 #3728 #3739 #3751 #3779 #3839
* [ENHANCEMENT] Added new metric `thanos_shipper_last_successful_upload_time`: Unix timestamp (in seconds) of the last successful TSDB block uploaded to the bucket. #3627
* [ENHANCEMENT] Ruler: Added `-ruler.alertmanager-client.tls-enabled` configuration for alertmanager client. #3432 #3597
* [ENHANCEMENT] Activity tracker logs now have `component=activity-tracker` label. #3556
Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func (noopCache) FetchSeries(_ context.Context, _ string, _ ulid.ULID, _ indexca
return nil, false
}

func (noopCache) StoreSeriesForPostings(_ context.Context, _ string, _ ulid.ULID, _ indexcache.LabelMatchersKey, _ *sharding.ShardSelector, _ indexcache.PostingsKey, _ []byte) {
func (noopCache) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey indexcache.PostingsKey, v []byte) {
}
func (noopCache) FetchSeriesForPostings(_ context.Context, _ string, _ ulid.ULID, _ indexcache.LabelMatchersKey, _ *sharding.ShardSelector, _ indexcache.PostingsKey) ([]byte, bool) {
func (noopCache) FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey indexcache.PostingsKey) ([]byte, bool) {
return nil, false
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2343,7 +2343,7 @@ func (c cacheNotExpectingToStoreSeries) StoreSeries(ctx context.Context, userID
c.t.Fatalf("StoreSeries should not be called")
}

func (c cacheNotExpectingToStoreSeries) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, matchersKey indexcache.LabelMatchersKey, shard *sharding.ShardSelector, postingsKey indexcache.PostingsKey, v []byte) {
func (c cacheNotExpectingToStoreSeries) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey indexcache.PostingsKey, v []byte) {
c.t.Fatalf("StoreSeriesForPostings should not be called")
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/indexcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ type IndexCache interface {
FetchSeries(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector) ([]byte, bool)

// StoreSeriesForPostings stores a series set for the provided postings.
StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte)
StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte)
// FetchSeriesForPostings fetches a series set for the provided postings.
FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool)
FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool)

// StoreLabelNames stores the result of a LabelNames() call.
StoreLabelNames(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, v []byte)
Expand Down
11 changes: 5 additions & 6 deletions pkg/storegateway/indexcache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,13 @@ func (c *InMemoryIndexCache) FetchSeries(_ context.Context, userID string, block
}

// StoreSeriesForPostings stores a series set for the provided postings.
func (c *InMemoryIndexCache) StoreSeriesForPostings(_ context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte) {
c.set(cacheKeySeriesForPostings{userID, blockID, matchersKey, shardKey(shard), postingsKey}, v)
func (c *InMemoryIndexCache) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte) {
c.set(cacheKeySeriesForPostings{userID, blockID, shardKey(shard), postingsKey}, v)
}

// FetchSeriesForPostings fetches a series set for the provided postings.
func (c *InMemoryIndexCache) FetchSeriesForPostings(_ context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool) {
return c.get(cacheKeySeriesForPostings{userID, blockID, matchersKey, shardKey(shard), postingsKey})
func (c *InMemoryIndexCache) FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool) {
return c.get(cacheKeySeriesForPostings{userID, blockID, shardKey(shard), postingsKey})
}

// StoreLabelNames stores the result of a LabelNames() call.
Expand Down Expand Up @@ -451,7 +451,6 @@ func (c cacheKeySeries) size() uint64 {
type cacheKeySeriesForPostings struct {
userID string
block ulid.ULID
matchersKey LabelMatchersKey
shard string
postingsKey PostingsKey
}
Expand All @@ -461,7 +460,7 @@ func (c cacheKeySeriesForPostings) typ() string {
}

func (c cacheKeySeriesForPostings) size() uint64 {
return stringSize(c.userID) + ulidSize + stringSize(string(c.matchersKey)) + stringSize(c.shard) + stringSize(string(c.postingsKey))
return stringSize(c.userID) + ulidSize + stringSize(c.shard) + stringSize(string(c.postingsKey))
}

type cacheKeyLabelNames struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/indexcache/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) {
{
typ: cacheTypeSeriesForPostings,
set: func(id uint64, b []byte) {
cache.StoreSeriesForPostings(ctx, user, uid(id), CanonicalLabelMatchersKey(matchers), shard, CanonicalPostingsKey([]storage.SeriesRef{1}), b)
cache.StoreSeriesForPostings(ctx, user, uid(id), shard, CanonicalPostingsKey([]storage.SeriesRef{1}), b)
},
get: func(id uint64) ([]byte, bool) {
return cache.FetchSeriesForPostings(ctx, user, uid(id), CanonicalLabelMatchersKey(matchers), shard, CanonicalPostingsKey([]storage.SeriesRef{1}))
return cache.FetchSeriesForPostings(ctx, user, uid(id), shard, CanonicalPostingsKey([]storage.SeriesRef{1}))
},
},
{
Expand Down
16 changes: 9 additions & 7 deletions pkg/storegateway/indexcache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,22 +237,24 @@ func seriesCacheKey(userID string, blockID ulid.ULID, matchersKey LabelMatchersK
}

// StoreSeriesForPostings stores a series set for the provided postings.
func (c *MemcachedIndexCache) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte) {
c.set(ctx, cacheTypeSeriesForPostings, seriesForPostingsCacheKey(userID, blockID, matchersKey, shard, postingsKey), v)
func (c *MemcachedIndexCache) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte) {
c.set(ctx, cacheTypeSeriesForPostings, seriesForPostingsCacheKey(userID, blockID, shard, postingsKey), v)
}

// FetchSeriesForPostings fetches a series set for the provided postings.
func (c *MemcachedIndexCache) FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool) {
return c.get(ctx, cacheTypeSeriesForPostings, seriesForPostingsCacheKey(userID, blockID, matchersKey, shard, postingsKey))
func (c *MemcachedIndexCache) FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool) {
return c.get(ctx, cacheTypeSeriesForPostings, seriesForPostingsCacheKey(userID, blockID, shard, postingsKey))
}

func seriesForPostingsCacheKey(userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey) string {
hash := blake2b.Sum256([]byte(matchersKey))
func seriesForPostingsCacheKey(userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey) string {
// We use SP2: as
// * S: is already used for SeriesForRef
// * SS: is already used for Series
// * SP: was in use when using gob encoding
return "SP2:" + userID + ":" + blockID.String() + ":" + shardKey(shard) + ":" + string(postingsKey) + ":" + base64.RawURLEncoding.EncodeToString(hash[0:])
//
// "SP*" (3) + userID (150) + blockID (26) + shard (10 with up to 1000 shards) + ":" (4) = 193
// Memcached limits key length to 250, so we're left with 57 bytes for the postings key.
return "SP2:" + userID + ":" + blockID.String() + ":" + shardKey(shard) + ":" + string(postingsKey)
}

// StoreLabelNames stores the result of a LabelNames() call.
Expand Down
15 changes: 7 additions & 8 deletions pkg/storegateway/indexcache/memcached_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,11 @@ func TestMemcachedIndexCache_FetchSeriesForPostings(t *testing.T) {
},
"should return no miss on hit": {
setup: []mockedSeries{
{userID: user1, block: block1, matchers: matchers1, shard: shard1, postings: postings1, value: value1},
{userID: user2, block: block1, matchers: matchers1, shard: shard1, postings: postings1, value: value2}, // different user
{userID: user1, block: block1, matchers: matchers1, shard: shard2, postings: postings1, value: value2}, // different shard
{userID: user1, block: block1, matchers: matchers2, shard: shard1, postings: postings1, value: value2}, // different matchers
{userID: user1, block: block2, matchers: matchers1, shard: shard1, postings: postings1, value: value3}, // different block
{userID: user1, block: block2, matchers: matchers1, shard: shard1, postings: postings2, value: value3}, // different postings
{userID: user1, block: block1, shard: shard1, postings: postings1, value: value1},
{userID: user2, block: block1, shard: shard1, postings: postings1, value: value2}, // different user
{userID: user1, block: block1, shard: shard2, postings: postings1, value: value2}, // different shard
{userID: user1, block: block2, shard: shard1, postings: postings1, value: value3}, // different block
{userID: user1, block: block2, shard: shard1, postings: postings2, value: value3}, // different postings
},
fetchUserID: user1,
fetchBlockID: block1,
Expand Down Expand Up @@ -400,11 +399,11 @@ func TestMemcachedIndexCache_FetchSeriesForPostings(t *testing.T) {
// Store the postings expected before running the test.
ctx := context.Background()
for _, p := range testData.setup {
c.StoreSeriesForPostings(ctx, p.userID, p.block, CanonicalLabelMatchersKey(p.matchers), p.shard, CanonicalPostingsKey(p.postings), p.value)
c.StoreSeriesForPostings(ctx, p.userID, p.block, p.shard, CanonicalPostingsKey(p.postings), p.value)
}

// Fetch postings from cached and assert on it.
data, ok := c.FetchSeriesForPostings(ctx, testData.fetchUserID, testData.fetchBlockID, testData.fetchKey, testData.fetchShard, CanonicalPostingsKey(testData.postings))
data, ok := c.FetchSeriesForPostings(ctx, testData.fetchUserID, testData.fetchBlockID, testData.fetchShard, CanonicalPostingsKey(testData.postings))
assert.Equal(t, testData.expectedData, data)
assert.Equal(t, testData.expectedOk, ok)

Expand Down
9 changes: 4 additions & 5 deletions pkg/storegateway/indexcache/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,17 @@ func (t *TracingIndexCache) FetchSeries(ctx context.Context, userID string, bloc
return data, found
}

func (t *TracingIndexCache) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte) {
t.c.StoreSeriesForPostings(ctx, userID, blockID, matchersKey, shard, postingsKey, v)
func (t *TracingIndexCache) StoreSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte) {
t.c.StoreSeriesForPostings(ctx, userID, blockID, shard, postingsKey, v)
}

func (t *TracingIndexCache) FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, matchersKey LabelMatchersKey, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool) {
func (t *TracingIndexCache) FetchSeriesForPostings(ctx context.Context, userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey) ([]byte, bool) {
t0 := time.Now()
data, found := t.c.FetchSeriesForPostings(ctx, userID, blockID, matchersKey, shard, postingsKey)
data, found := t.c.FetchSeriesForPostings(ctx, userID, blockID, shard, postingsKey)

spanLogger := spanlogger.FromContext(ctx, t.logger)
level.Debug(spanLogger).Log(
"msg", "IndexCache.FetchSeriesForPostings",
"matchers_key", matchersKey,
"shard", shardKey(shard),
"found", found,
"time_elapsed", time.Since(t0),
Expand Down
Loading

0 comments on commit 4a80c31

Please sign in to comment.