Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fixes and small refactoring for carbonserver and trie index #416

Merged
merged 4 commits into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,10 @@ max-globs = 100
fail-on-max-globs = false

# Maximum metrics could be returned by glob/wildcard in find request (currently
# works only for trie index)
# works only for trie index) (Default: 10,000,000)
max-metrics-globbed = 30000
bom-d-van marked this conversation as resolved.
Show resolved Hide resolved
# Maximum metrics could be returned in render request (works with all types of
# indexes)
# indexes) (Default: 1,000,000)
max-metrics-rendered = 1000

# graphite-web-10-mode
Expand Down
17 changes: 14 additions & 3 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type Cache struct {
overflowCnt uint32 // drop packages if cache full
queryCnt uint32 // number of queries
tagsNormalizeErrors uint32 // tags normalize errors count

droppedRealtimeIndex uint32 // new metrics failed to be indexed in realtime
}

newMetricsChan chan string
Expand All @@ -79,7 +81,6 @@ func New() *Cache {
c.data[i] = &Shard{
items: make(map[string]*points.Points),
notConfirmed: make([]*points.Points, 4),
adds: make(map[string]struct{}),
}
}

Expand All @@ -95,6 +96,12 @@ func New() *Cache {
return c
}

// InitCacheScanAdds allocates the per-shard "adds" set used to track
// metric names newly added to the cache. It must be called before
// Cache.Add when cache-scan is enabled; Add only records new names
// when a shard's adds map is non-nil.
func (c *Cache) InitCacheScanAdds() {
	for i := range c.data {
		c.data[i].adds = make(map[string]struct{})
	}
}

