-
Notifications
You must be signed in to change notification settings - Fork 1
/
executor.go
369 lines (332 loc) · 12.3 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
package executor
import (
"context"
"fmt"
aline_context "github.com/hamster-shared/aline-engine/ctx"
"os"
"path"
"runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/hamster-shared/aline-engine/action"
"github.com/hamster-shared/aline-engine/consts"
jober "github.com/hamster-shared/aline-engine/job"
"github.com/hamster-shared/aline-engine/logger"
"github.com/hamster-shared/aline-engine/model"
"github.com/hamster-shared/aline-engine/output"
"github.com/hamster-shared/aline-engine/utils"
)
// IExecutor describes the operations a job executor exposes.
//
// NOTE(review): the Executor type in this file declares
// Cancel(jobName string, id int), which does not match the Cancel signature
// declared here, so *Executor does not satisfy this interface as written.
// Confirm which signature is the intended one.
type IExecutor interface {
// Execute executes the job.
Execute(id int, job *model.Job) error
// SendResultToQueue sends the result to the queue (currently disabled).
// SendResultToQueue(job *model.JobDetail)
// Cancel cancels the job.
Cancel(id int, job *model.Job) error
}
// Executor runs pipeline jobs and keeps the bookkeeping needed to cancel them,
// either on request or when a single step exceeds its timeout.
type Executor struct {
	cancelMap    map[string]func() // key: "jobName/jobID" (see cancelKey), value: cancel func of the job's context
	StatusChan   chan model.StatusChangeMessage
	stepTimerMap sync.Map // key: utils.FormatJobToString(jobName, jobID), value: *stepTimer

	cancelMu  sync.Mutex // guards cancelMap; Execute, Cancel and the timer goroutine all touch it
	timerOnce sync.Once  // starts handleTimerListener at most once per Executor
}

// cancelKey builds the cancelMap key ("jobName/jobID") for one run of a job.
func cancelKey(jobName string, id int) string {
	return jobName + "/" + strconv.Itoa(id)
}

// storeCancel registers the cancel func of a running job under key.
func (e *Executor) storeCancel(key string, cancel func()) {
	e.cancelMu.Lock()
	defer e.cancelMu.Unlock()
	if e.cancelMap == nil {
		e.cancelMap = make(map[string]func())
	}
	e.cancelMap[key] = cancel
}

// removeCancel unregisters and returns the cancel func stored under key.
// ok is false when no job is registered under that key.
func (e *Executor) removeCancel(key string) (cancel func(), ok bool) {
	e.cancelMu.Lock()
	defer e.cancelMu.Unlock()
	cancel, ok = e.cancelMap[key]
	delete(e.cancelMap, key)
	return cancel, ok
}

