Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingester: reduce active series when series change owner #8084

Merged
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* [ENHANCEMENT] Store-gateway: add `-blocks-storage.bucket-store.max-concurrent-queue-timeout`. When set, queries at the store-gateway's query gate will not wait longer than that to execute. If a query reaches the wait timeout, then the querier will retry the blocks on a different store-gateway. If all store-gateways are unavailable, then the query will fail with `err-mimir-store-consistency-check-failed`. #7777
* [ENHANCEMENT] Ingester: Optimize querying with regexp matchers. #8106
* [ENHANCEMENT] Distributor: Introduce `-distributor.max-request-pool-buffer-size` to allow configuring the maximum size of the request pool buffers. #8082
* [ENHANCEMENT] Ingester: active series are now updated along with owned series. They decrease when series change ownership between ingesters. This helps provide a more accurate total of active series when ingesters are added. This is only enabled when `-ingester.track-ingester-owned-series` or `-ingester.use-ingester-owned-series-for-limits` are enabled. #8084
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
* [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
Expand Down
26 changes: 25 additions & 1 deletion pkg/ingester/activeseries/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,21 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot
return
}

func (c *ActiveSeries) Delete(ref chunks.HeadSeriesRef) {
stripeID := storage.SeriesRef(ref) % numStripes
c.stripes[stripeID].remove(storage.SeriesRef(ref))
}

func (c *ActiveSeries) Clear() {
for s := 0; s < numStripes; s++ {
c.stripes[s].clear()
}
// c.deleted keeps track of series which were removed from memory, but might come back with a different SeriesRef.
// If they do come back, then we use deletedSeries to stop tracking their previous entry.
// We can also clear the deleted series because we've already stopped tracking all series.
c.deleted.clear()
}

func (s *seriesStripe) containsRef(ref storage.SeriesRef) bool {
s.mu.RLock()
defer s.mu.RUnlock()
Expand Down Expand Up @@ -381,7 +396,6 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
return e.nanos, true
}

// nolint // Linter reports that this method is unused, but it is.
func (s *seriesStripe) clear() {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -576,3 +590,13 @@ func (ds *deletedSeries) purge(ref storage.SeriesRef) {
delete(ds.keys, ref)
delete(ds.refs, key)
}

func (ds *deletedSeries) clear() {
ds.mu.Lock()
defer ds.mu.Unlock()

// nil the maps to release memory.
// They will be reinitialized if the tenant resumes sending series.
ds.keys = nil
ds.refs = nil
}
30 changes: 27 additions & 3 deletions pkg/ingester/activeseries/active_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,16 +220,19 @@ func TestActiveSeries_ContainsRef(t *testing.T) {
}

func TestActiveSeries_UpdateSeries_WithMatchers(t *testing.T) {
asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`}))
c := NewActiveSeries(asm, DefaultTimeout)
testUpdateSeries(t, c)
}

func testUpdateSeries(t *testing.T, c *ActiveSeries) {
ref1, ls1 := storage.SeriesRef(1), labels.FromStrings("a", "1")
ref2, ls2 := storage.SeriesRef(2), labels.FromStrings("a", "2")
ref3, ls3 := storage.SeriesRef(3), labels.FromStrings("a", "3")
ref4, ls4 := storage.SeriesRef(4), labels.FromStrings("a", "4")
ref5, ls5 := storage.SeriesRef(5), labels.FromStrings("a", "5")
ref6 := storage.SeriesRef(6) // same as ls2

asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`}))

c := NewActiveSeries(asm, DefaultTimeout)
valid := c.Purge(time.Now())
assert.True(t, valid)
allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers()
Expand Down Expand Up @@ -433,6 +436,27 @@ func TestActiveSeries_UpdateSeries_WithMatchers(t *testing.T) {
assert.Empty(t, c.deleted.keys)
}

func TestActiveSeries_UpdateSeries_Clear(t *testing.T) {
asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`}))
c := NewActiveSeries(asm, DefaultTimeout)
testUpdateSeries(t, c)

c.Clear()
allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers()
assert.Equal(t, 0, allActive)
assert.Equal(t, []int{0}, activeMatching)
assert.Equal(t, 0, allActiveHistograms)
assert.Equal(t, []int{0}, activeMatchingHistograms)
assert.Equal(t, 0, allActiveBuckets)
assert.Equal(t, []int{0}, activeMatchingBuckets)
allActive, allActiveHistograms, allActiveBuckets = c.Active()
assert.Equal(t, 0, allActive)
assert.Equal(t, 0, allActiveHistograms)
assert.Equal(t, 0, allActiveBuckets)

testUpdateSeries(t, c)
}

func labelsWithHashCollision() (labels.Labels, labels.Labels) {
// These two series have the same XXHash; thanks to https://github.com/pstibrany/labels_hash_collisions
ls1 := labels.FromStrings("__name__", "metric", "lbl1", "value", "lbl2", "l6CQ5y")
Expand Down
Loading
Loading