Skip to content

Commit

Permalink
lib/storage: improve search speed for time series matching Graphite w…
Browse files Browse the repository at this point in the history
…hildcards such as `foo.*.bar.baz`

Add index for reverse Graphite-like metric names with dots. Use this index during search for filters
like `__name__=~"foo\\.[^.]*\\.bar\\.baz"` which end with non-empty suffix with dots, i.e. `.bar.baz` in this case.

This change may "hide" historical time series during queries. The workaround is to add `[.]*` to the end of regexp label filter,
i.e. "foo\\.[^.]*\\.bar\\.baz" should be substituted with "foo\\.[^.]*\\.bar\\.baz[.]*".
  • Loading branch information
valyala committed May 27, 2020
1 parent 7e2669f commit d186472
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 66 deletions.
34 changes: 34 additions & 0 deletions lib/storage/index_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ func (db *indexDB) createIndexes(tsid *TSID, mn *MetricName) error {
items.B = marshalTagValue(items.B, mn.MetricGroup)
items.B = encoding.MarshalUint64(items.B, tsid.MetricID)
items.Next()
addReverseMetricGroupIfNeeded(items, commonPrefix.B, mn, tsid.MetricID)

// For each tag create tag -> MetricID index.
for i := range mn.Tags {
Expand Down Expand Up @@ -2571,6 +2572,7 @@ func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
items.B = marshalTagValue(items.B, mn.MetricGroup)
items.B = encoding.MarshalUint64(items.B, metricID)
items.Next()
addReverseMetricGroupIfNeeded(items, kb.B, mn, metricID)
for i := range mn.Tags {
tag := &mn.Tags[i]
items.B = append(items.B, kb.B...)
Expand All @@ -2584,6 +2586,38 @@ func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
return nil
}

func addReverseMetricGroupIfNeeded(items *indexItems, prefix []byte, mn *MetricName, metricID uint64) {
if bytes.IndexByte(mn.MetricGroup, '.') < 0 {
// The reverse metric group is needed only for Graphite-like metrics with points.
return
}
// This is most likely a Graphite metric like 'foo.bar.baz'.
// Store reverse metric name 'zab.rab.oof' in order to speed up search for '*.bar.baz'
// when the Graphite wildcard has a suffix matching small number of time series.
items.B = append(items.B, prefix...)
items.B = marshalTagValue(items.B, graphiteReverseTagKey)
revBuf := kbPool.Get()
revBuf.B = reverseBytes(revBuf.B[:0], mn.MetricGroup)
items.B = marshalTagValue(items.B, revBuf.B)
kbPool.Put(revBuf)
items.B = encoding.MarshalUint64(items.B, metricID)
items.Next()
}

// The tag key for reverse metric name used for speeding up searching
// for Graphite wildcards with suffix matching small number of time series,
// i.e. '*.bar.baz'.
//
// It is expected that the given key isn't be used by users.
var graphiteReverseTagKey = []byte("\xff")

func reverseBytes(dst, src []byte) []byte {
for i := len(src) - 1; i >= 0; i-- {
dst = append(dst, src[i])
}
return dst
}

func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
ts := &is.ts
kb := &is.kb
Expand Down
16 changes: 15 additions & 1 deletion lib/storage/index_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
)

func TestReverseBytes(t *testing.T) {
f := func(s, resultExpected string) {
t.Helper()
result := reverseBytes(nil, []byte(s))
if string(result) != resultExpected {
t.Fatalf("unexpected result for reverseBytes(%q); got %q; want %q", s, result, resultExpected)
}
}
f("", "")
f("a", "a")
f("av", "va")
f("foo.bar", "rab.oof")
}

func TestMergeTagToMetricIDsRows(t *testing.T) {
f := func(items []string, expectedItems []string) {
t.Helper()
Expand Down Expand Up @@ -659,7 +673,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
var mn MetricName

// Init MetricGroup.
mn.MetricGroup = []byte(fmt.Sprintf("metricGroup_%d\x00\x01\x02", i%metricGroups))
mn.MetricGroup = []byte(fmt.Sprintf("metricGroup.%d\x00\x01\x02", i%metricGroups))

// Init other tags.
tagsCount := rand.Intn(10) + 1
Expand Down
79 changes: 51 additions & 28 deletions lib/storage/tag_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,26 @@ func (tfs *TagFilters) Add(key, value []byte, isNegative, isRegexp bool) error {
// since it must filter out all the time series with the given key.
}

tf := tfs.addTagFilter()
if err := tf.Init(tfs.commonPrefix, key, value, isNegative, isRegexp); err != nil {
return fmt.Errorf("cannot initialize tagFilter: %s", err)
}
if len(tf.graphiteReverseSuffix) > 0 {
tf = tfs.addTagFilter()
if err := tf.Init(tfs.commonPrefix, graphiteReverseTagKey, tf.graphiteReverseSuffix, false, false); err != nil {
return fmt.Errorf("cannot initialize reverse tag filter for Graphite wildcard: %s", err)
}
}
return nil
}

func (tfs *TagFilters) addTagFilter() *tagFilter {
if cap(tfs.tfs) > len(tfs.tfs) {
tfs.tfs = tfs.tfs[:len(tfs.tfs)+1]
} else {
tfs.tfs = append(tfs.tfs, tagFilter{})
}
tf := &tfs.tfs[len(tfs.tfs)-1]
err := tf.Init(tfs.commonPrefix, key, value, isNegative, isRegexp)
if err != nil {
return fmt.Errorf("cannot initialize tagFilter: %s", err)
}
return nil
return &tfs.tfs[len(tfs.tfs)-1]
}

// Finalize finalizes tfs and may return complementary TagFilters,
Expand Down Expand Up @@ -150,6 +159,10 @@ type tagFilter struct {
//
// Such a filter must be applied directly to metricNames.
matchesEmptyValue bool

// Contains reverse suffix for Graphite wildcard.
// I.e. for `{__name__=~"foo\\.[^.]*\\.bar\\.baz"}` the value will be `zab.rab.`
graphiteReverseSuffix []byte
}

func (tf *tagFilter) Less(other *tagFilter) bool {
Expand Down Expand Up @@ -225,6 +238,7 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
tf.orSuffixes = tf.orSuffixes[:0]
tf.reSuffixMatch = nil
tf.matchesEmptyValue = false
tf.graphiteReverseSuffix = tf.graphiteReverseSuffix[:0]

tf.prefix = append(tf.prefix, commonPrefix...)
tf.prefix = marshalTagValue(tf.prefix, key)
Expand Down Expand Up @@ -254,6 +268,10 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
if len(prefix) == 0 && !tf.isNegative && tf.reSuffixMatch(nil) {
tf.matchesEmptyValue = true
}
if !tf.isNegative && len(key) == 0 && strings.IndexByte(rcv.literalSuffix, '.') >= 0 {
// Reverse suffix is needed only for non-negative regexp filters on __name__ that contains dots.
tf.graphiteReverseSuffix = reverseBytes(tf.graphiteReverseSuffix[:0], []byte(rcv.literalSuffix))
}
return nil
}

Expand Down Expand Up @@ -313,6 +331,7 @@ func getRegexpFromCache(expr []byte) (regexpCacheValue, error) {
sExpr := string(expr)
orValues := getOrValues(sExpr)
var reMatch func(b []byte) bool
var literalSuffix string
if len(orValues) > 0 {
if len(orValues) == 1 {
v := orValues[0]
Expand All @@ -330,12 +349,13 @@ func getRegexpFromCache(expr []byte) (regexpCacheValue, error) {
}
}
} else {
reMatch = getOptimizedReMatchFunc(re.Match, sExpr)
reMatch, literalSuffix = getOptimizedReMatchFunc(re.Match, sExpr)
}

// Put the reMatch in the cache.
rcv.orValues = orValues
rcv.reMatch = reMatch
rcv.literalSuffix = literalSuffix

regexpCacheLock.Lock()
if overflow := len(regexpCacheMap) - getMaxRegexpCacheSize(); overflow > 0 {
Expand Down Expand Up @@ -367,45 +387,47 @@ func getRegexpFromCache(expr []byte) (regexpCacheValue, error) {
// '.+literal.+'
//
// It returns reMatch if it cannot find optimized function.
func getOptimizedReMatchFunc(reMatch func(b []byte) bool, expr string) func(b []byte) bool {
//
// It also returns literal suffix from the expr.
func getOptimizedReMatchFunc(reMatch func(b []byte) bool, expr string) (func(b []byte) bool, string) {
sre, err := syntax.Parse(expr, syntax.Perl)
if err != nil {
logger.Panicf("BUG: unexpected error when parsing verified expr=%q: %s", expr, err)
}
if matchFunc := getOptimizedReMatchFuncExt(reMatch, sre); matchFunc != nil {
if matchFunc, literalSuffix := getOptimizedReMatchFuncExt(reMatch, sre); matchFunc != nil {
// Found optimized function for matching the expr.
return matchFunc
return matchFunc, literalSuffix
}
// Fall back to un-optimized reMatch.
return reMatch
return reMatch, ""
}

func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp) func(b []byte) bool {
func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp) (func(b []byte) bool, string) {
if isDotStar(sre) {
// '.*'
return func(b []byte) bool {
return true
}
}, ""
}
if isDotPlus(sre) {
// '.+'
return func(b []byte) bool {
return len(b) > 0
}
}, ""
}
switch sre.Op {
case syntax.OpCapture:
// Remove parenthesis from expr, i.e. '(expr) -> expr'
return getOptimizedReMatchFuncExt(reMatch, sre.Sub[0])
case syntax.OpLiteral:
if !isLiteral(sre) {
return nil
return nil, ""
}
s := string(sre.Rune)
// Literal match
return func(b []byte) bool {
return string(b) == s
}
}, s
case syntax.OpConcat:
if len(sre.Sub) == 2 {
if isLiteral(sre.Sub[0]) {
Expand All @@ -414,13 +436,13 @@ func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp)
// 'prefix.*'
return func(b []byte) bool {
return bytes.HasPrefix(b, prefix)
}
}, ""
}
if isDotPlus(sre.Sub[1]) {
// 'prefix.+'
return func(b []byte) bool {
return len(b) > len(prefix) && bytes.HasPrefix(b, prefix)
}
}, ""
}
}
if isLiteral(sre.Sub[1]) {
Expand All @@ -429,13 +451,13 @@ func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp)
// '.*suffix'
return func(b []byte) bool {
return bytes.HasSuffix(b, suffix)
}
}, string(suffix)
}
if isDotPlus(sre.Sub[0]) {
// '.+suffix'
return func(b []byte) bool {
return len(b) > len(suffix) && bytes.HasSuffix(b[1:], suffix)
}
}, string(suffix)
}
}
}
Expand All @@ -446,27 +468,27 @@ func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp)
// '.*middle.*'
return func(b []byte) bool {
return bytes.Contains(b, middle)
}
}, ""
}
if isDotPlus(sre.Sub[2]) {
// '.*middle.+'
return func(b []byte) bool {
return len(b) > len(middle) && bytes.Contains(b[:len(b)-1], middle)
}
}, ""
}
}
if isDotPlus(sre.Sub[0]) {
if isDotStar(sre.Sub[2]) {
// '.+middle.*'
return func(b []byte) bool {
return len(b) > len(middle) && bytes.Contains(b[1:], middle)
}
}, ""
}
if isDotPlus(sre.Sub[2]) {
// '.+middle.+'
return func(b []byte) bool {
return len(b) > len(middle)+1 && bytes.Contains(b[1:len(b)-1], middle)
}
}, ""
}
}
}
Expand Down Expand Up @@ -500,9 +522,9 @@ func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp)
}
// Fall back to slow path.
return reMatch(bOrig)
}
}, string(suffix)
default:
return nil
return nil, ""
}
}

Expand Down Expand Up @@ -678,8 +700,9 @@ var (
)

type regexpCacheValue struct {
orValues []string
reMatch func(b []byte) bool
orValues []string
reMatch func(b []byte) bool
literalSuffix string
}

func getRegexpPrefix(b []byte) ([]byte, []byte) {
Expand Down

0 comments on commit d186472

Please sign in to comment.