/
agent_monitor.go
149 lines (140 loc) · 4.42 KB
/
agent_monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package prom
import (
"encoding/json"
"fmt"
"github.com/WeBankPartners/open-monitor/monitor-server/middleware/log"
m "github.com/WeBankPartners/open-monitor/monitor-server/models"
"io/ioutil"
"net/http"
"strings"
"sync"
"time"
)
// Package-level state coordinating agent-manager initialisation with deploys.
var (
	// AgentManagerInitFlag starts false; InitAgentManager sets it to true once
	// its init/retry loop has ended and the write lock has been released.
	AgentManagerInitFlag bool

	// AgentManagerLock is write-locked by InitAgentManager for the duration of
	// its init loop. DeployAgent briefly takes the read lock so that it blocks
	// until that loop is over.
	AgentManagerLock = &sync.RWMutex{}
)
// agentManagerRequest is the JSON body that DeployAgent and StopAgent hand to
// requestAgentMonitor for the "add" and "delete" deploy actions.
type agentManagerRequest struct {
// Guid identifies the agent; callers in this file build it as
// "<instance>_<ip>_<agentType>".
Guid string `json:"guid"`
// Exporter is filled from DeployAgent's bin argument.
Exporter string `json:"exporter"`
// Config is filled from DeployAgent's configFile argument.
Config string `json:"config"`
// InstanceServer and InstancePort are filled from DeployAgent's ip and port
// arguments.
InstanceServer string `json:"instance_server"`
InstancePort string `json:"instance_port"`
// AuthUser and AuthPassword are filled from DeployAgent's user and pwd
// arguments.
AuthUser string `json:"auth_user"`
AuthPassword string `json:"auth_password"`
// AgentManagerRemoteIp is copied from the models package value of the same
// name. Note the camelCase JSON key, unlike the snake_case keys above.
AgentManagerRemoteIp string `json:"agentManagerRemoteIp"`
}
// agentManagerResponse is the JSON envelope that requestAgentMonitor decodes
// from the agent manager's reply.
type agentManagerResponse struct {
// Code is compared against 200 by every caller in this file; any other value
// is treated as a failure.
Code int `json:"code"`
// Message is used as the error text on failure. For the "add" action
// DeployAgent also expects it to contain ":" followed by the rest of the
// agent address on success.
Message string `json:"message"`
// Data is decoded but not read anywhere in this file.
Data interface{} `json:"data"`
}
// DeployAgent sends an "add" request to the agent manager at url and returns
// the local address of the deployed agent.
//
// The request guid is "<instance>_<ip>_<agentType>"; bin, configFile, ip, port,
// user and pwd are forwarded as the exporter, config, instance server/port and
// credentials.
//
// On a 200 response the message must contain ":"; the returned address is
// "127.0.0.1" followed by the message from its first ":" onwards. A transport
// or decode error is returned as is. A non-200 response is returned as an
// error carrying the response message verbatim.
func DeployAgent(agentType, instance, bin, ip, port, user, pwd, url, configFile string) (address string, err error) {
	if !AgentManagerInitFlag {
		// Initialisation is not flagged as finished yet. InitAgentManager holds
		// the write lock while it runs, so acquiring and immediately releasing
		// the read lock blocks here until it is done. The short sleep presumably
		// gives InitAgentManager time to take the lock first.
		time.Sleep(1 * time.Second)
		AgentManagerLock.RLock()
		AgentManagerLock.RUnlock()
	}

	param := agentManagerRequest{
		Guid:                 fmt.Sprintf("%s_%s_%s", instance, ip, agentType),
		Exporter:             bin,
		Config:               configFile,
		InstanceServer:       ip,
		InstancePort:         port,
		AuthUser:             user,
		AuthPassword:         pwd,
		AgentManagerRemoteIp: m.AgentManagerRemoteIp,
	}

	resp, err := requestAgentMonitor(param, url, "add")
	if err != nil {
		return address, err
	}
	if resp.Code != 200 {
		// The message comes from the remote side; never use it as a format
		// string, or any '%' in it would be mangled.
		return address, fmt.Errorf("%s", resp.Message)
	}

	colon := strings.Index(resp.Message, ":")
	if colon < 0 {
		return "", fmt.Errorf("agent manager response message is illegal address: %s ", resp.Message)
	}
	// Keep only the ":port..." part and pin the host to loopback.
	return "127.0.0.1" + resp.Message[colon:], nil
}
// StopAgent sends a "delete" request to the agent manager at url for the agent
// whose guid is "<instance>_<ip>_<agentType>".
//
// It returns nil when the response code is 200. A transport or decode error is
// returned as is; any other response code is returned as an error carrying the
// response message verbatim.
func StopAgent(agentType, instance, ip, url string) error {
	param := agentManagerRequest{
		Guid:                 fmt.Sprintf("%s_%s_%s", instance, ip, agentType),
		AgentManagerRemoteIp: m.AgentManagerRemoteIp,
	}

	resp, err := requestAgentMonitor(param, url, "delete")
	if err != nil {
		return err
	}
	if resp.Code != 200 {
		// The message comes from the remote side; never use it as a format
		// string, or any '%' in it would be mangled.
		return fmt.Errorf("%s", resp.Message)
	}
	return nil
}
// InitAgentManager pushes the full agent configuration to the agent manager at
// url using the "init" action, retrying until it succeeds or the attempt limit
// is reached.
//
// It holds AgentManagerLock for writing for the whole loop, which makes
// DeployAgent wait. Each attempt is preceded by a 30 second sleep, and at most
// 10 attempts are made. Whether or not initialisation succeeded, the lock is
// released and AgentManagerInitFlag is set to true when the function returns.
func InitAgentManager(param []*m.AgentManagerTable, url string) {
	const maxAttempts = 10

	initParam := m.InitDeployParam{AgentManagerRemoteIp: m.AgentManagerRemoteIp, Config: param}

	AgentManagerLock.Lock()
	// Deferred so the lock is released even if a request panics; the flag is
	// still set only after the unlock, as before.
	defer func() {
		AgentManagerLock.Unlock()
		AgentManagerInitFlag = true
	}()

	for attempt := 1; ; attempt++ {
		time.Sleep(30 * time.Second)

		resp, err := requestAgentMonitor(&initParam, url, "init")
		switch {
		case err != nil:
			// resp carries no usable information on error, so do not inspect it.
			log.Logger.Error("Init agent manager, request error", log.Error(err))
		case resp.Code == 200:
			log.Logger.Info("Init agent manager success")
			return
		default:
			log.Logger.Warn("Init agent manager, response error", log.String("message", resp.Message))
		}

		if attempt >= maxAttempts {
			log.Logger.Warn("Init agent manager fail, retry max time")
			return
		}
	}
}
// DoSyncAgentManagerJob sends the given agent configuration to the agent
// manager at url once, using the "init" action, and logs the outcome.
//
// Unlike InitAgentManager it makes a single attempt, does not take
// AgentManagerLock and does not touch AgentManagerInitFlag. Failures are only
// logged; nothing is returned to the caller.
func DoSyncAgentManagerJob(param []*m.AgentManagerTable, url string) {
	log.Logger.Info("Start init agent manager ")

	initParam := m.InitDeployParam{AgentManagerRemoteIp: m.AgentManagerRemoteIp, Config: param}
	resp, err := requestAgentMonitor(&initParam, url, "init")
	if err != nil {
		// resp carries no usable information on error, so stop here instead of
		// also logging a "response error" with an empty message.
		log.Logger.Error("Init agent manager, request error", log.Error(err))
		return
	}
	if resp.Code != 200 {
		log.Logger.Warn("Init agent manager, response error", log.String("message", resp.Message))
		return
	}
	log.Logger.Info("Init agent manager success")
}
// requestAgentMonitor JSON-encodes param, POSTs it to "<url>/deploy/<method>"
// with http.DefaultClient and decodes the JSON reply into an
// agentManagerResponse.
//
// An error is returned (and logged) when marshalling, building the request,
// performing it, reading the body or decoding the body fails. The HTTP status
// code is not examined; callers check the Code field of the decoded reply.
func requestAgentMonitor(param interface{}, url, method string) (resp agentManagerResponse, err error) {
	postData, err := json.Marshal(param)
	if err != nil {
		log.Logger.Error("Failed marshalling data", log.Error(err))
		return resp, err
	}

	req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/deploy/%s", url, method), strings.NewReader(string(postData)))
	if err != nil {
		log.Logger.Error("Curl agent_monitor http request error", log.Error(err))
		return resp, err
	}

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Logger.Error("Curl agent_monitor http response error", log.Error(err))
		return resp, err
	}
	defer res.Body.Close()

	// Report a failed read directly instead of letting it surface later as a
	// confusing JSON decode error on a truncated body.
	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		log.Logger.Error("Curl agent_monitor read response body error", log.Error(err))
		return resp, err
	}
	log.Logger.Debug(fmt.Sprintf("Curl %s agent_monitor response : %s ", method, string(body)))

	if err = json.Unmarshal(body, &resp); err != nil {
		log.Logger.Error("Curl agent_monitor unmarshal error", log.Error(err))
		return resp, err
	}
	return resp, nil
}