/
worker_task_cache.go
112 lines (97 loc) · 2.21 KB
/
worker_task_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package db
import (
"database/sql"
sq "github.com/Masterminds/squirrel"
"github.com/concourse/concourse/atc"
)
// WorkerTaskCache identifies a task cache on a particular worker. Its find
// and findOrCreate methods use the pair (WorkerName, TaskCache.ID()) to look
// up or insert a row in the worker_task_caches table.
type WorkerTaskCache struct {
// WorkerName is matched against, or written to, the worker_name column.
WorkerName string
// TaskCache supplies, through its ID method, the task_cache_id column value.
TaskCache UsedTaskCache
}
// UsedWorkerTaskCache is the result of looking up or inserting a
// worker_task_caches row for a WorkerTaskCache.
type UsedWorkerTaskCache struct {
// ID is the value scanned from the row's id column.
ID int
// WorkerName is copied from the WorkerTaskCache that produced this value.
WorkerName string
}
// findOrCreate returns the worker_task_caches row matching this worker name
// and task cache ID, inserting one when the lookup finds nothing.
//
// Both the lookup and the insert run on tx. The insert carries an
// ON CONFLICT ... DO UPDATE clause so that RETURNING still yields an id when
// a row with the same (worker_name, task_cache_id) already exists at the
// moment the insert executes.
func (wtc WorkerTaskCache) findOrCreate(
	tx Tx,
) (*UsedWorkerTaskCache, error) {
	existing, ok, err := wtc.find(tx)
	if err != nil {
		return nil, err
	}

	// Happy path: the row is already there, nothing to write.
	if ok {
		return existing, nil
	}

	insert := psql.Insert("worker_task_caches").
		Columns("worker_name", "task_cache_id").
		Values(wtc.WorkerName, wtc.TaskCache.ID()).
		Suffix(`
ON CONFLICT (worker_name, task_cache_id) DO UPDATE SET
task_cache_id = EXCLUDED.task_cache_id
RETURNING id
`)

	var newID int
	if err := insert.RunWith(tx).QueryRow().Scan(&newID); err != nil {
		return nil, err
	}

	return &UsedWorkerTaskCache{ID: newID, WorkerName: wtc.WorkerName}, nil
}
// find looks up the worker_task_caches row whose worker_name and
// task_cache_id match this WorkerTaskCache, running the query on runner.
//
// It returns (row, true, nil) when a row exists, (nil, false, nil) when the
// query yields sql.ErrNoRows, and (nil, false, err) for any other error.
func (wtc WorkerTaskCache) find(runner sq.Runner) (*UsedWorkerTaskCache, bool, error) {
	lookup := psql.Select("id").
		From("worker_task_caches").
		Where(sq.Eq{
			"worker_name":   wtc.WorkerName,
			"task_cache_id": wtc.TaskCache.ID(),
		})

	var rowID int
	switch err := lookup.RunWith(runner).QueryRow().Scan(&rowID); err {
	case nil:
		return &UsedWorkerTaskCache{ID: rowID, WorkerName: wtc.WorkerName}, true, nil
	case sql.ErrNoRows:
		// A missing row is an ordinary outcome, not a failure.
		return nil, false, nil
	default:
		return nil, false, err
	}
}
// removeUnusedWorkerTaskCaches deletes the task_caches rows of a pipeline
// that the supplied job configs no longer reference.
//
// A row is deleted when its job belongs to pipelineID and either
//   - the job is inactive (j.active = false), or
//   - the job is listed in jobConfigs and the row's step_name is not the name
//     of any task step found while walking that job's step config.
//
// A job in jobConfigs with no task steps at all has every one of its
// task_caches rows deleted: with no task steps left, no step name can match.
//
// The DELETE runs on tx and its execution error, if any, is returned.
//
// NOTE(review): only task_caches is touched here; presumably the matching
// worker_task_caches rows go away through a foreign-key cascade — confirm
// against the schema.
func removeUnusedWorkerTaskCaches(tx Tx, pipelineID int, jobConfigs []atc.JobConfig) error {
	taskStepsByJob := make(map[string][]string, len(jobConfigs))
	for _, jobConfig := range jobConfigs {
		// Per-iteration copy so the closure below never observes a later
		// iteration's job, whatever the language version's loop semantics.
		jobName := jobConfig.Name

		// Register every job, including those without task steps, so that a
		// job whose task steps were all removed still gets its caches cleaned.
		if _, seen := taskStepsByJob[jobName]; !seen {
			taskStepsByJob[jobName] = nil
		}

		// The callback below always returns nil; any error Visit itself might
		// report is deliberately ignored so the cleanup stays best-effort.
		_ = jobConfig.StepConfig().Visit(atc.StepRecursor{
			OnTask: func(step *atc.TaskStep) error {
				taskStepsByJob[jobName] = append(taskStepsByJob[jobName], step.Name)
				return nil
			},
		})
	}

	staleInJob := make(sq.Or, 0, len(taskStepsByJob))
	for jobName, stepNames := range taskStepsByJob {
		if len(stepNames) == 0 {
			// No task steps remain: every cache of this job is unused. Spelled
			// out explicitly rather than relying on how the query builder
			// renders NOT IN over an empty list.
			staleInJob = append(staleInJob, sq.Eq{"j.name": jobName})
			continue
		}

		staleInJob = append(staleInJob, sq.And{
			sq.Eq{"j.name": jobName},
			sq.NotEq{"tc.step_name": stepNames},
		})
	}

	_, err := psql.Delete("task_caches tc USING jobs j").
		Where(
			sq.Or{
				staleInJob,
				sq.Eq{
					"j.active": false,
				},
			}).
		Where(sq.Expr("j.id = tc.job_id")).
		Where(sq.Eq{"j.pipeline_id": pipelineID}).
		RunWith(tx).
		Exec()

	return err
}