Skip to content

Commit

Permalink
MemPostings.PostingsForLabelMatching(): don't hold the mutex while …
Browse files Browse the repository at this point in the history
…matching (#14286)

* MemPostings.PostingsForLabelMatching: let mutex go

This changes the `MemPostings.PostingsForLabelMatching` implementation
to stop holding the read mutex while matching the label values.

We've seen that this method can be slow when the matcher is expensive,
that's why we even added a context expiration check.

However, there are critical process that might be waiting on this mutex:
writes (adding new series) and compaction (deleting the
garbage-collected ones), so we should avoid holding it for a long period
of time.

Given that we've copied the values to a slice anyway, there's no need to
hold the lock while matching.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
  • Loading branch information
colega committed Jun 10, 2024
1 parent 2dc177d commit 10a3c72
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 19 deletions.
70 changes: 51 additions & 19 deletions tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,38 +425,70 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
}

func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
// We'll copy the values into a slice and then match over that,
// this way we don't need to hold the mutex while we're matching,
// which can be slow (seconds) if the match function is a huge regex.
// Holding this lock prevents new series from being added (slows down the write path)
// and blocks the compaction process.
vals := p.labelValues(name)
for i, count := 0, 1; i < len(vals); count++ {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return ErrPostings(ctx.Err())
}

if match(vals[i]) {
i++
continue
}

// Didn't match, bring the last value to this position, make the slice shorter and check again.
// The order of the slice doesn't matter as it comes from a map iteration.
vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
}

// If none matched (or this label had no values), no need to grab the lock again.
if len(vals) == 0 {
return EmptyPostings()
}

// Now `vals` only contains the values that matched, get their postings.
its := make([]Postings, 0, len(vals))
p.mtx.RLock()
e := p.m[name]
for _, v := range vals {
if refs, ok := e[v]; ok {
// Some of the values may have been garbage-collected in the meantime this is fine, we'll just skip them.
// If we didn't let the mutex go, we'd have these postings here, but they would be pointing nowhere
// because there would be a `MemPostings.Delete()` call waiting for the lock to delete these labels,
// because the series were deleted already.
its = append(its, NewListPostings(refs))
}
}
// Let the mutex go before merging.
p.mtx.RUnlock()

return Merge(ctx, its...)
}

// labelValues returns a slice of label values for the given label name.
// It will take the read lock.
func (p *MemPostings) labelValues(name string) []string {
p.mtx.RLock()
defer p.mtx.RUnlock()

e := p.m[name]
if len(e) == 0 {
p.mtx.RUnlock()
return EmptyPostings()
return nil
}

// Benchmarking shows that first copying the values into a slice and then matching over that is
// faster than matching over the map keys directly, at least on AMD64.
vals := make([]string, 0, len(e))
for v, srs := range e {
if len(srs) > 0 {
vals = append(vals, v)
}
}

var its []Postings
count := 1
for _, v := range vals {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
p.mtx.RUnlock()
return ErrPostings(ctx.Err())
}
count++
if match(v) {
its = append(its, NewListPostings(e[v]))
}
}
p.mtx.RUnlock()

return Merge(ctx, its...)
return vals
}

// ExpandPostings returns the postings expanded as a slice.
Expand Down
22 changes: 22 additions & 0 deletions tsdb/index/postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1435,6 +1435,28 @@ func BenchmarkMemPostings_PostingsForLabelMatching(b *testing.B) {
}
}

func TestMemPostings_PostingsForLabelMatching(t *testing.T) {
mp := NewMemPostings()
mp.Add(1, labels.FromStrings("foo", "1"))
mp.Add(2, labels.FromStrings("foo", "2"))
mp.Add(3, labels.FromStrings("foo", "3"))
mp.Add(4, labels.FromStrings("foo", "4"))

isEven := func(v string) bool {
iv, err := strconv.Atoi(v)
if err != nil {
panic(err)
}
return iv%2 == 0
}

p := mp.PostingsForLabelMatching(context.Background(), "foo", isEven)
require.NoError(t, p.Err())
refs, err := ExpandPostings(p)
require.NoError(t, err)
require.Equal(t, []storage.SeriesRef{2, 4}, refs)
}

func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
memP := NewMemPostings()
seriesCount := 10 * checkContextEveryNIterations
Expand Down

0 comments on commit 10a3c72

Please sign in to comment.