-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipeline_action.go
219 lines (190 loc) · 6.54 KB
/
pipeline_action.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package pipeline
import (
"encoding/json"
"fmt"
"github.com/ovh/cds/engine/api/action"
"github.com/ovh/cds/engine/api/database"
"github.com/ovh/cds/engine/log"
"github.com/ovh/cds/sdk"
)
// DeletePipelineActionByStage Delete all action from a stage
func DeletePipelineActionByStage(db database.QueryExecuter, stageID int64, userID int64) error {
pipelineActionsID, errSelect := selectAllPipelineActionID(db, stageID)
if errSelect != nil {
return errSelect
}
if err := DeleteActionBuild(db, pipelineActionsID); err != nil {
return err
}
// For all pipeline_action in stage
for i := range pipelineActionsID {
var id int64
var actionType string
// Fetch id and type of action linked to pipeline_action so we can delete it if it's a joined action
query := `SELECT action.id, action.type FROM action JOIN pipeline_action ON pipeline_action.action_id = action.id WHERE pipeline_action.id = $1`
if err := db.QueryRow(query, pipelineActionsID[i]).Scan(&id, &actionType); err != nil {
return err
}
// Delete pipeline_action
query = `DELETE FROM pipeline_action WHERE id = $1`
if _, err := db.Exec(query, pipelineActionsID[i]); err != nil {
return err
}
// Then if action is a Joined Action delete action as well
if actionType != sdk.JoinedAction {
continue
}
log.Info("DeletePipelineActionByStage> Deleting action %d\n", id)
if err := action.DeleteAction(db, id, userID); err != nil {
return err
}
}
return nil
}
func selectAllPipelineActionID(db database.QueryExecuter, pipelineStageID int64) ([]int64, error) {
var pipelineActionIDs []int64
query := `SELECT id FROM "pipeline_action"
WHERE pipeline_stage_id = $1`
rows, err := db.Query(query, pipelineStageID)
if err != nil {
return pipelineActionIDs, err
}
defer rows.Close()
for rows.Next() {
var pipelineActionID int64
err = rows.Scan(&pipelineActionID)
if err != nil {
return pipelineActionIDs, err
}
pipelineActionIDs = append(pipelineActionIDs, pipelineActionID)
}
return pipelineActionIDs, nil
}
//InsertPipelineJob insert data in pipeline_action table
// DEPRECATED
func InsertPipelineJob(db database.QueryExecuter, pip *sdk.Pipeline, s *sdk.Stage, a *sdk.Action) error {
query := `INSERT INTO pipeline_action (pipeline_stage_id, action_id, args, enabled) VALUES ($1, $2, $3, $4) RETURNING id`
args, err := json.Marshal(a.Parameters)
if err != nil {
return err
}
if err := db.QueryRow(query, s.ID, a.ID, string(args), a.Enabled).Scan(&a.PipelineActionID); err != nil {
return err
}
return nil
}
// InsertPipelineAction insert an action in a pipeline
func InsertPipelineAction(db database.QueryExecuter, projectKey, pipelineName string, actionID int64, args string, stageID int64) (int64, error) {
p, err := LoadPipeline(db, projectKey, pipelineName, true)
if err != nil {
return 0, fmt.Errorf("Cannot LoadPipeline> %s", err)
}
var stage *sdk.Stage
//Create stage if stageID == 0
if stageID == 0 {
stage = &sdk.Stage{
Name: fmt.Sprintf("Stage %d", len(p.Stages)+1),
PipelineID: p.ID,
BuildOrder: len(p.Stages) + 1,
Enabled: true,
}
if err := InsertStage(db, stage); err != nil {
return 0, fmt.Errorf("Cannot InsertStage on pipeline %d> %s", p.ID, err)
}
stageID = stage.ID
} else {
//Else load the stage
stage, err = LoadStage(db, p.ID, stageID)
if err != nil {
return 0, err
}
}
//Reload action
a, err := action.LoadActionByID(db, actionID)
if err != nil {
return 0, err
}
//Insert in pipeline_action table
if err := InsertPipelineJob(db, p, stage, a); err != nil {
return 0, err
}
return a.PipelineActionID, UpdatePipelineLastModified(db, p)
}
// InsertJob Insert a new Job ( pipeline_action + joinedAction )
func InsertJob(db database.QueryExecuter, job *sdk.Job, stageID int64, pip *sdk.Pipeline) error {
// Insert Joined Action
job.Action.Type = sdk.JoinedAction
job.Action.Enabled = true
log.Debug("InsertJob> Insert Action %s on pipeline %s with %d children", job.Action.Name, pip.Name, len(job.Action.Actions))
if err := action.InsertAction(db, &job.Action, false); err != nil {
return err
}
// Create Stage if needed
var stage *sdk.Stage
if stageID == 0 {
stage = &sdk.Stage{
Name: fmt.Sprintf("Stage %d", len(pip.Stages)+1),
PipelineID: pip.ID,
BuildOrder: len(pip.Stages) + 1,
Enabled: true,
}
log.Debug("InsertJob> Creating stage %s on pipeline %s", stage.Name, pip.Name)
if err := InsertStage(db, stage); err != nil {
return fmt.Errorf("Cannot InsertStage on pipeline %d> %s", pip.ID, err)
}
} else {
//Else load the stage
var errLoad error
stage, errLoad = LoadStage(db, pip.ID, stageID)
if errLoad != nil {
return errLoad
}
log.Debug("InsertJob> Load existing stage %s on pipeline %s", stage.Name, pip.Name)
}
job.PipelineStageID = stage.ID
// Create pipeline action
query := `INSERT INTO pipeline_action (pipeline_stage_id, action_id, enabled) VALUES ($1, $2, $3) RETURNING id`
if err := db.QueryRow(query, job.PipelineStageID, job.Action.ID, job.Enabled).Scan(&job.PipelineActionID); err != nil {
return err
}
return nil
}
// UpdateJob updates the job by actionData.PipelineActionID and actionData.ID
func UpdateJob(db database.QueryExecuter, job *sdk.Job, userID int64) error {
clearJoinedAction, err := action.LoadActionByID(db, job.Action.ID)
if err != nil {
return err
}
if clearJoinedAction.Type != sdk.JoinedAction {
return sdk.ErrForbidden
}
query := `UPDATE pipeline_action set action_id=$1, pipeline_stage_id=$2, enabled=$4 WHERE id=$3`
_, err = db.Exec(query, job.Action.ID, job.PipelineStageID, job.PipelineActionID, job.Enabled)
if err != nil {
return err
}
return action.UpdateActionDB(db, &job.Action, userID)
}
// DeleteJob Delete a job ( action + pipeline_action )
func DeleteJob(db database.QueryExecuter, job sdk.Job, userID int64) error {
return action.DeleteAction(db, job.Action.ID, userID)
}
// UpdatePipelineAction Update an action in a pipeline
func UpdatePipelineAction(db database.Executer, action sdk.Action, args string) error {
query := `UPDATE pipeline_action set action_id=$1, args=$2, pipeline_stage_id=$3, enabled=$5 WHERE id=$4`
_, err := db.Exec(query, action.ID, args, action.PipelineStageID, action.PipelineActionID, action.Enabled)
if err != nil {
return err
}
return nil
}
// DeletePipelineAction Delete an action in a pipeline
func DeletePipelineAction(db database.QueryExecuter, pipelineActionID int64) error {
// Delete pipelineAction by buildOrder
query := `DELETE FROM pipeline_action WHERE id = $1`
_, err := db.Exec(query, pipelineActionID)
if err != nil {
return err
}
return nil
}