Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Optimize storage.LabelQuerier.LabelValues implementations #525

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions model/labels/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@ const (
MatchNotEqual
MatchRegexp
MatchNotRegexp
// MatchSet is a special type for internal use, that matches series with a certain label name.
MatchSet
)

var matchTypeToStr = [...]string{
MatchEqual: "=",
MatchNotEqual: "!=",
MatchRegexp: "=~",
MatchNotRegexp: "!~",
MatchSet: "[isSet]",
}

func (m MatchType) String() string {
if m < MatchEqual || m > MatchNotRegexp {
if m < MatchEqual || m > MatchSet {
panic("unknown match type")
}
return matchTypeToStr[m]
Expand Down Expand Up @@ -92,8 +95,11 @@ func (m *Matcher) Matches(s string) bool {
return m.re.MatchString(s)
case MatchNotRegexp:
return !m.re.MatchString(s)
case MatchSet:
return true
default:
panic("labels.Matcher.Matches: invalid match type")
}
panic("labels.Matcher.Matches: invalid match type")
}

// Inverse returns a matcher that matches the opposite.
Expand All @@ -107,8 +113,9 @@ func (m *Matcher) Inverse() (*Matcher, error) {
return NewMatcher(MatchNotRegexp, m.Name, m.Value)
case MatchNotRegexp:
return NewMatcher(MatchRegexp, m.Name, m.Value)
default:
panic("labels.Matcher.Matches: invalid match type")
}
panic("labels.Matcher.Matches: invalid match type")
}

// GetRegexString returns the regex string.
Expand Down
10 changes: 10 additions & 0 deletions tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ type IndexReader interface {
// avoiding same calculations twice, however this implementation may lead to a worse performance when called once.
PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error)

// PostingsWithLabel returns the postings list iterator for the label name.
// The Postings here contain the offsets to the series inside the index.
// Found IDs are not strictly required to point to a valid Series, e.g.
// during background garbage collections.
PostingsWithLabel(name string) (index.Postings, error)

// SortedPostings returns a postings list that is reordered to be sorted
// by the label set of the underlying series.
SortedPostings(index.Postings) index.Postings
Expand Down Expand Up @@ -501,6 +507,10 @@ func (r blockIndexReader) LabelValues(name string, matchers ...*labels.Matcher)
return labelValuesWithMatchers(r.ir, name, matchers...)
}

func (r blockIndexReader) PostingsWithLabel(name string) (index.Postings, error) {
return r.ir.PostingsWithLabel(name)
}

