/
daemon.go
212 lines (199 loc) · 6.37 KB
/
daemon.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
package comm
import (
"fmt"
"github.com/hanc00l/nemo_go/pkg/conf"
"github.com/hanc00l/nemo_go/pkg/filesync"
"github.com/hanc00l/nemo_go/pkg/logging"
"github.com/hanc00l/nemo_go/pkg/utils"
"github.com/shirou/gopsutil/v3/process"
"os"
"os/exec"
"path/filepath"
"time"
)
// WorkerOption holds the run-time options of a worker in a form that can be
// exchanged as JSON / form data (see the struct tags).
// NOTE(review): StartWorkerDaemon copies the fields of the keep-alive reply's
// WorkerRunOption into DaemonRunOption; that reply field is presumably a
// *WorkerOption — confirm against the keep-alive reply type.
type WorkerOption struct {
// Concurrency is the worker concurrency setting.
Concurrency int `json:"concurrency" form:"concurrency"`
// WorkerPerformance is the worker performance setting.
WorkerPerformance int `json:"worker_performance" form:"worker_performance"`
// WorkerRunTaskMode is the task mode the worker runs in.
WorkerRunTaskMode string `json:"worker_run_task_mode" form:"worker_run_task_mode"`
// TaskWorkspaceGUID is the GUID of the task workspace.
TaskWorkspaceGUID string `json:"task_workspace_guid" form:"task_workspace_guid"`
// WorkerTopic is a set of topic names; the `json:"-"` tag keeps it out of JSON.
WorkerTopic map[string]struct{} `json:"-"`
// TLSEnabled is excluded from JSON by its `json:"-"` tag.
TLSEnabled bool `json:"-"`
// DefaultConfigFile is the name of the worker's default configuration file.
DefaultConfigFile string `json:"default_config_file" form:"default_config_file"`
// NoProxy is the worker's no-proxy switch.
NoProxy bool `json:"no_proxy" form:"no_proxy"`
}
// WorkerDaemonOption holds the options the daemon uses to run and supervise
// the worker process. StartWorker translates most of them into command-line
// flags of the worker binary.
type WorkerDaemonOption struct {
// Concurrency is passed to the worker as "-c".
Concurrency int
// WorkerPerformance is passed to the worker as "-p".
WorkerPerformance int
// NoFilesync disables the file synchronisation steps in StartWorkerDaemon.
NoFilesync bool
// NoProxy adds the "-np" flag to the worker command line when true.
NoProxy bool
// WorkerRunTaskMode is passed to the worker as "-m".
WorkerRunTaskMode string
// TaskWorkspaceGUID is passed to the worker as "-w" when it is not empty.
TaskWorkspaceGUID string
// ManualSyncHost, ManualSyncPort and ManualSyncAuth are not read by the
// daemon functions in this file.
// NOTE(review): presumably connection settings for a manually triggered
// file sync — confirm against their users.
ManualSyncHost string
ManualSyncPort string
ManualSyncAuth string
// TLSEnabled adds the "-tls" flag to the worker command line when true.
TLSEnabled bool
// DefaultConfigFile is passed to the worker as "-f".
DefaultConfigFile string
}
// Package-level state of the worker daemon.
var (
	// cmd is the command handle assigned by StartWorker; KillWorker resets
	// it to nil once the worker has been stopped.
	cmd *exec.Cmd

	// WorkerName is set by StartWorker to the name derived from the worker
	// process (see GetWorkerNameByDaemon).
	WorkerName string

	// DaemonRunOption holds the daemon's options. StartWorkerDaemon and
	// StartWorker dereference it without a nil check, so it must be set
	// before either of them is called.
	DaemonRunOption *WorkerDaemonOption

	// WorkerRunOption is not referenced by the daemon functions in this file.
	// NOTE(review): presumably the options of the worker process itself —
	// confirm against its users.
	WorkerRunOption *WorkerOption
)
// WatchWorkerProcess checks the state of the worker process started by this
// daemon and restarts it when the process can no longer be found or is
// reported as a zombie.
//
// It does nothing when no worker command is registered (cmd is nil).
func WatchWorkerProcess() {
	if cmd == nil {
		return
	}
	// A command whose Start failed has no Process attached. There is nothing
	// to inspect or kill in that case, so simply try to launch a worker again
	// instead of dereferencing a nil pointer below.
	if cmd.Process == nil {
		logging.RuntimeLog.Warning("detected worker process not exist")
		logging.CLILog.Warning("detected worker process not exist")
		StartWorker()
		return
	}
	// Look the worker up by PID; a lookup error is treated as "process gone".
	p, err := process.NewProcess(int32(cmd.Process.Pid))
	if err != nil {
		logging.RuntimeLog.Warning("detected worker process not exist")
		logging.CLILog.Warning("detected worker process not exist")
		if KillWorker() {
			StartWorker()
		}
		return
	}
	// Query the process status; log the actual error if that fails.
	status, err := p.Status()
	if err != nil {
		logging.CLILog.Error(err)
		logging.RuntimeLog.Error(err)
		return
	}
	for _, s := range status {
		// A zombie worker has terminated: reap it and start a new one.
		if s == process.Zombie {
			logging.RuntimeLog.Warning("detected worker zombie status")
			logging.CLILog.Warning("detected worker zombie status")
			if KillWorker() {
				StartWorker()
			}
			// The remaining status entries describe the old process, so
			// stop here rather than risk acting on them a second time.
			return
		}
	}
}
// StartWorkerDaemon runs the worker daemon: it optionally performs an initial
// file sync, starts the worker and then loops forever, every 15 seconds
// sending a keep-alive, checking the worker process and acting on the flags
// carried by the keep-alive reply.
//
// It returns only when the initial worker start fails.
func StartWorkerDaemon() {
	syncCfg := conf.GlobalWorkerConfig().FileSync
	// pullFiles runs one file synchronisation with the configured server.
	pullFiles := func() {
		filesync.WorkerStartupSync(syncCfg.Host, fmt.Sprintf("%d", syncCfg.Port), syncCfg.AuthKey)
	}

	if !DaemonRunOption.NoFilesync {
		logging.CLILog.Info("start file sync...")
		pullFiles()
	}
	if !StartWorker() {
		return
	}

	for {
		time.Sleep(15 * time.Second)

		reply, err := DoDaemonKeepAlive()
		if err != nil {
			logging.RuntimeLog.Error("daemon keep alive fail")
			logging.CLILog.Error("daemon keep alive fail")
			continue
		}
		WatchWorkerProcess()

		// The server asked to update the run options: take them over and
		// force a worker restart so that they become effective.
		if opt := reply.WorkerRunOption; reply.ManualUpdateOptionFlag && opt != nil {
			DaemonRunOption.NoProxy = opt.NoProxy
			DaemonRunOption.Concurrency = opt.Concurrency
			DaemonRunOption.WorkerPerformance = opt.WorkerPerformance
			DaemonRunOption.DefaultConfigFile = opt.DefaultConfigFile
			DaemonRunOption.WorkerRunTaskMode = opt.WorkerRunTaskMode
			DaemonRunOption.TaskWorkspaceGUID = opt.TaskWorkspaceGUID
			reply.ManualReloadFlag = true
		}

		switch {
		case reply.ManualReloadFlag:
			// Manual reload: stop the worker, sync files, start it again.
			// A separately requested file sync is ignored in this round.
			if KillWorker() {
				if !DaemonRunOption.NoFilesync {
					logging.CLILog.Info("manual reload to start file sync...")
					logging.RuntimeLog.Info("manual reload to start file sync...")
					pullFiles()
				}
				StartWorker()
			}
		case !DaemonRunOption.NoFilesync && reply.ManualFileSyncFlag:
			// Manual file sync without restarting the worker.
			logging.CLILog.Info("manual start file sync...")
			logging.RuntimeLog.Info("manual start file sync...")
			pullFiles()
		}
	}
}
// KillWorker stops the current worker process and waits for it to exit.
//
// It returns true when no worker is left registered afterwards (including the
// case where none was registered to begin with) and false when killing the
// process failed; in the latter case cmd is left untouched.
func KillWorker() bool {
	if cmd == nil {
		return true
	}
	// A command whose Start failed has no Process; there is nothing to kill,
	// so just drop the stale handle instead of dereferencing a nil pointer.
	if cmd.Process == nil {
		cmd = nil
		return true
	}
	pid := cmd.Process.Pid
	if err := cmd.Process.Kill(); err != nil {
		msg := fmt.Sprintf("kill worker fail,pid:%d,%v", pid, err)
		logging.RuntimeLog.Error(msg)
		logging.CLILog.Error(msg)
		return false
	}
	// Wait reaps the killed child. A killed process does not exit cleanly,
	// so an error here is expected and only reported as a warning. The
	// message is already formatted and must not be used as a format string.
	if err := cmd.Wait(); err != nil {
		msg := fmt.Sprintf("kill worker pid:%d,%v", pid, err)
		logging.RuntimeLog.Warning(msg)
		logging.CLILog.Warning(msg)
	}
	msg := fmt.Sprintf("kill worker ok,pid:%d", pid)
	logging.RuntimeLog.Info(msg)
	logging.CLILog.Info(msg)
	cmd = nil
	return true
}
// StartWorker launches the worker binary as a child process, using the
// options in DaemonRunOption to build its command line:
//
//	-c  Concurrency
//	-p  WorkerPerformance
//	-m  WorkerRunTaskMode
//	-f  DefaultConfigFile
//	-w  TaskWorkspaceGUID (only when not empty)
//	-tls / -np  when TLSEnabled / NoProxy are set
//
// The child's stdout and stderr are connected to those of the daemon. On
// success the package-level cmd and WorkerName are updated and true is
// returned; on failure both are left unchanged and false is returned.
func StartWorker() bool {
	workerBin := utils.GetThirdpartyBinNameByPlatform(utils.Worker)
	// Resolve the worker binary to an absolute path.
	workerPathName, err := filepath.Abs(filepath.Join(conf.GetRootPath(), workerBin))
	if err != nil {
		logging.RuntimeLog.Error(err)
		logging.CLILog.Error(err)
		return false
	}
	cmdArgs := []string{
		"-c", fmt.Sprintf("%d", DaemonRunOption.Concurrency),
		"-p", fmt.Sprintf("%d", DaemonRunOption.WorkerPerformance),
		"-m", DaemonRunOption.WorkerRunTaskMode,
		"-f", DaemonRunOption.DefaultConfigFile,
	}
	if DaemonRunOption.TaskWorkspaceGUID != "" {
		cmdArgs = append(cmdArgs, "-w", DaemonRunOption.TaskWorkspaceGUID)
	}
	if DaemonRunOption.TLSEnabled {
		cmdArgs = append(cmdArgs, "-tls")
	}
	if DaemonRunOption.NoProxy {
		cmdArgs = append(cmdArgs, "-np")
	}
	// Build the command in a local variable and publish it to the global cmd
	// only after Start succeeded: a command that failed to start has a nil
	// Process, which the other daemon functions must never see.
	worker := exec.Command(workerPathName, cmdArgs...)
	worker.Stdout = os.Stdout
	worker.Stderr = os.Stderr
	if err = worker.Start(); err != nil {
		logging.CLILog.Errorf("start worker fail: %v", err)
		logging.RuntimeLog.Errorf("start worker fail: %v", err)
		return false
	}
	cmd = worker
	WorkerName = GetWorkerNameByDaemon()
	logging.CLILog.Infof("start worker pid: %d", cmd.Process.Pid)
	logging.RuntimeLog.Infof("start worker pid: %d", cmd.Process.Pid)
	return true
}
func GetWorkerNameByDaemon() string {
return getWorkerName(cmd.Process.Pid)
}
// GetWorkerNameBySelf returns the worker name built from the PID of the
// calling process itself.
func GetWorkerNameBySelf() string {
	selfPid := os.Getpid()
	return getWorkerName(selfPid)
}
// getWorkerName builds a worker name of the form "<hostname>@<ip>#<pid>".
//
// The IP comes from utils.GetOutBoundIP, falling back to utils.GetClientIp
// when that yields an empty string. Lookup errors are deliberately ignored:
// the corresponding part of the name is then whatever value the failed call
// returned.
func getWorkerName(pid int) string {
	name, _ := os.Hostname()

	ip, _ := utils.GetOutBoundIP()
	if ip == "" {
		ip, _ = utils.GetClientIp()
	}

	return fmt.Sprintf("%s@%s#%d", name, ip, pid)
}