// Execute runs every stage of job in sorted order, step by step, and records
// progress in the job detail that is persisted through jober.SaveJobDetail.
//
// id is the run id of the job. The returned error is the first failure
// encountered: stage sorting, work directory creation, an action's Pre or Hook
// call, or a panic raised inside an action. Whatever the outcome, the final
// job status is always sent to e.StatusChan before Execute returns.
func (e *Executor) Execute(id int, job *model.Job) error {
	// 1. Sort the pipeline's stages into execution order.
	stages, err := job.StageSort()
	jobWrapper := &model.JobDetail{
		Id:     id,
		Job:    *job,
		Status: model.STATUS_NOTRUN,
		Stages: stages,
		ActionResult: model.ActionResult{
			Artifactorys: make([]model.Artifactory, 0),
			Reports:      make([]model.Report, 0),
		},
	}

	// There are many return paths, so the final status is published from a
	// defer to guarantee it always reaches StatusChan.
	defer func() {
		// The worker listens on StatusChan and forwards the result to the grpc server.
		e.StatusChan <- model.NewStatusChangeMsg(jobWrapper.Name, jobWrapper.Id, jobWrapper.Status)
		logger.Infof("send status change message to chan, job name: %s, job id: %d, status: %d", jobWrapper.Name, jobWrapper.Id, jobWrapper.Status)
		// Drop the step timer as well so the timeout listener never acts on a finished job.
		e.stepTimerMap.Delete(utils.FormatJobToString(jobWrapper.Name, jobWrapper.Id))
	}()
	if err != nil {
		return err
	}

	// The timeout listener loops forever, so start it once per Executor
	// instead of leaking one goroutine per executed job.
	e.timerOnce.Do(func() { go e.handleTimerListener() })

	// 2. Build the execution context shared by all actions of this job.
	env := []string{
		"PIPELINE_NAME=" + job.Name,
		"PIPELINE_ID=" + strconv.Itoa(id),
	}
	// If the home directory cannot be resolved, homeDir is empty and the
	// work directory becomes a relative path.
	homeDir, _ := os.UserHomeDir()
	engineContext := make(map[string]any)
	engineContext["hamsterRoot"] = path.Join(homeDir, "workdir")
	workdir := path.Join(engineContext["hamsterRoot"].(string), job.Name)
	engineContext["workdir"] = workdir
	if err = os.MkdirAll(workdir, os.ModePerm); err != nil {
		// Without a work directory no step can run; fail the job up front
		// instead of silently dropping the error.
		err = fmt.Errorf("create workdir %q: %w", workdir, err)
		jobWrapper.Status = model.STATUS_FAIL
		jobWrapper.Error = err.Error()
		jober.SaveJobDetail(jobWrapper.Name, jobWrapper)
		return err
	}
	engineContext["name"] = job.Name
	engineContext["id"] = strconv.Itoa(id)
	engineContext["env"] = env
	if job.Parameter == nil {
		job.Parameter = make(map[string]string)
	}
	engineContext["parameter"] = job.Parameter
	engineContext["userId"] = job.UserId

	// The plain string key "stack" is kept as-is because it is the key this
	// context value is published under for the actions.
	ctx, cancel := context.WithCancel(context.WithValue(context.Background(), "stack", engineContext))

	// Remember the cancel hook so Cancel can interrupt this run.
	key := cancelKey(job.Name, id)
	e.storeCancel(key, cancel)

	// Handlers whose Pre hook succeeded; their Post hooks run in reverse order.
	var stack utils.Stack[action.ActionHandler]

	jobWrapper.Status = model.STATUS_RUNNING
	jobWrapper.StartTime = time.Now()

	// executeAction runs the Pre and Hook phases of one handler and merges
	// its result into the job detail. A panic inside the handler is turned
	// into an error so it cannot be recorded as a success.
	executeAction := func(ah action.ActionHandler, detail *model.JobDetail) (err error) {
		defer func() {
			rErr := recover()
			if rErr == nil {
				return
			}
			switch rErr.(type) {
			case runtime.Error:
				fmt.Println("runtime error:", rErr)
				logger.Errorf("runtime error: %s", rErr)
				err = fmt.Errorf("runtime error: %s", rErr)
			default:
				// Any other panic value must fail the step too.
				logger.Errorf("panic: %v", rErr)
				err = fmt.Errorf("panic: %v", rErr)
			}
		}()

		if detail.Status != model.STATUS_RUNNING {
			return nil
		}
		if ah == nil {
			logger.Errorf("action handler is nil, job name: %s, job id: %d", detail.Name, detail.Id)
			return nil
		}

		err = ah.Pre()
		if err != nil {
			detail.Status = model.STATUS_FAIL
			logger.Errorf("action pre hook error, job name: %s, job id: %d, error: %s", detail.Name, detail.Id, err.Error())
			fmt.Println(err)
			return err
		}
		logger.Infof("action pre hook success, job name: %s, job id: %d", detail.Name, detail.Id)
		stack.Push(ah)

		actionResult, err := ah.Hook()
		if actionResult != nil {
			if len(actionResult.Artifactorys) > 0 {
				detail.Artifactorys = append(detail.Artifactorys, actionResult.Artifactorys...)
			}
			if len(actionResult.Reports) > 0 {
				detail.Reports = append(detail.Reports, actionResult.Reports...)
			}
			if actionResult.CodeInfo != "" {
				detail.CodeInfo = actionResult.CodeInfo
			}
			if len(actionResult.Deploys) > 0 {
				detail.Deploys = append(detail.Deploys, actionResult.Deploys...)
			}
			if len(actionResult.BuildData) > 0 {
				detail.BuildData = append(detail.BuildData, actionResult.BuildData...)
			}
			if len(actionResult.MetaScanData) > 0 {
				detail.MetaScanData = append(detail.MetaScanData, actionResult.MetaScanData...)
			}
		}
		if err != nil {
			detail.Status = model.STATUS_FAIL
		}
		return err
	}

	jobWrapper.Output = output.New(job.Name, jobWrapper.Id)

	jobDone := make(chan struct{})
	defer close(jobDone)
	// Periodically persist the job detail so the duration of running steps
	// keeps advancing while they execute.
	// NOTE(review): this goroutine reads the job detail while Execute mutates
	// it, without synchronization.
	go func(jobW *model.JobDetail) {
		for {
			select {
			case <-jobDone:
				return
			default:
				for i := range jobW.Stages {
					for j := range jobW.Stages[i].Stage.Steps {
						if jobW.Stages[i].Stage.Steps[j].Status == model.STATUS_RUNNING {
							jobW.Stages[i].Stage.Steps[j].Duration = int64(time.Since(jobW.Stages[i].Stage.Steps[j].StartTime).Milliseconds())
						}
					}
				}
				jober.SaveJobDetail(jobW.Name, jobW)
				time.Sleep(time.Second * 2)
			}
		}
	}(jobWrapper)

	for stageIdx, stageW := range jobWrapper.Stages {
		// TODO: stage output should also move to the stack-based approach.
		logger.Info("stage: {")
		logger.Infof(" // %s", stageW.Name)
		stageW.Status = model.STATUS_RUNNING
		stageW.StartTime = time.Now()
		jobWrapper.Stages[stageIdx] = stageW
		jobWrapper.Output.NewStage(stageW.Name)
		jober.SaveJobDetail(jobWrapper.Name, jobWrapper)

		for stepIdx, step := range stageW.Stage.Steps {
			var ah action.ActionHandler
			if step.RunsOn != "" {
				ah = action.NewDockerEnv(step, ctx, jobWrapper.Output)
				err = executeAction(ah, jobWrapper)
				if err != nil {
					break
				}
			}
			stageW.Stage.Steps[stepIdx].StartTime = time.Now()
			stageW.Stage.Steps[stepIdx].Status = model.STATUS_RUNNING
			jober.SaveJobDetail(jobWrapper.Name, jobWrapper)
			actionContext := aline_context.NewActionContext(step, ctx, jobWrapper.Output)

			// The timeout listener cancels the job when a step runs too long.
			// The timer is stored per job and replaced at the start of every
			// step, so there is no need to record which step it belongs to.
			e.stepTimerMap.Store(utils.FormatJobToString(jobWrapper.Name, jobWrapper.Id), newStepTimer())

			// Pick the handler for this step. When nothing matches, ah keeps
			// its current value (nil, or the docker env handler set above).
			switch step.Uses {
			case "", "shell":
				ah = action.NewShellAction(step, ctx, jobWrapper.Output)
			case "git-checkout":
				ah = action.NewGitAction(step, ctx, jobWrapper.Output)
			case "hamster-ipfs":
				ah = action.NewIpfsAction(step, ctx, jobWrapper.Output)
			case "hamster-pinata-ipfs":
				ah = action.NewPinataIpfsAction(step, ctx, jobWrapper.Output)
			case "hamster-artifactory":
				ah = action.NewArtifactoryAction(step, ctx, jobWrapper.Output)
			// case "deploy-contract":
			// 	ah = action.NewTruffleDeployAction(step, ctx, jobWrapper.Output)
			case "image-build":
				ah = action.NewImageBuildAction(step, ctx, jobWrapper.Output)
			case "image-push":
				ah = action.NewImagePushAction(step, ctx, jobWrapper.Output)
			case "k8s-frontend-deploy":
				ah = action.NewK8sDeployAction(step, ctx, jobWrapper.Output)
			case "k8s-assign-domain":
				ah = action.NewK8sIngressAction(step, ctx, jobWrapper.Output)
			case "metascan_action":
				ah = action.NewMetaScanCheckAction(step, ctx, jobWrapper.Output)
			case "sol-profiler-check":
				ah = action.NewSolProfilerAction(step, ctx, jobWrapper.Output)
			case "solhint-check":
				ah = action.NewSolHintAction(step, ctx, jobWrapper.Output)
			case "mythril-check":
				ah = action.NewMythRilAction(step, ctx, jobWrapper.Output)
			case "slither-check":
				ah = action.NewSlitherAction(step, ctx, jobWrapper.Output)
			case "check-aggregation":
				ah = action.NewCheckAggregationAction(step, ctx, jobWrapper.Output)
			case "deploy-ink-contract":
				ah = action.NewInkAction(step, ctx, jobWrapper.Output)
			case "frontend-check":
				ah = action.NewEslintAction(step, ctx, jobWrapper.Output)
			case "eth-gas-reporter":
				ah = action.NewEthGasReporterAction(step, ctx, jobWrapper.Output)
			case "aptos-check":
				ah = action.NewMoveProverAction(step, ctx, jobWrapper.Output)
			case "workdir":
				ah = action.NewWorkdirAction(step, ctx, jobWrapper.Output)
			case "openai":
				ah = action.NewOpenaiAction(step, ctx, jobWrapper.Output)
			case "icp-deploy":
				ah = action.NewICPDeployAction(actionContext)
			default:
				if strings.Contains(step.Uses, "/") {
					ah = action.NewRemoteAction(step, ctx)
				}
			}

			jobWrapper.Output.NewStep(step.Name)
			err = executeAction(ah, jobWrapper)
			stepElapsed := time.Since(stageW.Stage.Steps[stepIdx].StartTime)
			stageW.Stage.Steps[stepIdx].Duration = stepElapsed.Milliseconds()
			if err != nil {
				stageW.Stage.Steps[stepIdx].Status = model.STATUS_FAIL
				break
			}
			stageW.Stage.Steps[stepIdx].Status = model.STATUS_SUCCESS
			jober.SaveJobDetail(jobWrapper.Name, jobWrapper)
		}

		// Run the Post hooks of every handler pushed during this stage.
		// Post is best-effort clean-up; its error is intentionally ignored.
		for !stack.IsEmpty() {
			ah, _ := stack.Pop()
			_ = ah.Post()
		}

		if err != nil {
			stageW.Status = model.STATUS_FAIL
		} else {
			stageW.Status = model.STATUS_SUCCESS
		}
		stageElapsed := time.Since(stageW.StartTime)
		stageW.Duration = stageElapsed.Milliseconds()
		jobWrapper.Stages[stageIdx] = stageW
		jober.SaveJobDetail(jobWrapper.Name, jobWrapper)
		logger.Info("}")
		if err != nil {
			cancel()
			break
		}
	}

	jobWrapper.Output.Done()

	// Unregister under the same key that was stored above; otherwise the
	// entry is never removed and GetJobStatus keeps reporting RUNNING.
	e.removeCancel(key)

	if err == nil {
		jobWrapper.Status = model.STATUS_SUCCESS
	} else {
		jobWrapper.Status = model.STATUS_FAIL
		jobWrapper.Error = err.Error()
	}
	jobElapsed := time.Since(jobWrapper.StartTime)
	jobWrapper.Duration = jobElapsed.Milliseconds()
	jober.SaveJobDetail(jobWrapper.Name, jobWrapper)
	return err
}