func (r blockIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, error) {
if len(matchers) == 0 {
return r.b.LabelNames()
Expand Down
4 changes: 4 additions & 0 deletions tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func (h *headIndexReader) PostingsForMatchers(concurrent bool, ms ...*labels.Mat
return h.head.pfmc.PostingsForMatchers(h, concurrent, ms...)
}

func (h *headIndexReader) PostingsWithLabel(name string) (index.Postings, error) {
return h.head.postings.GetWithLabel(name), nil
}

func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
series := make([]*memSeries, 0, 128)

Expand Down
107 changes: 94 additions & 13 deletions tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,31 +1484,35 @@ func (r *Reader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]
// LabelValues returns value tuples that exist for the given label name.
// It is not safe to use the return value beyond the lifetime of the byte slice
// passed into the Reader.
// TODO(replay): Support filtering by matchers
func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
if len(matchers) > 0 {
return nil, errors.Errorf("matchers parameter is not implemented: %+v", matchers)
}

if r.version == FormatV1 {
e, ok := r.postingsV1[name]
if !ok {
return nil, nil
}
values := make([]string, 0, len(e))
for k := range e {
values = append(values, k)
isMatch := true
for _, m := range matchers {
if m.Name == name && !m.Matches(k) {
isMatch = false
break
}
}

if isMatch {
values = append(values, k)
}
}
return values, nil

}
e, ok := r.postings[name]
if !ok {
return nil, nil
}

e := r.postings[name]
if len(e) == 0 {
return nil, nil
}

values := make([]string, 0, len(e)*symbolFactor)

d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
Expand All @@ -1528,7 +1532,19 @@ func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string
d.Skip(skip)
}
s := yoloString(d.UvarintBytes()) // Label value.
values = append(values, s)

isMatch := true
// Try to exclude via matchers for the label name
for _, m := range matchers {
if m.Name == name && !m.Matches(s) {
isMatch = false
break
}
}

if isMatch {
values = append(values, s)
}
if s == lastVal {
break
}
Expand Down Expand Up @@ -1647,8 +1663,8 @@ func (r *Reader) Postings(name string, values ...string) (Postings, error) {
return Merge(res...), nil
}

e, ok := r.postings[name]
if !ok {
e := r.postings[name]
if len(e) == 0 {
return EmptyPostings(), nil
}

Expand Down Expand Up @@ -1725,6 +1741,71 @@ func (r *Reader) Postings(name string, values ...string) (Postings, error) {
return Merge(res...), nil
}

func (r *Reader) PostingsWithLabel(name string) (Postings, error) {
if r.version == FormatV1 {
e := r.postingsV1[name]
if len(e) == 0 {
return EmptyPostings(), nil
}

var res []Postings
for _, off := range e {
// Read from the postings table.
d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable)
_, p, err := r.dec.Postings(d.Get())
if err != nil {
return nil, errors.Wrap(err, "decode postings")
}
res = append(res, p)
}
return Merge(res...), nil
}

e := r.postings[name]
if len(e) == 0 {
return EmptyPostings(), nil
}

d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
// Skip to start
d.Skip(e[0].off)
lastVal := e[len(e)-1].value

skip := 0
var res []Postings
for d.Err() == nil {
if skip == 0 {
// These are always the same number of bytes,
// and it's faster to skip than to parse.
skip = d.Len()
d.Uvarint() // Keycount.
d.UvarintBytes() // Label name.
skip -= d.Len()
} else {
d.Skip(skip)
}
v := yoloString(d.UvarintBytes()) // Label value.

postingsOff := d.Uvarint64()
// Read from the postings table
d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.Postings(d2.Get())
if err != nil {
return nil, errors.Wrap(err, "decode postings")
}
res = append(res, p)

if v == lastVal {
break
}
}
if d.Err() != nil {
return nil, errors.Wrap(d.Err(), "get postings offset entry")
}

return Merge(res...), nil
}

// SortedPostings returns the given postings list reordered so that the backing series
// are sorted.
func (r *Reader) SortedPostings(p Postings) Postings {
Expand Down
27 changes: 25 additions & 2 deletions tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,24 @@ func (p *MemPostings) LabelNames() []string {
}

// LabelValues returns label values for the given name.
func (p *MemPostings) LabelValues(name string) []string {
func (p *MemPostings) LabelValues(name string, matchers ...*labels.Matcher) []string {
p.mtx.RLock()
defer p.mtx.RUnlock()

values := make([]string, 0, len(p.m[name]))
for v := range p.m[name] {
values = append(values, v)
isMatch := true
// Try to exclude this value through corresponding matchers
for _, m := range matchers {
if m.Name == name && !m.Matches(v) {
isMatch = false
break
}
}

if isMatch {
values = append(values, v)
}
}
return values
}
Expand Down Expand Up @@ -216,6 +227,18 @@ func (p *MemPostings) Get(name, value string) Postings {
return newListPostings(lp...)
}

// GetWithLabel returns a postings list for the given label name.
func (p *MemPostings) GetWithLabel(name string) Postings {
p.mtx.RLock()
var ps []Postings
for _, srs := range p.m[name] {
ps = append(ps, newListPostings(srs...))
}
p.mtx.RUnlock()

return Merge(ps...)
}

// All returns a postings list over all documents ever added.
func (p *MemPostings) All() Postings {
return p.Get(AllPostingsKey())
Expand Down
8 changes: 8 additions & 0 deletions tsdb/ooo_head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ func (oh *OOOHeadIndexReader) Postings(name string, values ...string) (index.Pos
}
}

func (oh *OOOHeadIndexReader) PostingsWithLabel(name string) (index.Postings, error) {
return oh.head.postings.GetWithLabel(name), nil
}

type OOOHeadChunkReader struct {
head *Head
mint, maxt int64
Expand Down Expand Up @@ -410,6 +414,10 @@ func (ir *OOOCompactionHeadIndexReader) Postings(name string, values ...string)
return index.NewListPostings(ir.ch.postings), nil
}

func (ir *OOOCompactionHeadIndexReader) PostingsWithLabel(name string) (index.Postings, error) {
return nil, errors.New("not implemented")
}

func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings {
// This will already be sorted from the Postings() call above.
return p
Expand Down
6 changes: 6 additions & 0 deletions tsdb/postings_for_matchers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ type IndexPostingsReader interface {
// Found IDs are not strictly required to point to a valid Series, e.g.
// during background garbage collections. Input values must be sorted.
Postings(name string, values ...string) (index.Postings, error)

// PostingsWithLabel returns the postings list iterator for the label name.
// The Postings here contain the offsets to the series inside the index.
// Found IDs are not strictly required to point to a valid Series, e.g.
// during background garbage collections.
PostingsWithLabel(name string) (index.Postings, error)
}

// NewPostingsForMatchersCache creates a new PostingsForMatchersCache.
Expand Down
4 changes: 4 additions & 0 deletions tsdb/postings_for_matchers_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@ func (idx indexForPostingsMock) Postings(string, ...string) (index.Postings, err
panic("implement me")
}

func (idx indexForPostingsMock) PostingsWithLabel(string) (index.Postings, error) {
panic("implement me")
}

// timeNowMock offers a mockable time.Now() implementation
// empty value is ready to be used, and it should not be copied (use a reference)
type timeNowMock struct {
Expand Down
Loading
Loading