Skip to content

Commit

Permalink
refactor: change transfer stats metric
Browse files Browse the repository at this point in the history
  • Loading branch information
710leo committed Mar 27, 2020
1 parent aac5fd1 commit 2f86831
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
3 changes: 3 additions & 0 deletions src/modules/judge/cache/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func (i *IndexMap) Set(id int64, hash string, s Series) {
}

func (i *IndexMap) Get(id int64) []Series {
i.RLock()
defer i.RUnlock()

seriess := []Series{}
if ss, exists := i.Data[id]; exists {
for _, s := range ss {
Expand Down
18 changes: 7 additions & 11 deletions src/modules/transfer/backend/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,15 @@ func Send2TsdbTask(Q *list.SafeListLimited, node string, addr string, concurrent
for {
items := Q.PopBackBy(batch)
count := len(items)
stats.Counter.Set("tsdb.queue.push", count)

if count == 0 {
time.Sleep(DefaultSendTaskSleepInterval)
continue
}

tsdbItems := make([]*dataobj.TsdbItem, count)
stats.Counter.Set("points.out.tsdb", count)
for i := 0; i < count; i++ {
tsdbItems[i] = items[i].(*dataobj.TsdbItem)
stats.Counter.Set("points.out.tsdb", 1)
logger.Debug("send to tsdb->: ", tsdbItems[i])
}

Expand All @@ -93,10 +91,8 @@ func Send2TsdbTask(Q *list.SafeListLimited, node string, addr string, concurrent
time.Sleep(time.Millisecond * 10)
}

// statistics
//atomic.AddInt64(&PointOut2Tsdb, int64(count))
if !sendOk {
stats.Counter.Set("points.out.tsdb.err", 1)
stats.Counter.Set("points.out.tsdb.err", count)
logger.Errorf("send %v to tsdb %s:%s fail: %v", tsdbItems, node, addr, err)
} else {
logger.Debugf("send to tsdb %s:%s ok", node, addr)
Expand Down Expand Up @@ -145,11 +141,10 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) {
time.Sleep(DefaultSendTaskSleepInterval)
continue
}

judgeItems := make([]*dataobj.JudgeItem, count)
stats.Counter.Set("points.out.judge", count)
for i := 0; i < count; i++ {
judgeItems[i] = items[i].(*dataobj.JudgeItem)
stats.Counter.Set("points.out.judge", 1)
logger.Debug("send to judge: ", judgeItems[i])
}

Expand All @@ -171,8 +166,10 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) {
}

if !sendOk {
stats.Counter.Set("points.out.judge.err", 1)
logger.Errorf("send %v to judge %s fail: %v", judgeItems, addr, err)
stats.Counter.Set("points.out.judge.err", count)
for _, item := range judgeItems {
logger.Errorf("send %v to judge %s fail: %v", item, addr, err)
}
}

}(addr, judgeItems, count)
Expand All @@ -186,7 +183,6 @@ func Push2JudgeSendQueue(items []*dataobj.MetricValue) {
stras := cache.StraMap.GetByKey(key)

for _, stra := range stras {

if !TagMatch(stra.Tags, item.TagsMap) {
continue
}
Expand Down

0 comments on commit 2f86831

Please sign in to comment.