This repository has been archived by the owner on Sep 7, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparallel_job.go
74 lines (65 loc) · 1.84 KB
/
parallel_job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package taskexecutor
import (
"fmt"
"time"
"github.com/Attsun1031/jobnetes/dao"
"github.com/Attsun1031/jobnetes/model"
"github.com/Attsun1031/jobnetes/utils/log"
"github.com/jinzhu/gorm"
)
type ParallelJobTaskExecutor struct {
Task *model.ParallelTask
TaskExecutionDao dao.TaskExecutionDao
TaskExecutorFactory Factory
}
func (executor *ParallelJobTaskExecutor) Execute(we *model.WorkflowExecution, db *gorm.DB, input string, parentId uint, prevId uint) (*model.TaskExecution, error) {
task := executor.Task
log.Logger.Infof("Requesting parallel task. ExecutionName=%v Type=%v ParentId=%v PrevId=%v", task.Name, task.GetJobType(), parentId, prevId)
// create execution record
startedAt := time.Now()
te := &model.TaskExecution{
WorkflowExecution: we,
ParentTaskExecutionID: parentId,
PrevTaskExecutionID: prevId,
TaskName: task.Name,
TaskType: task.GetJobType(),
StartedAt: &startedAt,
Status: model.TaskRunning,
Input: input,
Output: "{}",
}
err := executor.TaskExecutionDao.Update(te, db)
if err != nil {
return nil, err
}
name := fmt.Sprintf(
"%s-%d-%d-%s",
task.Name,
we.ID,
te.ID,
time.Now().Format("2006-01-02-15-04-05-99"))
te.ExecutionName = name
err = executor.TaskExecutionDao.Update(te, db)
if err != nil {
return nil, err
}
executors := make([]TaskExecutor, len(task.TaskSets))
for _, taskSet := range task.TaskSets {
startTask := taskSet[0]
executor, err := executor.TaskExecutorFactory.GetTaskExecutor(startTask)
if err != nil {
return nil, err
}
executors = append(executors, executor)
}
for _, executor := range executors {
if executor == nil {
continue
}
_, err = executor.Execute(we, db, input, te.ID, prevId)
if err != nil {
return nil, err
}
}
return te, nil
}