// Cancel cancels the run of jobName identified by id, if it is registered as
// running, and publishes a STATUS_STOP message to e.StatusChan either way.
// It always returns nil.
func (e *Executor) Cancel(jobName string, id int) error {
	if cancel, ok := e.removeCancel(cancelKey(jobName, id)); ok {
		// Invoke the hook outside the lock; it is already unregistered.
		cancel()
	} else {
		logger.Errorf("job cancel function not found: %s/%d", jobName, id)
	}
	e.StatusChan <- model.NewStatusChangeMsg(jobName, id, model.STATUS_STOP)
	return nil
}

// GetJobStatus reports STATUS_RUNNING when a cancel hook is registered for the
// given job run, and STATUS_NOTRUN together with an error otherwise.
func (e *Executor) GetJobStatus(jobName string, jobID int) (model.Status, error) {
	e.cancelMu.Lock()
	_, ok := e.cancelMap[cancelKey(jobName, jobID)]
	e.cancelMu.Unlock()
	if ok {
		return model.STATUS_RUNNING, nil
	}
	return model.STATUS_NOTRUN, fmt.Errorf("job not found")
}
// handleTimerListener scans stepTimerMap once a minute and cancels every job
// whose current step timer has timed out. It loops forever.
func (e *Executor) handleTimerListener() {
	// checkTimer handles one map entry; it always returns true so that Range
	// keeps visiting the remaining entries.
	checkTimer := func(k, v any) bool {
		if !v.(*stepTimer).isTimeout() {
			return true
		}
		jobKey := k.(string)
		jobName, jobID, parseErr := utils.GetJobNameAndIDFromFormatString(jobKey)
		if parseErr != nil {
			logger.Errorf("get job name and id from format string error: %v, key: %s", parseErr, jobKey)
			return true
		}
		if cancelErr := e.Cancel(jobName, jobID); cancelErr != nil {
			logger.Errorf("cancel job error: %v, key: %s", cancelErr, jobKey)
		}
		// The job has been cancelled, so its timer is no longer needed.
		e.stepTimerMap.Delete(k)
		return true
	}

	for {
		e.stepTimerMap.Range(checkTimer)
		time.Sleep(time.Minute)
	}
}
// stepTimer records when the current step of a job started.
type stepTimer struct {
	startTime time.Time
}

// newStepTimer returns a timer whose start time is the moment of the call.
func newStepTimer() *stepTimer {
	timer := new(stepTimer)
	timer.startTime = time.Now()
	return timer
}
// isTimeout reports whether more than consts.STEP_TIMEOUT_MINUTE minutes have
// elapsed since the timer started; a step that times out gets cancelled.
// Per the original note, the timeout is provisionally 30 minutes.
func (t *stepTimer) isTimeout() bool {
	elapsed := time.Since(t.startTime)
	limit := time.Minute * consts.STEP_TIMEOUT_MINUTE
	return elapsed > limit
}