Skip to content

Commit

Permalink
ctrie: ci and bug fixes
Browse files Browse the repository at this point in the history
* fix ci errors
* handle potential index out of bound in all the trie walking funcs
* refactor test logs
  • Loading branch information
bom-d-van committed Oct 18, 2020
1 parent 39e432f commit 6b5ff11
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 49 deletions.
15 changes: 10 additions & 5 deletions carbonserver/carbonserver.go
Expand Up @@ -728,8 +728,11 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
}
} else {
defer func() {
if err := flc.close(); err != nil {
logger.Error("failed to close flie list cache", zap.Error(err))
// flc could be reset to nil during filepath walk
if flc != nil {
if err := flc.close(); err != nil {
logger.Error("failed to close flie list cache", zap.Error(err))
}
}
}()
}
Expand Down Expand Up @@ -776,7 +779,9 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
if flc != nil {
if err := flc.write(trimmedName); err != nil {
logger.Error("failed to write to file list cache", zap.Error(err))
flc.file.Close()
if err := flc.close(); err != nil {
logger.Error("failed to close flie list cache", zap.Error(err))
}
flc = nil
}
}
Expand Down Expand Up @@ -821,7 +826,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
atomic.StoreUint64(&listener.metrics.TrieNodes, uint64(count))
atomic.StoreUint64(&listener.metrics.TrieFiles, uint64(files))
atomic.StoreUint64(&listener.metrics.TrieDirs, uint64(dirs))
infos = append(infos, zap.Duration("trie_count_nodes_time", time.Now().Sub(start)))
infos = append(infos, zap.Duration("trie_count_nodes_time", time.Since(start)))
}

var stat syscall.Statfs_t
Expand Down Expand Up @@ -865,7 +870,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
if listener.trigramIndex && !listener.concurrentIndex {
start := time.Now()
nfidx.trieIdx.setTrigrams()
infos = append(infos, zap.Duration("set_trigram_time", time.Now().Sub(start))) //nolint:gosimple
infos = append(infos, zap.Duration("set_trigram_time", time.Since(start)))
}
} else {
nfidx.files = files
Expand Down
64 changes: 33 additions & 31 deletions carbonserver/trie.go
Expand Up @@ -356,6 +356,9 @@ func newTrie(fileExt string) *trieIndex {
}
}

func (ti *trieIndex) getDepth() uint64 { return atomic.LoadUint64(&ti.depth) }
func (ti *trieIndex) setDepth(d uint64) { atomic.StoreUint64(&ti.depth, d) }

// TODO: add some defensive logics agains bad paths?
//
// abc.def.ghi
Expand All @@ -372,8 +375,8 @@ func (ti *trieIndex) insert(path string) error {
}

