ingester: reduce active series when series change owner (#8084)
* calculate owned active series in owned series loop

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Support clearing active series

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Update owned series tests

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add test for early head compaction

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add CHANGELOG.md entry

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Apply suggestions from code review

Co-authored-by: Jon Kartago Lamida <jon.lamida@grafana.com>

* Clarify CHANGELOG.md

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add missing assertion

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Fix typo

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

---------

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Co-authored-by: Jon Kartago Lamida <jon.lamida@grafana.com>
2 people authored and francoposa committed May 27, 2024
1 parent d2862cf commit 1cd1a0b
Showing 5 changed files with 210 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -36,6 +36,7 @@
* [ENHANCEMENT] Store-gateway: add `-blocks-storage.bucket-store.max-concurrent-queue-timeout`. When set, queries at the store-gateway's query gate will not wait longer than that to execute. If a query reaches the wait timeout, then the querier will retry the blocks on a different store-gateway. If all store-gateways are unavailable, then the query will fail with `err-mimir-store-consistency-check-failed`. #7777
* [ENHANCEMENT] Ingester: Optimize querying with regexp matchers. #8106
* [ENHANCEMENT] Distributor: Introduce `-distributor.max-request-pool-buffer-size` to allow configuring the maximum size of the request pool buffers. #8082
* [ENHANCEMENT] Ingester: active series are now updated along with owned series. They decrease when series change ownership between ingesters. This helps provide a more accurate total of active series when ingesters are added. This is only enabled when `-ingester.track-ingester-owned-series` or `-ingester.use-ingester-owned-series-for-limits` is enabled. #8084
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
* [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
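The entry above gates the new behaviour on the owned-series flags. As a minimal illustration, either flag turns the tracking on; the exact command line (here an ingester-only invocation) is an assumption about the deployment and not part of this commit:

    mimir -target=ingester -ingester.track-ingester-owned-series=true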
26 changes: 25 additions & 1 deletion pkg/ingester/activeseries/active_series.go
@@ -210,6 +210,21 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot
return
}

// Delete stops tracking the series with the given ref, so it no longer counts as active.
func (c *ActiveSeries) Delete(ref chunks.HeadSeriesRef) {
stripeID := storage.SeriesRef(ref) % numStripes
c.stripes[stripeID].remove(storage.SeriesRef(ref))
}

func (c *ActiveSeries) Clear() {
for s := 0; s < numStripes; s++ {
c.stripes[s].clear()
}
// c.deleted keeps track of series which were removed from memory, but might come back with a different SeriesRef.
// If they do come back, then we use deletedSeries to stop tracking their previous entry.
// We can also clear the deleted series because we've already stopped tracking all series.
c.deleted.clear()
}

func (s *seriesStripe) containsRef(ref storage.SeriesRef) bool {
s.mu.RLock()
defer s.mu.RUnlock()
@@ -381,7 +396,6 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
return e.nanos, true
}

// nolint // Linter reports that this method is unused, but it is.
func (s *seriesStripe) clear() {
s.mu.Lock()
defer s.mu.Unlock()
@@ -576,3 +590,13 @@ func (ds *deletedSeries) purge(ref storage.SeriesRef) {
delete(ds.keys, ref)
delete(ds.refs, key)
}

func (ds *deletedSeries) clear() {
ds.mu.Lock()
defer ds.mu.Unlock()

// nil the maps to release memory.
// They will be reinitialized if the tenant resumes sending series.
ds.keys = nil
ds.refs = nil
}
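The commit message says owned active series are now recalculated in the ingester's owned-series loop, but the ingester-side callers of the new Delete and Clear methods are in changed files not shown above. The following is only a rough sketch of how such callers might look, assuming that the recomputation drops series that moved to another ingester and that a full reset happens after events such as early head compaction; recomputeOwnedActiveSeries, ownedByThisIngester, and resetAfterHeadCompaction are illustrative names, not functions from this commit.

package sketch

import (
	"github.com/grafana/mimir/pkg/ingester/activeseries"
	"github.com/prometheus/prometheus/tsdb/chunks"
)

// recomputeOwnedActiveSeries is a hypothetical caller of the new Delete method:
// while walking the head's series during owned-series recalculation, it stops
// tracking every series that is no longer owned by this ingester, so the active
// series count decreases when series change owner.
func recomputeOwnedActiveSeries(as *activeseries.ActiveSeries, refs []chunks.HeadSeriesRef, ownedByThisIngester func(chunks.HeadSeriesRef) bool) {
	for _, ref := range refs {
		if !ownedByThisIngester(ref) {
			as.Delete(ref) // series moved to another ingester; it no longer counts as active here
		}
	}
}

// resetAfterHeadCompaction is a hypothetical caller of the new Clear method:
// after early head compaction the old HeadSeriesRefs are invalid, so the tracker
// is emptied and repopulated by subsequent pushes.
func resetAfterHeadCompaction(as *activeseries.ActiveSeries) {
	as.Clear()
}

Note that deletedSeries.clear sets its maps to nil rather than allocating empty ones; in Go, reads from a nil map are safe, and the maps are only reinitialized if the tenant resumes sending series, which keeps an idle tenant's memory footprint small.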
30 changes: 27 additions & 3 deletions pkg/ingester/activeseries/active_series_test.go
@@ -220,16 +220,19 @@ func TestActiveSeries_ContainsRef(t *testing.T) {
}

func TestActiveSeries_UpdateSeries_WithMatchers(t *testing.T) {
asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`}))
c := NewActiveSeries(asm, DefaultTimeout)
testUpdateSeries(t, c)
}

func testUpdateSeries(t *testing.T, c *ActiveSeries) {
ref1, ls1 := storage.SeriesRef(1), labels.FromStrings("a", "1")
ref2, ls2 := storage.SeriesRef(2), labels.FromStrings("a", "2")
ref3, ls3 := storage.SeriesRef(3), labels.FromStrings("a", "3")
ref4, ls4 := storage.SeriesRef(4), labels.FromStrings("a", "4")
ref5, ls5 := storage.SeriesRef(5), labels.FromStrings("a", "5")
ref6 := storage.SeriesRef(6) // same as ls2

asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`}))

c := NewActiveSeries(asm, DefaultTimeout)
valid := c.Purge(time.Now())
assert.True(t, valid)
allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers()
@@ -433,6 +436,27 @@ func TestActiveSeries_UpdateSeries_WithMatchers(t *testing.T) {
assert.Empty(t, c.deleted.keys)
}

func TestActiveSeries_UpdateSeries_Clear(t *testing.T) {
asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`}))
c := NewActiveSeries(asm, DefaultTimeout)
testUpdateSeries(t, c)

c.Clear()
allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers()
assert.Equal(t, 0, allActive)
assert.Equal(t, []int{0}, activeMatching)
assert.Equal(t, 0, allActiveHistograms)
assert.Equal(t, []int{0}, activeMatchingHistograms)
assert.Equal(t, 0, allActiveBuckets)
assert.Equal(t, []int{0}, activeMatchingBuckets)
allActive, allActiveHistograms, allActiveBuckets = c.Active()
assert.Equal(t, 0, allActive)
assert.Equal(t, 0, allActiveHistograms)
assert.Equal(t, 0, allActiveBuckets)

testUpdateSeries(t, c)
}

func labelsWithHashCollision() (labels.Labels, labels.Labels) {
// These two series have the same XXHash; thanks to https://github.com/pstibrany/labels_hash_collisions
ls1 := labels.FromStrings("__name__", "metric", "lbl1", "value", "lbl2", "l6CQ5y")
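The new test reads the totals through Purge, Active, and ActiveWithMatchers, which is also roughly how a consumer of the tracker would read them. Below is a small sketch of such a reader, relying only on what the test above shows about those methods; exportActiveSeries is an illustrative name, not a function in this commit, and the reason Purge can report invalid results is an assumption.

package sketch

import (
	"fmt"
	"time"

	"github.com/grafana/mimir/pkg/ingester/activeseries"
)

// exportActiveSeries reads the current totals the same way the new test does.
// Purge first drops entries that have been idle longer than the configured
// timeout and reports whether the resulting numbers are valid; if they are not
// (for example shortly after a configuration change), nothing is exported.
func exportActiveSeries(c *activeseries.ActiveSeries) {
	if valid := c.Purge(time.Now()); !valid {
		return
	}
	total, histograms, buckets := c.Active()
	fmt.Println("active series:", total, "native histograms:", histograms, "histogram buckets:", buckets)
}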