/
job.go
315 lines (259 loc) · 10.3 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
package biscepter
import (
"context"
"errors"
"fmt"
"io"
"math"
"os"
"os/exec"
"strings"
"sync"
"time"
_ "crypto/sha1"
"github.com/creasty/defaults"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"github.com/opencontainers/go-digest"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
"gopkg.in/yaml.v3"
)
// jobYaml is the YAML representation of a job config file. GetJobFromConfig
// decodes it and converts it into a Job.
type jobYaml struct {
Repository string `yaml:"repository"` // Repository URL; Run passes it to `git clone`
GoodCommit string `yaml:"goodCommit"` // Commit that does not exhibit the issue
BadCommit string `yaml:"badCommit"` // Commit that exhibits the issue
Port int `yaml:"port"` // Single port; when non-zero it replaces Ports in GetJobFromConfig
Ports []int `yaml:"ports"` // Ports the job needs; ignored when Port is non-zero
Healthcheck []healthcheckYaml `yaml:"healthcheck"` // Healthchecks; their "type" must be "http" or "script" (case-insensitive)
Dockerfile string `yaml:"dockerfile"` // Inline dockerfile contents; takes precedence over DockerfilePath
DockerfilePath string `yaml:"dockerfilePath"` // Path to a dockerfile, only read when Dockerfile is empty
BuildCost float64 `yaml:"buildCost"` // Cost of building a commit relative to running an already built one
}
// GetJobFromConfig decodes a YAML job description from r and converts it into a Job.
//
// A non-zero "port" entry takes precedence over the "ports" list. Each healthcheck
// has its struct defaults applied first. Its "type" is then matched
// case-insensitively against "http" and "script", and its backoff values are
// treated as milliseconds.
func GetJobFromConfig(r io.Reader) (*Job, error) {
	var cfg jobYaml
	if err := yaml.NewDecoder(r).Decode(&cfg); err != nil {
		return nil, err
	}

	ports := cfg.Ports
	if cfg.Port != 0 {
		ports = []int{cfg.Port}
	}

	var healthchecks []Healthcheck
	for i := range cfg.Healthcheck {
		// Apply defaults to a copy so cfg itself is left untouched.
		hc := cfg.Healthcheck[i]
		if err := defaults.Set(&hc); err != nil {
			return nil, err
		}

		var checkType HealthcheckType
		switch strings.ToLower(hc.Type) {
		case "http":
			checkType = HttpGet200
		case "script":
			checkType = Script
		default:
			return nil, fmt.Errorf("invalid check type supplied for healthcheck %s", hc.Type)
		}

		healthchecks = append(healthchecks, Healthcheck{
			Port:      hc.Port,
			CheckType: checkType,
			Data:      hc.Data,
			Config: HealthcheckConfig{
				Retries:          hc.Retries,
				Backoff:          hc.Backoff * time.Millisecond,
				BackoffIncrement: hc.BackoffIncrement * time.Millisecond,
				MaxBackoff:       hc.MaxBackoff * time.Millisecond,
			},
		})
	}

	return &Job{
		BuildCost:      cfg.BuildCost,
		GoodCommit:     cfg.GoodCommit,
		BadCommit:      cfg.BadCommit,
		Dockerfile:     cfg.Dockerfile,
		DockerfilePath: cfg.DockerfilePath,
		Repository:     cfg.Repository,
		Ports:          ports,
		Healthchecks:   healthchecks,
	}, nil
}
// Job represents a blueprint for replicas, which are then used to bisect one issue.
// Jobs can create multiple replicas at once.
//
// The exported fields describe the job's configuration. The unexported fields
// are populated by Run.
type Job struct {
ReplicasCount int // How many replicas of itself this job should spawn simultaneously. Each replica is to be used for bisecting one issue.
// The cost multiplier of building a commit compared to running an already built commit.
// A build cost of 100 means building a commit is 100 times more expensive than running a built commit.
// A build cost of less than 1 results in biscepter always building the middle commit (if it was not built yet) and not using nearby, cached, builds.
BuildCost float64
Ports []int // The ports which this job needs
Healthchecks []Healthcheck // The healthchecks for this job
GoodCommit string // The hash of the good commit, i.e. the commit which does not exhibit any issues
BadCommit string // The hash of the bad commit, i.e. the commit which exhibits the issue(s) to be bisected
Dockerfile string // The contents of the dockerfile.
DockerfilePath string // The path to the dockerfile relative to the present working directory. Only gets used if Dockerfile is empty.
Log *logrus.Logger // Logger for progress and debug output; Run installs a logger that discards everything if nil
MaxConcurrentReplicas uint // The max amount of replicas that can run concurrently, or 0 if no limit (Run then sets it to math.MaxInt)
// Weighted semaphore sized from MaxConcurrentReplicas in Run.
// NOTE(review): presumably acquired by replicas to enforce that limit — confirm in the replica code.
replicaSemaphore *semaphore.Weighted
dockerfileString string // The parsed dockerfile for building the repository (set by parseDockerfile)
dockerfileHash string // The digest of dockerfileString, for differentiating images built from different dockerfiles
replicas []*replica // This job's replicas
Repository string // The repository URL
repoPath string // The path to the original cloned repository (a temporary directory) which replicas will copy from
commits []string // This job's commits, where commits[0] is the good commit and commits[N-1] is the bad commit
// Set of docker image tags (as reported in RepoTags by the docker daemon) that already exist locally.
// NOTE(review): presumably a commit's image counts as built when getDockerImageOfCommit(commit) is a key — confirm at the lookup sites.
builtImages map[string]bool
imagesBuilding *sync.Map // Map of keys for every commit to ensure only one replica is building a specific commit at once
commitReplacements *sync.Map // Map of commits to the commits they should be replaced with. Used to avoid commits that break the build
// Path to the file where commit replacements are written to and stored for subsequent runs.
// If empty, Run sets it to ".biscepter-replacements~", i.e. a file in the current working directory.
CommitReplacementsBackup string
commitReplacementsBackupFile *os.File // Handle to the replacements backup file, opened by Run in append-only write mode
}
// Run the job. This initializes all the replicas and starts them. This function returns a [RunningSystem] channel and an [OffendingCommit] channel.
// The [RunningSystem] channel should be used to get notified about systems which are ready to be tested.
// Once an [OffendingCommit] was received for a given replica index, no more [RunningSystem] structs for this replica will appear in the [RunningSystem] channel.
//
// Before starting any replica, Run:
//   - loads previously recorded commit replacements from CommitReplacementsBackup
//     (default ".biscepter-replacements~") and keeps that file open for appending,
//   - clones Repository into a temporary directory and checks that GoodCommit is
//     reachable from BadCommit along first parents,
//   - records which docker images already exist locally.
//
// If Run returns an error, any replicas it already started are stopped, the
// backup file is closed and the temporary clone is removed.
func (job *Job) Run() (chan RunningSystem, chan OffendingCommit, error) {
	// make would panic on a negative length further down.
	if job.ReplicasCount < 0 {
		return nil, nil, fmt.Errorf("invalid replicas count %d: must not be negative", job.ReplicasCount)
	}

	// Init the logger
	if job.Log == nil {
		// Mute logger
		job.Log = logrus.New()
		job.Log.SetOutput(io.Discard)
	}

	// Init the replica semaphore; 0 means "no limit".
	if job.MaxConcurrentReplicas == 0 {
		job.MaxConcurrentReplicas = math.MaxInt
	}
	// NewWeighted takes an int64; clamp so an oversized uint cannot wrap to a negative size.
	capacity := int64(math.MaxInt64)
	if uint64(job.MaxConcurrentReplicas) < math.MaxInt64 {
		capacity = int64(job.MaxConcurrentReplicas)
	}
	job.replicaSemaphore = semaphore.NewWeighted(capacity)

	// Init the sync maps
	job.imagesBuilding = &sync.Map{}
	job.commitReplacements = &sync.Map{}

	// Release what has been acquired so far if Run returns an error.
	succeeded := false
	var backupFile *os.File
	var clonePath string
	defer func() {
		if succeeded {
			return
		}
		// Best-effort cleanup: the error that made Run fail is what the caller needs to see.
		if backupFile != nil {
			_ = backupFile.Close()
		}
		if clonePath != "" {
			_ = os.RemoveAll(clonePath)
		}
	}()

	// Read in the stored replacements from the configured backup file.
	if job.CommitReplacementsBackup == "" {
		job.CommitReplacementsBackup = ".biscepter-replacements~"
	}
	var err error
	backupFile, err = os.OpenFile(job.CommitReplacementsBackup, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
	if err != nil {
		return nil, nil, errors.Join(fmt.Errorf("couldn't get replacements backup"), err)
	}
	job.commitReplacementsBackupFile = backupFile
	replacements, err := os.ReadFile(job.CommitReplacementsBackup)
	if err != nil {
		return nil, nil, errors.Join(fmt.Errorf("couldn't read replacements"), err)
	}
	// Entries are comma-separated (optionally comma-terminated) "<commit>:<replacement>"
	// pairs. Surrounding whitespace, such as a trailing newline, is ignored.
	entries := strings.TrimSuffix(strings.TrimSpace(string(replacements)), ",")
	if entries != "" {
		for _, pair := range strings.Split(entries, ",") {
			from, to, found := strings.Cut(pair, ":")
			if !found || strings.Contains(to, ":") {
				return nil, nil, fmt.Errorf("format of replacements file entry incorrect: %s", pair)
			}
			job.Log.Debugf("Adding replacement from replacements file: %s -> %s", from, to)
			job.commitReplacements.Store(from, to)
		}
	}

	// Populate job.dockerfileString and job.dockerfileHash, depending on which values were present in the config
	if err := job.parseDockerfile(); err != nil {
		return nil, nil, err
	}

	job.Log.Info("Cloning initial repository...")
	// Clone repo
	clonePath, err = os.MkdirTemp("", "")
	if err != nil {
		return nil, nil, err
	}
	job.repoPath = clonePath
	// "--" keeps git from parsing a Repository value that starts with "-" as an option.
	if output, err := exec.Command("git", "clone", "--", job.Repository, job.repoPath).CombinedOutput(); err != nil {
		return nil, nil, errors.Join(fmt.Errorf("git clone of repository %s at %s failed: %s", job.Repository, job.repoPath, strings.TrimSpace(string(output))), err)
	}

	job.Log.Info("Checking good and bad commits...")
	// Make sure there is a path from BadCommit to GoodCommit
	cmd := exec.Command("git", "rev-list", "--reverse", "--first-parent", job.BadCommit)
	cmd.Dir = job.repoPath
	out, err := cmd.Output()
	if err != nil {
		return nil, nil, errors.Join(fmt.Errorf("failed to get rev-list of bad commit %s", job.BadCommit), err)
	}
	// rev-list prints one full hash per line. Match GoodCommit as a (possibly
	// abbreviated) prefix of a whole hash, not as a substring anywhere in the output,
	// which could falsely match the middle of an unrelated hash.
	reachable := false
	for _, rev := range strings.Fields(string(out)) {
		if strings.HasPrefix(rev, job.GoodCommit) {
			reachable = true
			break
		}
	}
	if !reachable {
		return nil, nil, fmt.Errorf("good commit %s cannot be reached from bad commit %s", job.GoodCommit, job.BadCommit)
	}

	job.Log.Info("Getting all commits...")
	// Get all commits
	job.commits, err = getCommitsBetween(job.GoodCommit, job.BadCommit, job.repoPath)
	if err != nil {
		// errors.Join keeps the cause inspectable with errors.Is/As, unlike %v.
		return nil, nil, errors.Join(fmt.Errorf("couldn't get commits between %s and %s", job.GoodCommit, job.BadCommit), err)
	}

	job.Log.Info("Getting all built images...")
	// Get all built images
	job.builtImages = make(map[string]bool)
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, nil, errors.Join(fmt.Errorf("failed to create new docker client"), err)
	}
	images, err := cli.ImageList(context.Background(), types.ImageListOptions{})
	// The client is only needed for this listing; close it whether or not the listing succeeded.
	cli.Close()
	if err != nil {
		return nil, nil, errors.Join(fmt.Errorf("failed to list all docker images"), err)
	}
	for _, image := range images {
		// Register every tag, so images that carry more than one tag are not skipped.
		for _, tag := range image.RepoTags {
			job.builtImages[tag] = true
		}
	}

	job.Log.Info("Creating replicas...")
	// Make the channels
	// TODO: Don't hardcode channel size
	rsChan, ocChan := make(chan RunningSystem, 100), make(chan OffendingCommit, 100)
	job.replicas = make([]*replica, job.ReplicasCount)

	// stopStarted stops replicas [0, n), all of which were started successfully.
	// It keeps going past individual failures and returns the joined errors
	// (nil if every stop succeeded).
	stopStarted := func(n int) error {
		var errs []error
		for j := range n {
			if err := job.replicas[j].stop(); err != nil {
				errs = append(errs, errors.Join(fmt.Errorf("failed to stop job replica %d", j), err))
			}
		}
		return errors.Join(errs...)
	}

	// Create and start all replicas, rolling back the already started ones on failure.
	for i := range job.ReplicasCount {
		r, err := createJobReplica(job, i)
		if err != nil {
			return nil, nil, errors.Join(fmt.Errorf("failed to create job replica %d", i), err, stopStarted(i))
		}
		job.replicas[i] = r
		if err = r.start(rsChan, ocChan); err != nil {
			return nil, nil, errors.Join(fmt.Errorf("failed to start job replica %d", i), err, stopStarted(i))
		}
	}

	succeeded = true
	return rsChan, ocChan, nil
}
// Stop stops the job and all running replicas. It also closes the commit
// replacements backup file opened by Run and removes the temporary clone of
// the repository.
//
// A replica that fails to stop does not prevent the remaining replicas from
// being stopped or the clone from being removed. All errors encountered are
// joined and returned.
func (j *Job) Stop() error {
	var errs []error
	for i, r := range j.replicas {
		// A failed Run can leave entries for replicas that were never created.
		if r == nil {
			continue
		}
		if err := r.stop(); err != nil {
			errs = append(errs, fmt.Errorf("stopping replica %d: %w", i, err))
		}
	}
	if j.commitReplacementsBackupFile != nil {
		// Tolerate a handle that was already closed, e.g. by a previous Stop.
		if err := j.commitReplacementsBackupFile.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
			errs = append(errs, fmt.Errorf("closing commit replacements backup: %w", err))
		}
	}
	// RemoveAll("") is a no-op, so this is safe even if Run never cloned.
	if err := os.RemoveAll(j.repoPath); err != nil {
		errs = append(errs, fmt.Errorf("removing repository clone %s: %w", j.repoPath, err))
	}
	return errors.Join(errs...)
}
// parseDockerfile sets j.dockerfileString based on the fields set.
// It prioritizes Dockerfile but reads the file at DockerfilePath if Dockerfile is empty.
// In addition, it sets dockerfileHash to the encoded digest of dockerfileString,
// so that images built from different dockerfiles can be told apart.
//
// It returns an error if neither Dockerfile nor DockerfilePath is set, or if the
// file at DockerfilePath cannot be read.
func (j *Job) parseDockerfile() error {
	j.dockerfileString = j.Dockerfile
	if j.dockerfileString == "" {
		// Without this check os.ReadFile("") fails with an opaque "open : no such file" error.
		if j.DockerfilePath == "" {
			return errors.New("no dockerfile configured: both Dockerfile and DockerfilePath are empty")
		}
		file, err := os.ReadFile(j.DockerfilePath)
		if err != nil {
			return fmt.Errorf("reading dockerfile %q: %w", j.DockerfilePath, err)
		}
		j.dockerfileString = string(file)
	}
	j.dockerfileHash = digest.FromString(j.dockerfileString).Encoded()
	return nil
}
// getDockerImageOfCommit returns the docker image reference ("name:tag") for the
// image built from commit. The name embeds the commit and the tag is the hash of
// the job's dockerfile, so different dockerfiles produce different images.
func (j *Job) getDockerImageOfCommit(commit string) string {
	name := "biscepter-" + commit
	return name + ":" + j.dockerfileHash
}