Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

carbonserver: log more detailed errors about symlink and continue trie index despite error #427

Merged
merged 2 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,14 @@ func (app *App) Start(version string) (err error) {
return
}

if conf.Carbonserver.TrigramIndex || conf.Carbonserver.TrieIndex {
if fi, err := os.Lstat(conf.Whisper.DataDir); err != nil {
return fmt.Errorf("failed to stat whisper data directory: %s", err)
} else if fi.Mode()&os.ModeSymlink == 1 {
return fmt.Errorf("whisper data directory is a symlink")
}
}

carbonserver := carbonserver.NewCarbonserverListener(core.Get)
carbonserver.SetWhisperData(conf.Whisper.DataDir)
carbonserver.SetMaxGlobs(conf.Carbonserver.MaxGlobs)
Expand Down
25 changes: 19 additions & 6 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ uloop:
metric := "/" + filepath.Clean(strings.ReplaceAll(m, ".", "/")+".wsp")

if err := fidx.trieIdx.insert(metric); err != nil {
listener.logger.Warn("failed to insert new metrics for realtime indexing", zap.String("metric", metric), zap.Error(err))
listener.logTrieInsertError(listener.logger, "failed to insert new metrics for realtime indexing", metric, err)
}
}
continue uloop
Expand Down Expand Up @@ -682,8 +682,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
for fileName := range cacheMetricNames {
if listener.trieIndex {
if err := trieIdx.insert(fileName); err != nil {
logger.Error("error populating index from cache indexMap",
zap.Error(err))
listener.logTrieInsertError(logger, "error populating index from cache indexMap", fileName, err)
}
} else {
files = append(files, fileName)
Expand All @@ -709,7 +708,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
continue
}
if err := trieIdx.insert(entry); err != nil {
logger.Error("failed to read from file list cache", zap.Error(err))
listener.logTrieInsertError(logger, "failed to read from file list cache", entry, err)
readFromCache = false

trieIdx = newTrie(".wsp")
Expand Down Expand Up @@ -746,6 +745,11 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
}()
}
}
if fi, err := os.Lstat(dir); err != nil {
logger.Error("failed to stat whisper data directory", zap.String("path", dir), zap.Error(err))
} else if fi.Mode()&os.ModeSymlink == 1 {
logger.Error("can't index symlink data dir", zap.String("path", dir))
}
err := filepath.Walk(dir, func(p string, info os.FileInfo, err error) error {
if err != nil {
logger.Info("error processing", zap.String("path", p), zap.Error(err))
Expand All @@ -759,7 +763,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
select {
case m := <-listener.newMetricsChan:
if err := trieIdx.insert(filepath.Clean(strings.ReplaceAll(m, ".", "/") + ".wsp")); err != nil {
logger.Warn("failed to update realtime trie index", zap.Error(err))
listener.logTrieInsertError(logger, "failed to update realtime trie index", m, err)
}
default:
break newMetricsLoop
Expand Down Expand Up @@ -788,7 +792,8 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
} else {
if listener.trieIndex {
if err := trieIdx.insert(trimmedName); err != nil {
return fmt.Errorf("updateFileList.trie: %s", err)
// It's better to just log an error than stop indexing
listener.logTrieInsertError(logger, "updateFileList.trie: failed to index path", trimmedName, err)
}
} else {
files = append(files, trimmedName)
Expand Down Expand Up @@ -927,6 +932,14 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
return
}

func (listener *CarbonserverListener) logTrieInsertError(logger *zap.Logger, msg, metric string, err error) {
zfields := []zap.Field{zap.Error(err), zap.String("metric", metric)}
if ierr, ok := err.(*trieInsertError); ok {
zfields = append(zfields, zap.String("err_info", ierr.info))
}
logger.Error(msg, zfields...)
}

func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query string, resultCh chan<- *ExpandedGlobResponse) {
defer func() {
if err := recover(); err != nil {
Expand Down
25 changes: 22 additions & 3 deletions carbonserver/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,13 @@ type nilFilenameError string

func (nfe nilFilenameError) Error() string { return string(nfe) }

type trieInsertError struct {
typ string
info string
}

func (t *trieInsertError) Error() string { return t.typ }

// TODO: add some defensive logics agains bad paths?
//
// abc.def.ghi
Expand All @@ -399,7 +406,7 @@ func (ti *trieIndex) insert(path string) error {
}

if path == "" || path[len(path)-1] == '/' {
return nilFilenameError(fmt.Sprintf("metric fileename is nil: %s", path))
return nilFilenameError("metric filename is nil")
}

if uint64(len(path)) > ti.getDepth() {
Expand Down Expand Up @@ -474,7 +481,7 @@ outer:
goto dir
}

return fmt.Errorf("failed to index metric %s: unknwon case of match == nlen = %d", path, nlen)
return &trieInsertError{typ: "failed to index metric: unknwon case of match == nlen", info: fmt.Sprintf("match == nlen == %d", nlen)}
}

if match == len(child.c) && len(child.c) < nlen { // case 2
Expand Down Expand Up @@ -535,10 +542,22 @@ outer:
// TODO: there seems to be a problem of fetching a directory node without files

if !isFile {
// TODO: should double check if / already exists
if cur.dir() {
return nil
}

var newDir = true
for _, child := range *cur.childrens {
if child.dir() {
cur = child
newDir = false
break
}
}
if newDir {
cur.addChild(&trieNode{c: []byte{'/'}, childrens: &[]*trieNode{}, gen: ti.root.gen})
}

return nil
}

Expand Down
26 changes: 26 additions & 0 deletions carbonserver/trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,26 @@ func TestTrieIndex(t *testing.T) {
},
expectLeafs: []bool{true, false},
},
{
input: []string{
"/ns1/ns2/ns3/ns4/ns5/ns6/ns7_handle.wsp",
"/ns1/ns2/ns3/ns4/ns5/ns6_1/",
"/ns1/ns2/ns3/ns4/ns5/ns6_2",
"/ns1/ns2/ns3/ns4/ns5/ns6_3/",
"/ns1/ns2/ns3/ns4/ns5/ns6_3/",
"/ns1/ns2/ns3/ns4/ns5/ns6_3/",
"/ns1/ns2/ns3/ns4/ns5/ns6_3/",
"/ns1/ns2/ns3/ns4/ns5/ns6_3/metric.wsp",
},
query: "ns1.ns2.ns3.ns4.ns5.*",
expect: []string{
"ns1.ns2.ns3.ns4.ns5.ns6",
"ns1.ns2.ns3.ns4.ns5.ns6_1",
"ns1.ns2.ns3.ns4.ns5.ns6_2",
"ns1.ns2.ns3.ns4.ns5.ns6_3",
},
expectLeafs: []bool{false, false, false, false},
},
}

for _, c := range cases {
Expand All @@ -696,6 +716,8 @@ func TestTrieIndex(t *testing.T) {
}
}

// trieServer.CurrentFileIndex().trieIdx.dump(os.Stdout)

sort.Strings(trieFiles)
sort.Strings(c.expect)
if !reflect.DeepEqual(trieFiles, c.expect) {
Expand All @@ -712,6 +734,10 @@ func TestTrieEdgeCases(t *testing.T) {
if err == nil || err.Error() != "glob: range overflow" {
t.Errorf("trie should return an range overflow error")
}

if err := trie.insert("ns1/ns2/ns3/ns4/ns5/ns7/"); err != nil {
t.Errorf("should not return insert error when inserting folders")
}
}

func TestTrieConcurrentReadWrite(t *testing.T) {
Expand Down