From 2f86831c60fc90ef2973ffd09386799ccf09eff3 Mon Sep 17 00:00:00 2001 From: 710leo <710leo@gmail.com> Date: Fri, 27 Mar 2020 14:58:36 +0800 Subject: [PATCH] refactor: change transfer stats metric --- src/modules/judge/cache/index.go | 3 +++ src/modules/transfer/backend/sender.go | 18 +++++++----------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/modules/judge/cache/index.go b/src/modules/judge/cache/index.go index 8d2e31a2a..0dac52dcc 100644 --- a/src/modules/judge/cache/index.go +++ b/src/modules/judge/cache/index.go @@ -40,6 +40,9 @@ func (i *IndexMap) Set(id int64, hash string, s Series) { } func (i *IndexMap) Get(id int64) []Series { + i.RLock() + defer i.RUnlock() + seriess := []Series{} if ss, exists := i.Data[id]; exists { for _, s := range ss { diff --git a/src/modules/transfer/backend/sender.go b/src/modules/transfer/backend/sender.go index f28dde189..f3ea01a0a 100644 --- a/src/modules/transfer/backend/sender.go +++ b/src/modules/transfer/backend/sender.go @@ -62,17 +62,15 @@ func Send2TsdbTask(Q *list.SafeListLimited, node string, addr string, concurrent for { items := Q.PopBackBy(batch) count := len(items) - stats.Counter.Set("tsdb.queue.push", count) - if count == 0 { time.Sleep(DefaultSendTaskSleepInterval) continue } tsdbItems := make([]*dataobj.TsdbItem, count) + stats.Counter.Set("points.out.tsdb", count) for i := 0; i < count; i++ { tsdbItems[i] = items[i].(*dataobj.TsdbItem) - stats.Counter.Set("points.out.tsdb", 1) logger.Debug("send to tsdb->: ", tsdbItems[i]) } @@ -93,10 +91,8 @@ func Send2TsdbTask(Q *list.SafeListLimited, node string, addr string, concurrent time.Sleep(time.Millisecond * 10) } - // statistics - //atomic.AddInt64(&PointOut2Tsdb, int64(count)) if !sendOk { - stats.Counter.Set("points.out.tsdb.err", 1) + stats.Counter.Set("points.out.tsdb.err", count) logger.Errorf("send %v to tsdb %s:%s fail: %v", tsdbItems, node, addr, err) } else { logger.Debugf("send to tsdb %s:%s ok", node, addr) @@ -145,11 +141,10 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) { time.Sleep(DefaultSendTaskSleepInterval) continue } - judgeItems := make([]*dataobj.JudgeItem, count) + stats.Counter.Set("points.out.judge", count) for i := 0; i < count; i++ { judgeItems[i] = items[i].(*dataobj.JudgeItem) - stats.Counter.Set("points.out.judge", 1) logger.Debug("send to judge: ", judgeItems[i]) } @@ -171,8 +166,10 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) { } if !sendOk { - stats.Counter.Set("points.out.judge.err", 1) - logger.Errorf("send %v to judge %s fail: %v", judgeItems, addr, err) + stats.Counter.Set("points.out.judge.err", count) + for _, item := range judgeItems { + logger.Errorf("send %v to judge %s fail: %v", item, addr, err) + } } }(addr, judgeItems, count) @@ -186,7 +183,6 @@ func Push2JudgeSendQueue(items []*dataobj.MetricValue) { stras := cache.StraMap.GetByKey(key) for _, stra := range stras { - if !TagMatch(stra.Tags, item.TagsMap) { continue }