Skip to content

Commit

Permalink
Merge pull request #427 from go-graphite/trie/nil-filename-error-fix
Browse files Browse the repository at this point in the history
carbonserver: log more detailed errors about symlink and continue trie index despite error
  • Loading branch information
bom-d-van committed Sep 30, 2021
2 parents e1a9475 + 488ad5a commit 749acdd
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 9 deletions.
8 changes: 8 additions & 0 deletions carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,14 @@ func (app *App) Start(version string) (err error) {
return
}

if conf.Carbonserver.TrigramIndex || conf.Carbonserver.TrieIndex {
if fi, err := os.Lstat(conf.Whisper.DataDir); err != nil {
return fmt.Errorf("failed to stat whisper data directory: %s", err)
} else if fi.Mode()&os.ModeSymlink == 1 {
return fmt.Errorf("whisper data directory is a symlink")
}
}

carbonserver := carbonserver.NewCarbonserverListener(core.Get)
carbonserver.SetWhisperData(conf.Whisper.DataDir)
carbonserver.SetMaxGlobs(conf.Carbonserver.MaxGlobs)
Expand Down
25 changes: 19 additions & 6 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ uloop:
metric := "/" + filepath.Clean(strings.ReplaceAll(m, ".", "/")+".wsp")

if err := fidx.trieIdx.insert(metric); err != nil {
listener.logger.Warn("failed to insert new metrics for realtime indexing", zap.String("metric", metric), zap.Error(err))
listener.logTrieInsertError(listener.logger, "failed to insert new metrics for realtime indexing", metric, err)
}
}
continue uloop
Expand Down Expand Up @@ -682,8 +682,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
for fileName := range cacheMetricNames {
if listener.trieIndex {
if err := trieIdx.insert(fileName); err != nil {
logger.Error("error populating index from cache indexMap",
zap.Error(err))
listener.logTrieInsertError(logger, "error populating index from cache indexMap", fileName, err)
}
} else {
files = append(files, fileName)
Expand All @@ -709,7 +708,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
continue
}
if err := trieIdx.insert(entry); err != nil {
logger.Error("failed to read from file list cache", zap.Error(err))
listener.logTrieInsertError(logger, "failed to read from file list cache", entry, err)
readFromCache = false

trieIdx = newTrie(".wsp")
Expand Down Expand Up @@ -746,6 +745,11 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
}()
}
}
if fi, err := os.Lstat(dir); err != nil {
logger.Error("failed to stat whisper data directory", zap.String("path", dir), zap.Error(err))
} else if fi.Mode()&os.ModeSymlink == 1 {
logger.Error("can't index symlink data dir", zap.String("path", dir))
}
err := filepath.Walk(dir, func(p string, info os.FileInfo, err error) error {
if err != nil {
logger.Info("error processing", zap.String("path", p), zap.Error(err))
Expand All @@ -759,7 +763,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
select {
case m := <-listener.newMetricsChan:
if err := trieIdx.insert(filepath.Clean(strings.ReplaceAll(m, ".", "/") + ".wsp")); err != nil {
logger.Warn("failed to update realtime trie index", zap.Error(err))
listener.logTrieInsertError(logger, "failed to update realtime trie index", m, err)
}
default:
break newMetricsLoop
Expand Down Expand Up @@ -788,7 +792,8 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
} else {
if listener.trieIndex {
if err := trieIdx.insert(trimmedName); err != nil {
return fmt.Errorf("updateFileList.trie: %s", err)
// It's better to just log an error than stop indexing
listener.logTrieInsertError(logger, "updateFileList.trie: failed to index path", trimmedName, err)
}
} else {
files = append(files, trimmedName)
Expand Down Expand Up @@ -927,6 +932,14 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
return
}

func (listener *CarbonserverListener) logTrieInsertError(logger *zap.Logger, msg, metric string, err error) {
zfields := []zap.Field{zap.Error(err), zap.String("metric", metric)}
if ierr, ok := err.(*trieInsertError); ok {
zfields = append(zfields, zap.String("err_info", ierr.info))
}
logger.Error(msg, zfields...)
}

func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query string, resultCh chan<- *ExpandedGlobResponse) {
defer func() {
if err := recover(); err != nil {
Expand Down
25 changes: 22 additions & 3 deletions carbonserver/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,13 @@ type nilFilenameError string

func (nfe nilFilenameError) Error() string { return string(nfe) }

type trieInsertError struct {
typ string
info string
}

func (t *trieInsertError) Error() string { return t.typ }

// TODO: add some defensive logics agains bad paths?
//
// abc.def.ghi
Expand All @@ -399,7 +406,7 @@ func (ti *trieIndex) insert(path string) error {
}

if path == "" || path[len(path)-1] == '/' {
return nilFilenameError(fmt.Sprintf("metric fileename is nil: %s", path))
return nilFilenameError("metric filename is nil")
}

if uint64(len(path)) > ti.getDepth() {
Expand Down Expand Up @@ -474,7 +481,7 @@ outer:
goto dir
}

return fmt.Errorf("failed to index metric %s: unknwon case of match == nlen = %d", path, nlen)
return &trieInsertError{typ: "failed to index metric: unknwon case of match == nlen", info: fmt.Sprintf("match == nlen == %d", nlen)}
}

if match == len(child.c) && len(child.c) < nlen { // case 2
Expand Down Expand Up @@ -535,10 +542,22 @@ outer:
// TODO: there seems to be a problem of fetching a directory node without files

if !isFile {
// TODO: should double check if / already exists
if cur.dir() {
return nil
}

var newDir = true
for _, child := range *cur.childrens {
if child.dir() {
cur = child
newDir = false
break
}
}
if newDir {
cur.addChild(&trieNode{c: []byte{'/'}, childrens: &[]*trieNode{}, gen: ti.root.gen})
}

return nil
}

Expand Down
26 changes: 26 additions & 0 deletions carbonserver/trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,26 @@ func TestTrieIndex(t *testing.T) {
},
expectLeafs: []bool{true, false},
},
{
input: []string{
"/ns1/ns2/ns3/ns4/ns5/ns6/ns7_handle.wsp",
"/ns1/ns2/ns3/ns4/ns5/ns6_1/",
"/ns1/ns2/ns3/ns4/ns5/ns6_2",
"/ns1/ns2/ns3/ns4/ns5/ns6_3/",
"/ns1/ns2/ns3/ns4/ns5/ns6_3/",
"/ns1/ns2/ns3/ns4/ns5/ns6_3/",
"/ns1/ns2/ns3/ns4/ns5/ns6_3/",
"/ns1/ns2/ns3/ns4/ns5/ns6_3/metric.wsp",
},
query: "ns1.ns2.ns3.ns4.ns5.*",
expect: []string{
"ns1.ns2.ns3.ns4.ns5.ns6",
"ns1.ns2.ns3.ns4.ns5.ns6_1",
"ns1.ns2.ns3.ns4.ns5.ns6_2",
"ns1.ns2.ns3.ns4.ns5.ns6_3",
},
expectLeafs: []bool{false, false, false, false},
},
}

for _, c := range cases {
Expand All @@ -696,6 +716,8 @@ func TestTrieIndex(t *testing.T) {
}
}

// trieServer.CurrentFileIndex().trieIdx.dump(os.Stdout)

sort.Strings(trieFiles)
sort.Strings(c.expect)
if !reflect.DeepEqual(trieFiles, c.expect) {
Expand All @@ -712,6 +734,10 @@ func TestTrieEdgeCases(t *testing.T) {
if err == nil || err.Error() != "glob: range overflow" {
t.Errorf("trie should return an range overflow error")
}

if err := trie.insert("ns1/ns2/ns3/ns4/ns5/ns7/"); err != nil {
t.Errorf("should not return insert error when inserting folders")
}
}

func TestTrieConcurrentReadWrite(t *testing.T) {
Expand Down

0 comments on commit 749acdd

Please sign in to comment.