Skip to content

Commit

Permalink
add into access log "complexity" of find request (#487)
Browse files Browse the repository at this point in the history
* add into access log "complexity" of find request

* fix golint && uint32

* add into access log "complexity" of find request

* fix golint && uint32

* fix golint

Co-authored-by: Elena Nuretdinova <elena.nuretdinova@booking.com>
  • Loading branch information
enuret and enuret committed Aug 18, 2022
1 parent b2b610e commit 608fccc
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 24 deletions.
15 changes: 8 additions & 7 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ type ExpandedGlobResponse struct {
Files []string
Leafs []bool
TrieNodes []*trieNode
Lookups uint32
Err error
}

Expand Down Expand Up @@ -1229,7 +1230,7 @@ func (*CarbonserverListener) logTrieInsertError(logger *zap.Logger, msg, metric
func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query string, resultCh chan<- *ExpandedGlobResponse) {
defer func() {
if err := recover(); err != nil {
resultCh <- &ExpandedGlobResponse{query, nil, nil, nil, fmt.Errorf("%s\n%s", err, debug.Stack())}
resultCh <- &ExpandedGlobResponse{query, nil, nil, nil, 0, fmt.Errorf("%s\n%s", err, debug.Stack())}
}
}()

Expand All @@ -1243,7 +1244,7 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str
}
if cap(rl.maxInflightRequests) == 0 {
err := fmt.Errorf("rejected by query rate limiter: %s", rl.pattern.String())
resultCh <- &ExpandedGlobResponse{query, nil, nil, nil, err}
resultCh <- &ExpandedGlobResponse{query, nil, nil, nil, 0, err}
return
}

Expand All @@ -1255,7 +1256,7 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str
// why: no need to continue execution if the request is already timeout.
if listener.checkRequestCtx(ctx) != nil {
err := fmt.Errorf("time out due to heavy glob query rate limiter: %s", rl.pattern.String())
resultCh <- &ExpandedGlobResponse{query, nil, nil, nil, err}
resultCh <- &ExpandedGlobResponse{query, nil, nil, nil, 0, err}
return
}

Expand All @@ -1282,8 +1283,8 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str
}(time.Now())

if listener.trieIndex && listener.CurrentFileIndex() != nil {
files, leafs, nodes, err := listener.expandGlobsTrie(query)
resultCh <- &ExpandedGlobResponse{query, files, leafs, nodes, err}
files, leafs, nodes, lookups, err := listener.expandGlobsTrie(query)
resultCh <- &ExpandedGlobResponse{query, files, leafs, nodes, lookups, err}
return
}

Expand Down Expand Up @@ -1319,7 +1320,7 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str
globs = append(globs, query)
globs, err := listener.expandGlobBraces(globs)
if err != nil {
resultCh <- &ExpandedGlobResponse{query, nil, nil, nil, err}
resultCh <- &ExpandedGlobResponse{query, nil, nil, nil, 0, err}
return
}

Expand Down Expand Up @@ -1404,7 +1405,7 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str
}

matchedCount = len(files)
resultCh <- &ExpandedGlobResponse{query, files, leafs, nil, nil}
resultCh <- &ExpandedGlobResponse{query, files, leafs, nil, 0, nil}
}

// TODO(dgryski): add tests
Expand Down
11 changes: 9 additions & 2 deletions carbonserver/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type findResponse struct {
data []byte
contentType string
files int
lookups uint32
}