cur := ti.root
if uint64(len(path)) > atomic.LoadUint64(&ti.depth) {
atomic.StoreUint64(&ti.depth, uint64(len(path)))
if uint64(len(path)) > ti.getDepth() {
ti.setDepth(uint64(len(path)))
ti.longestMetric = path
}

Expand Down Expand Up @@ -555,10 +558,10 @@ func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) (

var cur = ti.root
var curChildrens = cur.getChildrens()
var depth = atomic.LoadUint64(&ti.depth)
var nindex = make([]int, depth+1)
var trieNodes = make([]*trieNode, depth+1)
var childrensStack = make([][]*trieNode, depth+1)
var depth = ti.getDepth() + 1
var nindex = make([]int, depth)
var trieNodes = make([]*trieNode, depth)
var childrensStack = make([][]*trieNode, depth)
var ncindex int
var mindex int
var curm = matchers[0]
Expand Down Expand Up @@ -725,12 +728,12 @@ func dumpTrigrams(data []uint32) []trigram.T { //nolint:deadcode,unused

func (ti *trieIndex) allMetrics(sep byte) []string {
var files = make([]string, 0, ti.fileCount)
var depth = atomic.LoadUint64(&ti.depth)
var nindex = make([]int, depth+1)
var depth = ti.getDepth() + 1
var nindex = make([]int, depth)
var ncindex int
var cur = ti.root
var curChildrens = cur.getChildrens()
var trieNodes = make([]*trieNode, depth+1)
var trieNodes = make([]*trieNode, depth)
for {
if nindex[ncindex] >= len(curChildrens) {
goto parent
Expand All @@ -740,7 +743,7 @@ func (ti *trieIndex) allMetrics(sep byte) []string {
cur = cur.getChild(curChildrens, nindex[ncindex])
curChildrens = cur.getChildrens()
ncindex++
if ncindex >= len(nindex)-1 {
if ncindex >= len(trieNodes)-1 {
goto parent
}

Expand Down Expand Up @@ -769,12 +772,12 @@ func (ti *trieIndex) allMetrics(sep byte) []string {
}

func (ti *trieIndex) dump(w io.Writer) {
var depth = atomic.LoadUint64(&ti.depth)
var nindex = make([]int, depth+1)
var depth = ti.getDepth() + 1
var nindex = make([]int, depth)
var ncindex int
var cur = ti.root
var curChildrens = cur.getChildrens()
var trieNodes = make([]*trieNode, depth+1)
var trieNodes = make([]*trieNode, depth)
var ident []byte
for {
if nindex[ncindex] >= len(curChildrens) {
Expand All @@ -785,7 +788,7 @@ func (ti *trieIndex) dump(w io.Writer) {
cur = cur.getChild(curChildrens, nindex[ncindex])
curChildrens = cur.getChildrens()
ncindex++
if ncindex >= len(nindex)-1 {
if ncindex >= len(trieNodes)-1 {
goto parent
}

Expand Down Expand Up @@ -822,13 +825,13 @@ func (ti *trieIndex) dump(w io.Writer) {
// boundary)
func (ti *trieIndex) statNodes() map[*trieNode]int {
var stats = map[*trieNode]int{}
var depth = atomic.LoadUint64(&ti.depth)
var nindex = make([]int, depth+1)
var depth = ti.getDepth() + 1
var nindex = make([]int, depth)
var ncindex int
var cur = ti.root
var curChildrens = cur.getChildrens()
var trieNodes = make([]*trieNode, depth+1)
var curdirs = make([]int, depth+1)
var trieNodes = make([]*trieNode, depth)
var curdirs = make([]int, depth)
var curindex int

for {
Expand Down Expand Up @@ -878,12 +881,12 @@ func (ti *trieIndex) statNodes() map[*trieNode]int {

// TODO: support ctrie
func (ti *trieIndex) setTrigrams() {
var depth = atomic.LoadUint64(&ti.depth)
var nindex = make([]int, depth+1)
var depth = ti.getDepth() + 1
var nindex = make([]int, depth)
var ncindex int
var cur = ti.root
var trieNodes = make([]*trieNode, depth+1)
var trigrams = make([][]uint32, depth+1)
var trieNodes = make([]*trieNode, depth)
var trigrams = make([][]uint32, depth)
var stats = ti.statNodes()

// chosen semi-randomly. maybe we could turn this into a configurations
Expand All @@ -903,7 +906,6 @@ func (ti *trieIndex) setTrigrams() {
if ncindex >= len(nindex)-1 {
goto parent
}
trieNodes[ncindex] = cur

// abc.xyz.cjk
trigrams[ncindex] = []uint32{}
Expand Down Expand Up @@ -995,9 +997,8 @@ func (ti *trieIndex) prune() {
cur.childrens = *cur.node.childrens

var idx int
var depth = atomic.LoadUint64(&ti.depth)
var states = make([]state, depth+1)

var depth = ti.getDepth() + 1
var states = make([]state, depth)
for {
if cur.next >= len(cur.childrens) {
cc := cur.node.getChildrens()
Expand Down Expand Up @@ -1060,7 +1061,7 @@ func (ti *trieIndex) prune() {

type trieCounter [256]int

func (tc trieCounter) String() string {
func (tc *trieCounter) String() string {
var str string
for i, c := range tc {
if c > 0 {
Expand All @@ -1073,7 +1074,7 @@ func (tc trieCounter) String() string {
return fmt.Sprintf("{%s}", str)
}

func (ti *trieIndex) countNodes() (count, files, dirs, onec, onefc, onedc int, countByChildren, nodesByGen trieCounter) {
func (ti *trieIndex) countNodes() (count, files, dirs, onec, onefc, onedc int, countByChildren, nodesByGen *trieCounter) {
type state struct {
next int
node *trieNode
Expand All @@ -1085,9 +1086,10 @@ func (ti *trieIndex) countNodes() (count, files, dirs, onec, onefc, onedc int, c
cur.childrens = cur.node.childrens

var idx int
var depth = atomic.LoadUint64(&ti.depth)
var states = make([]state, depth+1)

var depth = ti.getDepth() + 1
var states = make([]state, depth)
countByChildren = &trieCounter{}
nodesByGen = &trieCounter{}
for {
if cur.next >= len(*cur.childrens) {
if len(*cur.childrens) == 1 && !cur.node.dir() {
Expand Down
4 changes: 2 additions & 2 deletions carbonserver/trie_real_test.go
Expand Up @@ -67,7 +67,7 @@ func TestTrieGlobRealData(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
trieServer = newTrieServer(files, !*pureTrie)
trieServer = newTrieServer(files, !*pureTrie, t)
trieServer.whisperData = *carbonPath

if *checkMemory {
Expand All @@ -80,7 +80,7 @@ func TestTrieGlobRealData(t *testing.T) {
if !*noTrigram {
wg.Add(1)
go func() {
trigramServer = newTrigramServer(files)
trigramServer = newTrigramServer(files, t)
trigramServer.whisperData = *carbonPath

if *checkMemory {
Expand Down
29 changes: 18 additions & 11 deletions carbonserver/trie_test.go
Expand Up @@ -21,7 +21,11 @@ import (

func init() { log.SetFlags(log.Lshortfile) }

func newTrieServer(files []string, withTrigram bool) *CarbonserverListener {
type logf interface {
Logf(format string, args ...interface{})
}

func newTrieServer(files []string, withTrigram bool, l logf) *CarbonserverListener {
var listener CarbonserverListener
listener.logger = zap.NewNop()
listener.accessLogger = zap.NewNop()
Expand All @@ -37,15 +41,15 @@ func newTrieServer(files []string, withTrigram bool) *CarbonserverListener {
for _, file := range files {
trieIndex.insert(file)
}
fmt.Printf("trie index took %s\n", time.Now().Sub(start)) //nolint:gosimple
l.Logf("trie index took %s\n", time.Since(start))

if withTrigram {
start = time.Now()
trieIndex.setTrigrams()
fmt.Printf("trie setTrigrams took %s size %d\n", time.Now().Sub(start), len(trieIndex.trigrams)) //nolint:gosimple
l.Logf("trie setTrigrams took %s size %d\n", time.Since(start), len(trieIndex.trigrams))
}

fmt.Printf("longest metric(%d): %s\n", trieIndex.depth, trieIndex.longestMetric)
l.Logf("longest metric(%d): %s\n", trieIndex.depth, trieIndex.longestMetric)

listener.UpdateFileIndex(&fileIndex{
trieIdx: trieIndex,
Expand All @@ -54,7 +58,7 @@ func newTrieServer(files []string, withTrigram bool) *CarbonserverListener {
return &listener
}

func newTrigramServer(files []string) *CarbonserverListener {
func newTrigramServer(files []string, l logf) *CarbonserverListener {
var listener CarbonserverListener
listener.logger = zap.NewNop()
listener.accessLogger = zap.NewNop()
Expand All @@ -67,7 +71,7 @@ func newTrigramServer(files []string) *CarbonserverListener {

start := time.Now()
idx := trigram.NewIndex(files)
fmt.Printf("trigram index took %s\n", time.Now().Sub(start)) //nolint:gosimple
l.Logf("trigram index took %s\n", time.Since(start))

listener.UpdateFileIndex(&fileIndex{
idx: idx,
Expand Down Expand Up @@ -634,7 +638,7 @@ func TestTrieIndex(t *testing.T) {
t.Run(c.query, func(t *testing.T) {
t.Logf("case: TestTrieIndex/'^%s$'", regexp.QuoteMeta(c.query))

trieServer := newTrieServer(c.input, false)
trieServer := newTrieServer(c.input, false, t)
resultCh := make(chan *ExpandedGlobResponse, 1)
trieServer.expandGlobs(context.TODO(), c.query, resultCh)
result := <-resultCh
Expand Down Expand Up @@ -720,6 +724,9 @@ func TestTrieConcurrentReadWrite(t *testing.T) {
}
}

// for fixing Unused code error from deepsource.io, dump is useful for debugging
var _ = (&trieIndex{}).dump

func TestTriePrune(t *testing.T) {
cases := []struct {
files1 []string
Expand Down Expand Up @@ -827,7 +834,7 @@ func TestTriePrune(t *testing.T) {
if conedc != sonedc {
t.Errorf("conedc = %v; sonedc %v", conedc, sonedc)
}
if ccountByChildren != scountByChildren {
if *ccountByChildren != *scountByChildren {
t.Errorf("ccountByChildren = %s; scountByChildren %s", ccountByChildren, scountByChildren)
}
})
Expand Down Expand Up @@ -859,11 +866,11 @@ func BenchmarkGlobIndex(b *testing.B) {
var wg sync.WaitGroup
wg.Add(2)
go func() {
btrieServer = newTrieServer(files, true)
btrieServer = newTrieServer(files, true, b)
wg.Done()
}()
go func() {
btrigramServer = newTrigramServer(files)
btrigramServer = newTrigramServer(files, b)
wg.Done()
}()

Expand Down Expand Up @@ -952,7 +959,7 @@ func TestDumpAllMetrics(t *testing.T) {
"/service-01/server-170/metric-namespace-007-008-xdp/cpu.wsp",
"/service-01/server-170/metric-namespace-006-xdp/cpu.wsp",
}
trie := newTrieServer(files, true)
trie := newTrieServer(files, true, t)
metrics := trie.CurrentFileIndex().trieIdx.allMetrics('.')
for i := range files {
files[i] = files[i][:len(files[i])-4]
Expand Down

0 comments on commit 6b5ff11

Please sign in to comment.