/
agent.go
127 lines (112 loc) · 2.74 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package monitor
import (
"encoding/json"
"fmt"
"sync"
"time"
"github.com/VertexC/log-formatter/logger"
)
const HeartbeatInterval = 20
const StaleInterval = 3 * HeartbeatInterval
type Agent struct {
Id uint64 `json:"id"`
Status string `json:"status"`
// rpc connection address of agent
Address string `json:"address"`
// config is a place holder of config
Config string `json:"config"`
HeartbeatTick int `json:"alive"`
}
type AgentsSyncMap struct {
agents map[uint64]*Agent
lock sync.RWMutex
logger *logger.Logger
}
func NewAgentsSyncMap(logger *logger.Logger) *AgentsSyncMap {
return &AgentsSyncMap{
agents: make(map[uint64]*Agent),
logger: logger,
}
}
func (agentsMap *AgentsSyncMap) Update(agents ...Agent) {
agentsMap.lock.Lock()
defer agentsMap.lock.Unlock()
for _, agent := range agents {
agentsMap.agents[agent.Id] = &agent
agentsMap.agents[agent.Id].HeartbeatTick = 0
}
}
func (agentsMap *AgentsSyncMap) TryGet(id uint64) (Agent, error) {
agentsMap.lock.RLock()
defer agentsMap.lock.RUnlock()
if agent, ok := agentsMap.agents[id]; ok {
return *agent, nil
} else {
return Agent{}, fmt.Errorf("Agent with Id %d not found", id)
}
}
func (agentsMap *AgentsSyncMap) GetAll() []Agent {
agentsMap.lock.RLock()
defer agentsMap.lock.RUnlock()
agents := []Agent{}
for _, agent := range agentsMap.agents {
agents = append(agents, *agent)
}
return agents
}
func (agentsMap *AgentsSyncMap) ToJson() ([]byte, error) {
agentsMap.lock.RLock()
defer agentsMap.lock.RUnlock()
data, err := json.Marshal(agentsMap.agents)
return data, err
}
func (agentsMap *AgentsSyncMap) AgentToJson(id uint64) ([]byte, error) {
agentsMap.lock.RLock()
defer agentsMap.lock.RUnlock()
var (
err error
data []byte
)
if agent, ok := agentsMap.agents[id]; ok {
data, err = json.Marshal(agent)
} else {
err = fmt.Errorf("Agent with Id %d not found", id)
}
return data, err
}
func (agentsMap *AgentsSyncMap) Tick() {
for {
time.Sleep(1 * time.Second)
agentsMap.tick()
}
}
func (agentsMap *AgentsSyncMap) tick() {
agentsMap.lock.Lock()
defer agentsMap.lock.Unlock()
for _, agent := range agentsMap.agents {
agent.HeartbeatTick++
if agent.HeartbeatTick > HeartbeatInterval {
agent.Status = "Unknown"
}
}
}
func (agentsMap *AgentsSyncMap) gcWorker() {
for {
agentsMap.lock.Lock()
staleAgents := []uint64{}
for _, agent := range agentsMap.agents {
if agent.HeartbeatTick > StaleInterval {
staleAgents = append(staleAgents, agent.Id)
}
}
agentsMap.remove(staleAgents...)
agentsMap.lock.Unlock()
time.Sleep(10 * time.Second)
}
}
func (agentsMap *AgentsSyncMap) remove(ids ...uint64) {
for _, id := range ids {
agentsMap.logger.Info.Printf("Remove stale agent %d", id)
delete(agentsMap.agents, id)
}
}