func (listener *CarbonserverListener) findHandler(wr http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -193,6 +194,7 @@ func (listener *CarbonserverListener) findHandler(wr http.ResponseWriter, req *h
zap.Bool("find_cache_enabled", listener.findCacheEnabled),
zap.Bool("from_cache", fromCache),
zap.Int("http_code", http.StatusOK),
zap.Uint32("lookups", response.lookups),
)
span.SetAttributes(
kv.Int("graphite.files", response.files),
Expand All @@ -213,6 +215,7 @@ type globs struct {
Files []string
Leafs []bool
TrieNodes []*trieNode
Lookups uint32
}

func (listener *CarbonserverListener) findMetrics(ctx context.Context, logger *zap.Logger, t0 time.Time, format responseFormat, names []string) (*findResponse, error) {
Expand All @@ -229,6 +232,7 @@ func (listener *CarbonserverListener) findMetrics(ctx context.Context, logger *z
multiResponse := protov3.MultiGlobResponse{}
for _, glob := range expandedGlobs {
result.files += len(glob.Files)
result.lookups += glob.Lookups
response := protov3.GlobResponse{
Name: glob.Name,
Matches: make([]*protov3.GlobMatch, 0),
Expand Down Expand Up @@ -286,6 +290,7 @@ func (listener *CarbonserverListener) findMetrics(ctx context.Context, logger *z
}

result.files += len(expandedGlobs[0].Files)
result.lookups += expandedGlobs[0].Lookups
for i, p := range expandedGlobs[0].Files {
if expandedGlobs[0].Leafs[i] {
metricsCount++
Expand Down Expand Up @@ -317,9 +322,11 @@ func (listener *CarbonserverListener) findMetrics(ctx context.Context, logger *z
var metrics []map[string]interface{}
var m map[string]interface{}
files := 0
var lookups uint32

glob := expandedGlobs[0]
files += len(glob.Files)
lookups += glob.Lookups
for i, p := range glob.Files {
if glob.Leafs[i] {
metricsCount++
Expand All @@ -333,7 +340,7 @@ func (listener *CarbonserverListener) findMetrics(ctx context.Context, logger *z
var buf bytes.Buffer
pEnc := pickle.NewEncoder(&buf)
pEnc.Encode(metrics)
return &findResponse{buf.Bytes(), httpHeaders.ContentTypePickle, files}, nil
return &findResponse{buf.Bytes(), httpHeaders.ContentTypePickle, files, lookups}, nil
}
return nil, nil
}
Expand All @@ -356,7 +363,7 @@ GATHER:
glob := globs{
Name: expandedResult.Name,
}
glob.Files, glob.Leafs, glob.TrieNodes, err = expandedResult.Files, expandedResult.Leafs, expandedResult.TrieNodes, expandedResult.Err
glob.Files, glob.Leafs, glob.TrieNodes, glob.Lookups, err = expandedResult.Files, expandedResult.Leafs, expandedResult.TrieNodes, expandedResult.Lookups, expandedResult.Err
if err != nil {
errors = append(errors, fmt.Errorf("%s: %s", expandedResult.Name, err))
}
Expand Down
2 changes: 1 addition & 1 deletion carbonserver/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (listener *CarbonserverListener) queryMetricsList(query string, limit int,
return nil, errMetricsListEmpty
}

names, isFiles, nodes, err := fidx.trieIdx.query(strings.ReplaceAll(query, ".", "/"), limit, nil)
names, isFiles, nodes, _, err := fidx.trieIdx.query(strings.ReplaceAll(query, ".", "/"), limit, nil)
if err != nil {
return nil, err
}
Expand Down
56 changes: 45 additions & 11 deletions carbonserver/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,37 +753,58 @@ func (ti *trieIndex) newDir() *trieNode {

// TODO: add some defensive logic against bad queries?
// depth first search
func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) ([]string, error)) (files []string, isFiles []bool, nodes []*trieNode, err error) {
// TODO: refactor to make the function more readable. Some ideas:
// - to isolate Depth first search in separate class
// - probably we can optimize the length of existing arrays. It uses tree depth + 7, where tree depth is the longest path in the tree in characters
// but we should be able to calculate the max required depth based on expr; we don't have to look into deeper levels than expr's depth unless some unknown corner case exists
// - get rid of 'goto', since it adds complexity
func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) ([]string, error)) (files []string, isFiles []bool, nodes []*trieNode, its uint32, err error) {
expr = strings.TrimSpace(expr)
if expr == "" {
expr = "*"
}
var matchers []*gmatcher
var matchers []*gmatcher // information about matching texts; holds a byte array of used characters, one matcher per depth level

// TODO:
// Supposedly it indicates that we look for just one exact path, and it stops the search after finding one path,
// but it is always false in the current code. 'exact' should be true by default
// to test whether it works properly
var exact bool
// complexity of query
var lookups uint32
for _, node := range strings.Split(expr, "/") {
if node == "" {
continue
}
gs, err := newGlobState(node, expand)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, lookups, err
}
exact = exact && gs.exact
matchers = append(matchers, gs)
}

if len(matchers) == 0 {
return nil, nil, nil, nil
return nil, nil, nil, lookups, nil
}

// current node
var cur = ti.root
// children of current node
var curChildrens = cur.getChildrens()
// longest path in trie (length in characters) + 7 (some magic number for extra capacity)
var depth = ti.getDepth() + trieDepthBuffer
// array of child indexes we last looked at, one per depth level
var nindex = make([]int, depth)
// array of nodes, contains the path to the current processed node
var trieNodes = make([]*trieNode, depth)
// each item contains an array of all children for every trieNode on the current search path
var childrensStack = make([][]*trieNode, depth)
// current searching depth
var ncindex int
// index of needed matcher
var mindex int
// matcher for the current depth level
var curm = matchers[0]
var ndstate *gdstate
var isFile, isDir, hasMoreNodes bool
Expand All @@ -795,11 +816,14 @@ func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) (
goto parent
}

// start processing the children of the current node and save the current node onto the path;
// the root is supposed to be the '/' node, not containing useful information, so we skip checking it
trieNodes[ncindex] = cur
childrensStack[ncindex] = curChildrens
cur = cur.getChild(curChildrens, nindex[ncindex])
curChildrens = cur.getChildrens()
ncindex++
lookups++

// It's possible to run into a situation of newer and longer metrics
// are added during the query, for such situation, it's safer to just
Expand All @@ -808,7 +832,10 @@ func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) (
goto parent
}

// if node is '/'
if cur.dir() {
// the number of matchers should correlate with the max search depth;
// first condition: if the depth is already too deep it doesn't make sense to continue, so go to parent
if mindex+1 >= len(matchers) || !curm.dstate().matched() || len(curChildrens) == 0 {
goto parent
}
Expand All @@ -820,6 +847,7 @@ func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) (
continue
}

// matching regexp
if curm.lsComplex && len(curm.trigrams) > 0 {
if _, ok := ti.trigrams[cur]; ok {
for _, t := range curm.trigrams {
Expand All @@ -830,6 +858,7 @@ func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) (
}
}

// matching regexp
for i := 0; i < len(cur.c); i++ {
ndstate = curm.dstate().step(cur.c[i])
if len(ndstate.gstates) == 0 {
Expand Down Expand Up @@ -876,6 +905,7 @@ func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) (
goto parent
}

// we found a result metric here
if isFile {
files = append(files, cur.fullPath('.', trieNodes[:ncindex]))
isFiles = append(isFiles, isFile)
Expand All @@ -898,6 +928,8 @@ func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) (
curm.pop(len(cur.c))
goto parent

// reaching this label means that we are done with the current child and are moving one level up;
// next we are supposed to check the next child of the current parent
parent:
// use exact for fast exit
nindex[ncindex] = 0
Expand All @@ -919,7 +951,7 @@ func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) (
continue
}

return files, isFiles, nodes, nil
return files, isFiles, nodes, lookups, nil
}

// note: tn might be a root or file node, which has a nil c
Expand Down Expand Up @@ -1504,7 +1536,7 @@ func (ti *trieIndex) countNodes() (count, files, dirs, onec, onefc, onedc int, c
return
}

func (listener *CarbonserverListener) expandGlobsTrie(query string) ([]string, []bool, []*trieNode, error) {
func (listener *CarbonserverListener) expandGlobsTrie(query string) ([]string, []bool, []*trieNode, uint32, error) {
query = strings.ReplaceAll(query, ".", "/")
globs := []string{query}

Expand All @@ -1524,29 +1556,31 @@ func (listener *CarbonserverListener) expandGlobsTrie(query string) ([]string, [
var err error
globs, err = listener.expandGlobBraces(globs)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, 0, err
}
}

var fidx = listener.CurrentFileIndex()
var files []string
var leafs []bool
var nodes []*trieNode
var lookups uint32

for _, g := range globs {
f, l, n, err := fidx.trieIdx.query(g, listener.maxMetricsGlobbed-len(files), listener.expandGlobBraces)
f, l, n, lk, err := fidx.trieIdx.query(g, listener.maxMetricsGlobbed-len(files), listener.expandGlobBraces)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, lookups, err
}
files = append(files, f...)
leafs = append(leafs, l...)
nodes = append(nodes, n...)
lookups += lk
}
// set node as viewed
for _, node := range nodes {
node.incrementReadHitsMetric()
}
return files, leafs, nodes, nil
return files, leafs, nodes, lookups, nil
}

type QuotaDroppingPolicy int8
Expand Down Expand Up @@ -1820,7 +1854,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota)
continue
}

paths, _, nodes, err := ti.query(strings.ReplaceAll(quota.Pattern, ".", "/"), 1<<31-1, nil)
paths, _, nodes, _, err := ti.query(strings.ReplaceAll(quota.Pattern, ".", "/"), 1<<31-1, nil)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions carbonserver/trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ func TestTrieIndex(t *testing.T) {
func TestTrieEdgeCases(t *testing.T) {
var trie = newTrie(".wsp", nil)

_, _, _, err := trie.query("[\xff\xff-\xff", 1000, func([]string) ([]string, error) { return nil, nil })
_, _, _, _, err := trie.query("[\xff\xff-\xff", 1000, func([]string) ([]string, error) { return nil, nil })
if err == nil || err.Error() != "glob: range overflow" {
t.Errorf("trie should return an range overflow error")
}
Expand Down Expand Up @@ -775,7 +775,7 @@ func TestTrieQueryOpts(t *testing.T) {
"sys.app.host-02.cpu.system",
}

files, _, nodes, err := trieIndex.query("sys/app/host-0{1,2}", 1000, nil)
files, _, nodes, _, err := trieIndex.query("sys/app/host-0{1,2}", 1000, nil)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -837,7 +837,7 @@ func TestTrieConcurrentReadWrite(t *testing.T) {
// case <-filec:
default:
// skipcq: GSC-G404
files, _, _, err := trieIndex.query(fmt.Sprintf("level-0-%d/level-1-%d/level-2-%d*", rand.Intn(factor), rand.Intn(factor), rand.Intn(factor)), int(math.MaxInt64), nil)
files, _, _, _, err := trieIndex.query(fmt.Sprintf("level-0-%d/level-1-%d/level-2-%d*", rand.Intn(factor), rand.Intn(factor), rand.Intn(factor)), int(math.MaxInt64), nil)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 608fccc

Please sign in to comment.