/
topic_length.go
95 lines (84 loc) · 1.96 KB
/
topic_length.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package crontab
import (
"fmt"
"log"
"math/rand"
"sync"
"github.com/huajiao-tv/dashboard/dao"
"github.com/huajiao-tv/dashboard/keeper"
)
type TopicLengthCollect struct {
mu sync.RWMutex
m map[string]*dao.TopicHistory
}
func newTopicLengthCollect() *TopicLengthCollect {
return &TopicLengthCollect{
m: map[string]*dao.TopicHistory{},
}
}
func (s *TopicLengthCollect) GetByKey(key string) *dao.TopicHistory {
s.mu.RLock()
defer s.mu.RUnlock()
if data, ok := s.m[key]; ok {
return data
}
return &dao.TopicHistory{}
}
func (s *TopicLengthCollect) Get(queue, topic string) *dao.TopicHistory {
key := fmt.Sprintf("%v/%v", queue, topic)
return s.GetByKey(key)
}
func (s *TopicLengthCollect) collect() {
nodes, err := keeper.GetNodeList()
if err != nil {
log.Println("CollectTopicLength getNodeList failed:", err)
return
}
if len(nodes) == 0 {
log.Println("CollectTopicLength getNodeList get empty node,skip!")
return
}
topics, err := dao.Topic{}.Query(&dao.Query{})
if err != nil {
log.Println("Query topic failed:", err)
return
}
topicChan := make(chan *dao.TopicHistory, len(topics))
for _, topic := range topics {
node := nodes[rand.Intn(len(nodes))]
info, err := GetTopicLength(node, topic.Queue, topic.Name)
if err != nil {
log.Println("getTopicLength failed:", err)
return
}
topicChan <- &dao.TopicHistory{
Queue: topic.Queue,
Topic: topic.Name,
Length: info.Data.Normal,
RetryLength: info.Data.Retry,
TimeoutLength: info.Data.Timeout,
}
}
// save
if len(topicChan) == 0 {
return
}
data := make(map[string]*dao.TopicHistory, len(topicChan))
Range:
for {
select {
case stats := <-topicChan:
key := fmt.Sprintf("%v/%v", stats.Queue, stats.Topic)
data[key] = stats
default:
break Range
}
}
s.mu.Lock()
s.m = data
s.mu.Unlock()
err = dao.TopicHistory{DataType: dao.HourData}.CreateBatch(data)
if err != nil {
log.Println("TopicHistory CreateBatch failed:", err)
}
}