// SetWriteStrategy ...
func (c *Cache) SetWriteStrategy(s string) (err error) {
c.Lock()
Expand Down Expand Up @@ -148,6 +155,8 @@ func (c *Cache) Stat(send helper.StatCallback) {
helper.SendAndSubstractUint32("queueBuildCount", &c.stat.queueBuildCnt, send)
helper.SendAndSubstractUint32("queueBuildTimeMs", &c.stat.queueBuildTimeMs, send)
helper.SendUint32("queueWriteoutTime", &c.stat.queueWriteoutTime, send)

helper.SendAndSubstractUint32("droppedRealtimeIndex", &c.stat.droppedRealtimeIndex, send)
}

// hash function
Expand Down Expand Up @@ -281,13 +290,15 @@ func (c *Cache) Add(p *points.Points) {
values.Data = append(values.Data, p.Data...)
} else {
shard.items[p.Metric] = p
shard.adds[p.Metric] = struct{}{}

if shard.adds != nil {
shard.adds[p.Metric] = struct{}{}
}
if c.newMetricsChan != nil {
select {
case c.newMetricsChan <- p.Metric:
default:
// TODO: log/metrics
atomic.AddUint32(&c.stat.droppedRealtimeIndex, 1)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ func (app *App) Start(version string) (err error) {

var setConfigRetriever bool
if conf.Carbonserver.CacheScan {
core.InitCacheScanAdds()
carbonserver.SetCacheGetMetricsFunc(core.GetRecentNewMetrics)

setConfigRetriever = true
Expand Down
4 changes: 2 additions & 2 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ func NewConfig() *Config {
FindCacheEnabled: true,
TrigramIndex: true,
CacheScan: false,
MaxMetricsGlobbed: 30000,
MaxMetricsRendered: 1000,
MaxMetricsGlobbed: 10_000_000,
MaxMetricsRendered: 1_000_000,
},
Carbonlink: carbonlinkConfig{
Listen: "127.0.0.1:7002",
Expand Down
1 change: 1 addition & 0 deletions carbonserver/cache_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func getTestInfo(t *testing.T) *testInfo {
}

c := cache.New()
c.InitCacheScanAdds()
carbonserver := NewCarbonserverListener(c.Get)
carbonserver.whisperData = tmpDir
carbonserver.logger = zap.NewNop()
Expand Down
79 changes: 47 additions & 32 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func splitAndInsert(cacheMetricNames map[string]struct{}, newCacheMetricNames []
}

func (listener *CarbonserverListener) fileListUpdater(dir string, tick <-chan time.Time, force <-chan struct{}, exit <-chan struct{}) {
cacheMetrics := make(map[string]struct{})
cacheMetricNames := make(map[string]struct{})

uloop:
for {
Expand All @@ -624,29 +624,30 @@ uloop:
case m := <-listener.newMetricsChan:
fidx := listener.CurrentFileIndex()
if listener.trieIndex && listener.concurrentIndex && fidx != nil && fidx.trieIdx != nil {
metric := filepath.Clean(strings.ReplaceAll(m, ".", "/") + ".wsp")
metric := "/" + filepath.Clean(strings.ReplaceAll(m, ".", "/")+".wsp")

if err := fidx.trieIdx.insert(metric); err != nil {
listener.logger.Warn("failed to insert new metrics for realtime indexing", zap.String("metric", metric), zap.Error(err))
}

cacheMetrics[metric] = struct{}{}
}
continue uloop
}

if listener.cacheGetRecentMetrics != nil {
// cacheMetrics maintains all new metric names added in cache
// cacheMetricNames maintains all new metric names added in cache
// when cache-scan is enabled in conf
newCacheMetricNames := listener.cacheGetRecentMetrics()
cacheMetrics = splitAndInsert(cacheMetrics, newCacheMetricNames)
cacheMetricNames = splitAndInsert(cacheMetricNames, newCacheMetricNames)
}

listener.updateFileList(dir, cacheMetrics)
if listener.updateFileList(dir, cacheMetricNames) {
listener.logger.Info("file list updated with cache, starting a new scan immediately")
listener.updateFileList(dir, cacheMetricNames)
}
}
}

func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricNames map[string]struct{}) {
func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricNames map[string]struct{}) (readFromCache bool) {
logger := listener.logger.With(zap.String("handler", "fileListUpdated"))
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -676,6 +677,8 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName

// populate index for all the metric names in cache
// the iteration takes place only when cache-scan is enabled in conf
var tcache = time.Now()
var cacheMetricLen = len(cacheMetricNames)
for fileName := range cacheMetricNames {
if listener.trieIndex {
if err := trieIdx.insert(fileName); err != nil {
Expand All @@ -689,8 +692,9 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
metricsKnown++
}
}
cacheIndexRuntime := time.Since(tcache)

var readFromCache bool
// readFromCache should only occur once at the start of the program
if fidx == nil && listener.fileListCache != "" {
fileListCache, err := newFileListCache(listener.fileListCache, 'r')
if err != nil {
Expand All @@ -707,6 +711,8 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
if err := trieIdx.insert(entry); err != nil {
logger.Error("failed to read from file list cache", zap.Error(err))
readFromCache = false

trieIdx = newTrie(".wsp")
break
}
filesLen++
Expand Down Expand Up @@ -746,6 +752,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
return nil
}

// && len(listener.newMetricsChan) >= cap(listener.newMetricsChan)/2
if listener.trieIndex && listener.concurrentIndex && listener.newMetricsChan != nil {
newMetricsLoop:
for {
Expand All @@ -760,8 +767,8 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
}
}

hasSuffix := strings.HasSuffix(info.Name(), ".wsp")
if info.IsDir() || hasSuffix {
isFullMetric := strings.HasSuffix(info.Name(), ".wsp")
if info.IsDir() || isFullMetric {
trimmedName := strings.TrimPrefix(p, listener.whisperData)
filesLen++
if flc != nil {
Expand All @@ -787,21 +794,19 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
files = append(files, trimmedName)
}

if hasSuffix {
if isFullMetric {
metricsKnown++
}
}

if hasSuffix {
if listener.internalStatsDir != "" {
i := stat.GetStat(info)
trimmedName = strings.ReplaceAll(trimmedName[1:len(trimmedName)-4], "/", ".")
details[trimmedName] = &protov3.MetricDetails{
Size_: i.Size,
ModTime: i.MTime,
ATime: i.ATime,
RealSize: i.RealSize,
}
if isFullMetric && listener.internalStatsDir != "" {
i := stat.GetStat(info)
trimmedName = strings.ReplaceAll(trimmedName[1:len(trimmedName)-4], "/", ".")
details[trimmedName] = &protov3.MetricDetails{
Size_: i.Size,
ModTime: i.MTime,
ATime: i.ATime,
RealSize: i.RealSize,
}
}
}
Expand All @@ -817,13 +822,6 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName

if listener.concurrentIndex && trieIdx != nil {
trieIdx.prune()

start := time.Now()
count, files, dirs, _, _, _, _, _ := trieIdx.countNodes()
atomic.StoreUint64(&listener.metrics.TrieNodes, uint64(count))
atomic.StoreUint64(&listener.metrics.TrieFiles, uint64(files))
atomic.StoreUint64(&listener.metrics.TrieDirs, uint64(dirs))
infos = append(infos, zap.Duration("trie_count_nodes_time", time.Since(start)))
}

var stat syscall.Statfs_t
Expand Down Expand Up @@ -855,7 +853,8 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName

var pruned int
var indexType = "trigram"
t0 = time.Now()
var tindex = time.Now()
var indexSize int
if listener.trieIndex {
indexType = "trie"
nfidx.trieIdx = trieIdx
Expand All @@ -864,18 +863,28 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
zap.Int("trie_depth", int(nfidx.trieIdx.depth)),
zap.String("longest_metric", nfidx.trieIdx.longestMetric),
)

if listener.trigramIndex && !listener.concurrentIndex {
start := time.Now()
nfidx.trieIdx.setTrigrams()
infos = append(infos, zap.Duration("set_trigram_time", time.Since(start)))
}

start := time.Now()
count, files, dirs, _, _, _, _, _ := trieIdx.countNodes()
atomic.StoreUint64(&listener.metrics.TrieNodes, uint64(count))
atomic.StoreUint64(&listener.metrics.TrieFiles, uint64(files))
atomic.StoreUint64(&listener.metrics.TrieDirs, uint64(dirs))
infos = append(infos, zap.Duration("trie_count_nodes_time", time.Since(start)))

indexSize = count
} else {
nfidx.files = files
nfidx.idx = trigram.NewIndex(files)
pruned = nfidx.idx.Prune(0.95)
indexSize = len(nfidx.idx)
}
indexSize := len(nfidx.idx)
indexingRuntime := time.Since(t0)
indexingRuntime := time.Since(tindex) // note: no longer meaningful for trie index
atomic.AddUint64(&listener.metrics.IndexBuildTimeNS, uint64(indexingRuntime.Nanoseconds()))

var tl = time.Now()
Expand All @@ -902,14 +911,20 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
zap.Duration("file_scan_runtime", fileScanRuntime),
zap.Duration("indexing_runtime", indexingRuntime),
zap.Duration("rdtime_update_runtime", rdTimeUpdateRuntime),
zap.Duration("cache_index_runtime", cacheIndexRuntime),
zap.Duration("total_runtime", time.Since(t0)),
zap.Int("Files", filesLen),
zap.Int("index_size", indexSize),
zap.Int("pruned_trigrams", pruned),
zap.Int("cache_metric_len_before", cacheMetricLen),
zap.Int("cache_metric_len_after", len(cacheMetricNames)),
zap.Uint64("metrics_known", metricsKnown),
zap.String("index_type", indexType),
zap.Bool("read_from_cache", readFromCache),
)
logger.Info("file list updated", infos...)

return
}

func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query string, resultCh chan<- *ExpandedGlobResponse) {
Expand Down
14 changes: 10 additions & 4 deletions carbonserver/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,12 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
if expandedResult, ok := metricGlobMap[metric.Name]; ok {
files, leafs := expandedResult.Files, expandedResult.Leafs
if len(files) > listener.maxMetricsRendered {
listener.accessLogger.Error(
"rendering too many metrics",
zap.Int("limit", listener.maxMetricsRendered),
zap.Int("target", len(files)),
)

files = files[:listener.maxMetricsRendered]
leafs = leafs[:listener.maxMetricsRendered]
}
Expand All @@ -381,7 +387,7 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
metricsCount++
}
}
listener.logger.Debug("expandGlobs result",
listener.accessLogger.Debug("expandGlobs result",
zap.String("handler", "render"),
zap.String("action", "expandGlobs"),
zap.String("metric", metric.Name),
Expand All @@ -394,7 +400,7 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
res, err := listener.fetchDataPB(metric.Name, files, leafs, fromTime, untilTime)
if err != nil {
atomic.AddUint64(&listener.metrics.RenderErrors, 1)
listener.logger.Error("error while fetching the data",
listener.accessLogger.Error("error while fetching the data",
zap.Error(err),
)
continue
Expand All @@ -404,7 +410,7 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
res, err := listener.fetchDataPB3(metric.Name, files, leafs, fromTime, untilTime)
if err != nil {
atomic.AddUint64(&listener.metrics.RenderErrors, 1)
listener.logger.Error("error while fetching the data",
listener.accessLogger.Error("error while fetching the data",
zap.Error(err),
)
continue
Expand All @@ -415,7 +421,7 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
multiv3.Metrics = append(multiv3.Metrics, res.Metrics...)
}
} else {
listener.logger.Debug("expand globs returned an error",
listener.accessLogger.Debug("expand globs returned an error",
zap.Error(err),
)
continue
Expand Down
4 changes: 2 additions & 2 deletions deploy/go-carbon.conf
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,10 @@ max-globs = 100
fail-on-max-globs = false

# Maximum metrics could be returned by glob/wildcard in find request (currently
# works only for trie index)
# works only for trie index) (Default: 10,000,000)
max-metrics-globbed = 30000
# Maximum metrics could be returned in render request (works with all types of
# indexes)
# indexes) (Default: 1,000,000)
max-metrics-rendered = 1000


Expand Down
4 changes: 2 additions & 2 deletions go-carbon.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ max-globs = 100
fail-on-max-globs = false

# Maximum metrics could be returned by glob/wildcard in find request (currently
# works only for trie index)
# works only for trie index) (Default: 10,000,000)
bom-d-van marked this conversation as resolved.
Show resolved Hide resolved
max-metrics-globbed = 30000
# Maximum metrics could be returned in render request (works with all types of
# indexes)
# indexes) (Default: 1,000,000)
max-metrics-rendered = 1000

# graphite-web-10-mode
Expand Down