forked from harness/harness
-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.go
257 lines (219 loc) · 7.22 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
package queue
import (
"bytes"
"fmt"
"github.com/drone/drone/pkg/build/git"
r "github.com/drone/drone/pkg/build/repo"
"github.com/drone/drone/pkg/build/script"
"github.com/drone/drone/pkg/channel"
"github.com/drone/drone/pkg/database"
. "github.com/drone/drone/pkg/model"
"github.com/drone/drone/pkg/plugin/notify"
"github.com/drone/go-github/github"
"io"
"log"
"net/url"
"path/filepath"
"time"
)
type worker struct {
runner BuildRunner
}
// work is a function that will infinitely
// run in the background waiting for tasks that
// it can pull off the queue and execute.
func (w *worker) work(queue <-chan *BuildTask) {
var task *BuildTask
for {
// get work item (pointer) from the queue
task = <-queue
if task == nil {
continue
}
// execute the task
w.execute(task)
}
}
// execute will execute the build task and persist
// the results to the datastore.
func (w *worker) execute(task *BuildTask) error {
// we need to be sure that we can recover
// from any sort panic that could occur
// to avoid brining down the entire application
defer func() {
if e := recover(); e != nil {
task.Build.Finished = time.Now().UTC()
task.Commit.Finished = time.Now().UTC()
task.Build.Duration = task.Build.Finished.Unix() - task.Build.Started.Unix()
task.Commit.Duration = task.Build.Finished.Unix() - task.Build.Started.Unix()
task.Commit.Status = "Error"
task.Build.Status = "Error"
database.SaveBuild(task.Build)
database.SaveCommit(task.Commit)
}
}()
// update commit and build status
task.Commit.Status = "Started"
task.Build.Status = "Started"
task.Build.Started = time.Now().UTC()
task.Commit.Started = time.Now().UTC()
// persist the commit to the database
if err := database.SaveCommit(task.Commit); err != nil {
return err
}
// persist the build to the database
if err := database.SaveBuild(task.Build); err != nil {
return err
}
// get settings
settings, _ := database.GetSettings()
// notification context
context := ¬ify.Context{
Repo: task.Repo,
Commit: task.Commit,
Host: settings.URL().String(),
}
// parse the build script
buildscript, err := script.ParseBuild([]byte(task.Build.BuildScript), task.Repo.Params)
if err != nil {
log.Printf("Could not parse your .drone.yml file. It needs to be a valid drone yaml file.\n\n" + err.Error() + "\n")
return err
}
// send all "started" notifications
if buildscript.Notifications != nil {
buildscript.Notifications.Send(context)
}
// Send "started" notification to Github
if err := updateGitHubStatus(task.Repo, task.Commit); err != nil {
log.Printf("error updating github status: %s\n", err.Error())
}
// make sure a channel exists for the repository,
// the commit, and the commit output (TODO)
reposlug := fmt.Sprintf("%s/%s/%s", task.Repo.Host, task.Repo.Owner, task.Repo.Name)
commitslug := fmt.Sprintf("%s/%s/%s/commit/%s/%s", task.Repo.Host, task.Repo.Owner, task.Repo.Name, task.Commit.Branch, task.Commit.Hash)
consoleslug := fmt.Sprintf("%s/%s/%s/commit/%s/%s/builds/%s", task.Repo.Host, task.Repo.Owner, task.Repo.Name, task.Commit.Branch, task.Commit.Hash, task.Build.Slug)
channel.Create(reposlug)
channel.Create(commitslug)
channel.CreateStream(consoleslug)
// notify the channels that the commit and build started
channel.SendJSON(reposlug, task.Commit)
channel.SendJSON(commitslug, task.Build)
var buf = &bufferWrapper{channel: consoleslug}
// append private parameters to the environment
// variable section of the .drone.yml file, iff
// this is not a pull request (for security purposes)
if task.Repo.Params != nil && len(task.Commit.PullRequest) == 0 {
for k, v := range task.Repo.Params {
buildscript.Env = append(buildscript.Env, k+"="+v)
}
}
defer func() {
// update the status of the commit using the
// GitHub status API.
if err := updateGitHubStatus(task.Repo, task.Commit); err != nil {
log.Printf("error updating github status: %s\n", err.Error())
}
}()
// execute the build
passed, buildErr := w.runBuild(task, buildscript, buf)
task.Build.Finished = time.Now().UTC()
task.Commit.Finished = time.Now().UTC()
task.Build.Duration = task.Build.Finished.UnixNano() - task.Build.Started.UnixNano()
task.Commit.Duration = task.Build.Finished.UnixNano() - task.Build.Started.UnixNano()
task.Commit.Status = "Success"
task.Build.Status = "Success"
task.Build.Stdout = buf.buf.String()
// if exit code != 0 set to failure
if passed {
task.Commit.Status = "Failure"
task.Build.Status = "Failure"
if buildErr != nil && task.Build.Stdout == "" {
// TODO: If you wanted to have very friendly error messages, you could do that here
task.Build.Stdout = buildErr.Error() + "\n"
}
}
// persist the build to the database
if err := database.SaveBuild(task.Build); err != nil {
return err
}
// persist the commit to the database
if err := database.SaveCommit(task.Commit); err != nil {
return err
}
// notify the channels that the commit and build finished
channel.SendJSON(reposlug, task.Commit)
channel.SendJSON(commitslug, task.Build)
channel.Close(consoleslug)
// send all "finished" notifications
if buildscript.Notifications != nil {
buildscript.Notifications.Send(context)
}
return nil
}
func (w *worker) runBuild(task *BuildTask, buildscript *script.Build, buf io.Writer) (bool, error) {
repo := &r.Repo{
Name: task.Repo.Slug,
Path: task.Repo.URL,
Branch: task.Commit.Branch,
Commit: task.Commit.Hash,
PR: task.Commit.PullRequest,
Dir: filepath.Join("/var/cache/drone/src", git.GitPath(buildscript.Git, task.Repo.Slug)),
Depth: git.GitDepth(buildscript.Git),
}
return w.runner.Run(
buildscript,
repo,
[]byte(task.Repo.PrivateKey),
task.Repo.Privileged,
buf,
)
}
// updateGitHubStatus is a helper function that will send
// the build status to GitHub using the Status API.
// see https://github.com/blog/1227-commit-status-api
func updateGitHubStatus(repo *Repo, commit *Commit) error {
// convert from drone status to github status
var message, status string
switch commit.Status {
case "Success":
status = "success"
message = "The build succeeded on drone.io"
case "Failure":
status = "failure"
message = "The build failed on drone.io"
case "Started":
status = "pending"
message = "The build is pending on drone.io"
default:
status = "error"
message = "The build errored on drone.io"
}
// get the system settings
settings, _ := database.GetSettings()
// get the user from the database
// since we need his / her GitHub token
user, err := database.GetUser(repo.UserID)
if err != nil {
return err
}
client := github.New(user.GithubToken)
client.ApiUrl = settings.GitHubApiUrl
buildUrl := getBuildUrl(settings.URL().String(), repo, commit)
return client.Repos.CreateStatus(repo.Owner, repo.Name, status, buildUrl, message, commit.Hash)
}
func getBuildUrl(host string, repo *Repo, commit *Commit) string {
branchQuery := url.Values{}
branchQuery.Set("branch", commit.Branch)
buildUrl := fmt.Sprintf("%s/%s/commit/%s?%s", host, repo.Slug, commit.Hash, branchQuery.Encode())
return buildUrl
}
type bufferWrapper struct {
buf bytes.Buffer
// name of the channel
channel string
}
func (b *bufferWrapper) Write(p []byte) (n int, err error) {
n, err = b.buf.Write(p)
channel.SendBytes(b.channel, p)
return
}