/
pipeline.go
136 lines (113 loc) · 3.81 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package executor
import (
"fmt"
"time"
"github.com/fristonio/xene/pkg/dag"
"github.com/fristonio/xene/pkg/errors"
"github.com/fristonio/xene/pkg/executor/cre"
"github.com/fristonio/xene/pkg/types/v1alpha1"
"github.com/sirupsen/logrus"
)
// PipelineExecutor is the type for executing pipelines. It ties a pipeline
// specification to the RuntimeExecutor used to run the pipeline's tasks and
// keeps the status of the run.
type PipelineExecutor struct {
// name contains the name of the pipeline prefixed with the
// workflow name the pipeline is associated with.
name string
// id contains the ID of the pipeline we are currently executing.
id string
// Spec contains the specification of the pipeline.
Spec *v1alpha1.PipelineSpecWithName
// log contains the logger for the pipeline executor. NewPipelineExecutor
// tags it with a "pipeline" field holding the pipeline name.
log *logrus.Entry
// re contains the runtime executor for the pipeline.
re RuntimeExecutor
// status contains the status of the pipeline execution. It is not set by
// NewPipelineExecutor; Run points it at its own copy of the status it is
// given, and GetStatus returns it.
// NOTE(review): this field was previously documented as only being set
// when the store is not used for saving the run status, but Run assigns
// it unconditionally — confirm which is intended.
status *v1alpha1.PipelineRunStatus
}
// NewPipelineExecutor builds a PipelineExecutor for the provided pipeline
// specification.
//
// name and id are stored on the returned executor and forwarded to the runtime
// executor. Whatever spec.Executor.Type says, the runtime executor is created
// with cre.NewCRExecutor using v1alpha1.DockerExecutor; a type other than
// v1alpha1.ContainerExecutor only adds a warning before that default is used.
// spec must not be nil, as spec.Executor.Type is read without a nil check.
func NewPipelineExecutor(name, id string, spec *v1alpha1.PipelineSpecWithName) *PipelineExecutor {
	// Unknown executor types are not rejected; they fall back to the default.
	if v1alpha1.Executor(spec.Executor.Type) != v1alpha1.ContainerExecutor {
		logrus.Warnf("not a valid executor: %s, using default", spec.Executor.Type)
	}

	executor := &PipelineExecutor{
		name: name,
		id:   id,
		Spec: spec,
		re:   cre.NewCRExecutor(string(v1alpha1.DockerExecutor), id, name, spec),
	}
	// Every log line emitted by this executor carries the pipeline name.
	executor.log = logrus.WithFields(logrus.Fields{"pipeline": name})

	return executor
}
// WithoutStore calls WithoutStore on the underlying runtime executor and
// returns the receiver so that the call can be chained.
// NOTE(review): the effect is defined by the RuntimeExecutor implementation;
// it is presumably meant to stop the executor from performing any KVStore
// interactions — confirm against the implementation.
func (p *PipelineExecutor) WithoutStore() *PipelineExecutor {
	p.re.WithoutStore()

	return p
}
// Run executes the pipeline: it configures the runtime executor, resolves the
// pipeline spec, runs every task of the task graph through the runtime
// executor, records the final status and shuts the runtime executor down.
//
// status must already contain dummy information about all the tasks and steps
// in the pipeline, i.e. it should be a blueprint of the pipeline spec. It is
// received by value; the executor keeps a pointer to its own copy, which is
// what GetStatus returns afterwards.
//
// Run has no return value, every failure is logged instead:
//   - if the runtime executor cannot be configured, Run returns immediately;
//   - if the spec cannot be resolved, the task graph is not walked and the run
//     is marked with v1alpha1.StatusError;
//   - if any task fails, the run is marked with v1alpha1.StatusError.
//
// Once the runtime executor has been configured, the end time is always set,
// the status is always saved and the runtime executor is always shut down.
func (p *PipelineExecutor) Run(status v1alpha1.PipelineRunStatus) {
	p.log.Debugf("running PipelineExecutor")
	p.status = &status

	// Configure the pipeline runtime executor.
	if err := p.re.Configure(); err != nil {
		p.log.Errorf("error while setting up runtime executor: %s", err)
		// NOTE(review): the status is left exactly as it was passed in on
		// this path — confirm callers do not expect it to be marked as failed.
		return
	}

	// This also transitively reduces the DAG we have made for the tasks.
	// A spec that failed to resolve must not be walked: its task graph is
	// not known to be valid, so the run is failed instead.
	resolveErr := p.Spec.Resolve(p.name)
	if resolveErr != nil {
		p.log.Errorf("error while resolving pipeline: %s", resolveErr)
	}
	failed := resolveErr != nil

	// Associate status with the runtime executor
	p.re.WithStatus(&status)

	if !failed {
		// Walk each of task in the pipeline in the required order.
		walkErrors := p.Spec.Dag.Walk(func(v dag.Vertex) *errors.MultiError {
			errs := errors.NewMultiError()

			task, ok := v.(*v1alpha1.TaskSpec)
			if !ok {
				// If any error is getting the task spec then append it to the list of errors
				errs.Append(fmt.Errorf("not a valid vertex to visit, must confirm to type *TaskSpec"))
				return errs
			}

			// Run the task using the runtime executor.
			if err := p.re.RunTask(task.Name(), task); err != nil {
				errs.Append(fmt.Errorf("error while running the task: %s", err))
				p.log.Errorf("error running task(%s): %s", task.Name(), err)
			}
			return errs
		})

		if len(walkErrors) > 0 {
			failed = true
			p.log.Errorf("error while walking task graph: \n%v", walkErrors)
		}
	}

	if failed {
		status.Status = v1alpha1.StatusError
	} else {
		status.Status = v1alpha1.StatusSuccess
	}
	status.EndTime = time.Now().Unix()

	// A failure to save the status is logged only, so that the runtime
	// executor is still shut down below.
	if err := p.re.SaveStatusToStore(); err != nil {
		p.log.Errorf("error while saving status to the store: %s", err)
	}

	if err := p.re.Shutdown(); err != nil {
		p.log.Errorf("error while shutting down runtime executor: %s", err)
	}
}
// GetStatus returns the pipeline run status held by the executor. This is the
// pointer stored by the most recent call to Run, and is nil for an executor
// created by NewPipelineExecutor on which Run has not been called yet.
func (p *PipelineExecutor) GetStatus() *v1alpha1.PipelineRunStatus {
	return p.status
}