/
fast.go
125 lines (109 loc) · 3.63 KB
/
fast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package atask
import (
"context"
"encoding/json"
"fmt"
"github.com/bytedance/Elkeid/server/manager/biz/midware"
"github.com/bytedance/Elkeid/server/manager/infra"
. "github.com/bytedance/Elkeid/server/manager/infra/def"
"github.com/bytedance/Elkeid/server/manager/infra/ylog"
"github.com/bytedance/Elkeid/server/manager/internal/dbtask"
"github.com/levigross/grequests"
"github.com/rs/xid"
"go.mongodb.org/mongo-driver/bson"
"time"
)
// getSvrAddr resolves the server address associated with agentID.
//
// The address is first read from Redis (infra.Grds) using agentID as the key.
// If that read returns an error, the error is logged at info level and the
// agent's heartbeat document is fetched from MongoDB instead; the address is
// then built as "<SourceIp>:<SourcePort>" from that document. An error is
// returned only when the MongoDB lookup/decode fails as well.
func getSvrAddr(agentID string) (host string, err error) {
	ctx := context.Background()

	// Fast path: the address was found in Redis.
	host, err = infra.Grds.Get(ctx, agentID).Result()
	if err == nil {
		return host, nil
	}
	ylog.Infof("getSvrAddr", "get server addr of %s from redis error %s", agentID, err.Error())

	// Fallback: rebuild the address from the stored heartbeat record.
	var hb AgentHBInfo
	hbColl := infra.MongoClient.Database(infra.MongoDatabase).Collection(infra.AgentHeartBeatCollection)
	if err = hbColl.FindOne(ctx, bson.M{"agent_id": agentID}).Decode(&hb); err != nil {
		return "", err
	}
	return fmt.Sprintf("%s:%d", hb.SourceIp, hb.SourcePort), nil
}
// GenerateToken returns the string form of a newly generated xid.
func GenerateToken() string {
	id := xid.New()
	return id.String()
}
// generateTaskID returns a new task identifier: the prefix "task-" followed by
// the string form of a newly generated xid.
func generateTaskID() string {
	suffix := xid.New().String()
	return fmt.Sprintf("task-%s", suffix)
}
// sendAgentCommand wraps request in an AgentQuickTask for agentID and POSTs it
// as JSON to the "/command/" endpoint of the address returned by getSvrAddr,
// with a 5 second request timeout.
//
// It returns the request URL, the request body, the response and an error.
// The response is non-nil only when the call fully succeeds: the HTTP status
// is 200, the body decodes into SvrResponse, and SvrResponse.Code is 0. When
// the address lookup fails, the URL is empty and the body is nil as well.
func sendAgentCommand(agentID string, request *ConfigRequest) (string, interface{}, *grequests.Response, error) {
	payload := &AgentQuickTask{AgentID: agentID}
	payload.Command = *request

	svrAddr, err := getSvrAddr(agentID)
	if err != nil {
		return "", nil, nil, err
	}
	target := fmt.Sprintf("https://%s/command/", svrAddr)

	reqOpt := midware.SvrAuthRequestOption()
	reqOpt.JSON = payload
	reqOpt.RequestTimeout = 5 * time.Second

	httpResp, err := grequests.Post(target, reqOpt)
	if err != nil {
		ylog.Errorf("SendQuickTask", "request url %s, body %#v, error %s", target, payload, err.Error())
		return target, payload, nil, err
	}

	// Transport succeeded; reject anything but HTTP 200.
	if code := httpResp.StatusCode; code != 200 {
		ylog.Errorf("QuickTaskTask", "request url %s, body %#v, code %d, resp %s", target, payload, code, httpResp.String())
		return target, payload, nil, fmt.Errorf("resp code is %d, resp body is %s", code, httpResp.String())
	}

	// The body must decode into SvrResponse and carry a zero Code.
	var decoded SvrResponse
	if err = json.Unmarshal(httpResp.Bytes(), &decoded); err != nil {
		ylog.Errorf("QuickTaskTask", "request url %s, body %#v, resp %s, error %s", target, payload, httpResp.String(), err.Error())
		return target, payload, nil, err
	}
	if decoded.Code != 0 {
		return target, payload, nil, fmt.Errorf("%s", decoded.Message)
	}

	return target, payload, httpResp, nil
}
// SendFastTask sends taskMsg to the agent identified by agentID via
// sendAgentCommand.
//
// When needAgentResp is true, a freshly generated token is set on the task
// before sending. After a successful send, an AgentSubTask record (status
// TaskStatusSuccess, carrying the request URL, request body, response text and
// appendData) is handed to dbtask.SubTaskAsyncWrite for reconciliation, and
// the newly generated task ID is returned. If timeout is greater than zero,
// the record's JobTimeOutTime is set to JobStartTime + timeout; JobStartTime
// is a Unix timestamp in seconds, so timeout is treated as seconds.
//
// When needAgentResp is false, the task token is cleared before sending, no
// record is written, and an empty task ID is returned.
//
// A nil taskMsg is rejected with an error. Any error from sendAgentCommand is
// returned as-is with an empty task ID.
func SendFastTask(agentID string, taskMsg *AgentTaskMsg, needAgentResp bool, timeout int64, appendData map[string]interface{}) (string, error) {
	// Guard the dereference below: a nil message would otherwise panic.
	if taskMsg == nil {
		return "", fmt.Errorf("send fast task to agent %s: task message is nil", agentID)
	}

	request := &ConfigRequest{Task: *taskMsg}
	// The token is always overwritten: empty unless a response is expected.
	request.Task.Token = ""
	if needAgentResp {
		request.Task.Token = GenerateToken()
	}

	url, rBody, resp, err := sendAgentCommand(agentID, request)
	if err != nil {
		return "", err
	}
	if !needAgentResp {
		return "", nil
	}

	taskID := generateTaskID()
	now := time.Now().Unix()
	// Write the subTask back to db for reconciliation.
	subTask := &AgentSubTask{
		TaskType:     TypeAgentTask,
		TaskDataType: taskMsg.DataType,
		TaskData:     rBody,
		AgentID:      agentID,
		Token:        request.Task.Token,
		TaskID:       taskID,
		TaskUrl:      url,
		JobStartTime: now,
		Status:       TaskStatusSuccess,
		UpdateTime:   now,
		InsertTime:   now,
		TaskResult:   "",
		TaskResp:     resp.String(),
		AppendData:   appendData,
	}
	if timeout > 0 {
		subTask.JobTimeOutTime = subTask.JobStartTime + timeout
	}
	dbtask.SubTaskAsyncWrite(subTask)
	return taskID, nil
}
// TaskResFuc is a callback that receives a sub-task record together with an
// error value and returns an error.
// NOTE(review): presumably err carries the outcome of sending the task and
// subTask may be nil on failure — confirm against SendFastTaskCallBack.
type TaskResFuc func(subTask *AgentSubTask, err error) error
// SendFastTaskCallBack sends a task to the agent and calls the given TaskResFuc before returning.