-
Notifications
You must be signed in to change notification settings - Fork 4.6k
/
job.go
268 lines (220 loc) · 7.01 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package preheat
import (
"time"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
pr "github.com/goharbor/harbor/src/pkg/p2p/preheat/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth"
)
const (
	// PreheatParamProvider is the job parameter key holding the preheat
	// provider instance info.
	PreheatParamProvider = "provider"
	// PreheatParamImage is the job parameter key holding the artifact
	// (image) to preheat.
	PreheatParamImage = "image"
	// checkInterval is the pause between two consecutive status checks.
	checkInterval = 10 * time.Second
	// checkTimeout bounds the whole status check loop: 30 minutes plus one
	// second (1801s).
	// NOTE(review): the extra second presumably lets the last tick at the
	// 30 minute mark be handled before the timeout fires — confirm.
	checkTimeout = 30*time.Minute + time.Second
)
// Job preheats a given artifact (image) to the target preheat provider.
// The type itself holds no fields; everything it needs arrives through the
// job context and parameters.
type Job struct{}

// MaxFails reports the failure limit of a preheat job. It is fixed at 1
// because a preheat job does not need to be retried.
func (j *Job) MaxFails() uint {
	const failLimit uint = 1
	return failLimit
}

// MaxCurrency reports the concurrency limit of preheat jobs. The value 0
// indicates that no limitation is applied.
func (j *Job) MaxCurrency() uint {
	const unlimited uint = 0
	return unlimited
}

// ShouldRetry always reports false: preheating only serves a cache purpose,
// so a failed job is not worth retrying.
func (j *Job) ShouldRetry() bool {
	const retry = false
	return retry
}
// Validate checks that the job parameters carry a parsable provider and a
// parsable image. The provider is checked first; the first parse error
// encountered is returned, and nil is returned when both parse cleanly.
func (j *Job) Validate(params job.Parameters) error {
	if _, providerErr := parseParamProvider(params); providerErr != nil {
		return providerErr
	}

	if _, imageErr := parseParamImage(params); imageErr != nil {
		return imageErr
	}

	return nil
}
// Run executes the preheat process for the image carried in params against
// the preheat provider carried in params.
//
// The flow is:
//  1. parse the provider and image parameters;
//  2. look up the driver factory by the provider vendor and build the driver;
//  3. verify that the provider reports a healthy status;
//  4. send the preheat request to the provider;
//  5. if the returned status is pending or running, call CheckProgress every
//     checkInterval until the task succeeds, fails, or checkTimeout elapses.
//
// A stop command observed between those steps ends the job early with a nil
// error. Every failure is written to the job logger and returned wrapped
// with the message "preheat job running error".
func (j *Job) Run(ctx job.Context, params job.Parameters) error {
	// Get logger
	myLogger := ctx.GetLogger()

	// preheatJobRunningError logs the error to the job log and wraps it in
	// the internal error format of this job.
	preheatJobRunningError := func(err error) error {
		myLogger.Error(err)
		return errors.Wrap(err, "preheat job running error")
	}

	// shouldStop checks if the job should be stopped
	shouldStop := func() bool {
		cmd, ok := ctx.OPCommand()
		return ok && cmd == job.StopCommand
	}

	// Parse parameters. They are expected to have passed Validate already,
	// but the errors are still checked: on a parse failure the returned
	// pointers are nil and the field accesses below would panic.
	p, err := parseParamProvider(params)
	if err != nil {
		return preheatJobRunningError(err)
	}
	pi, err := parseParamImage(params)
	if err != nil {
		return preheatJobRunningError(err)
	}

	// Print related info to log first
	myLogger.Infof(
		"Preheating image '%s:%s@%s' to the target preheat provider: %s %s:%s\n",
		pi.ImageName,
		pi.Tag,
		pi.Digest,
		p.Vendor,
		p.Name,
		p.Endpoint,
	)

	if shouldStop() {
		return nil
	}

	// Get driver factory for the given provider
	fac, ok := pr.GetProvider(p.Vendor)
	if !ok {
		return preheatJobRunningError(errors.Errorf("No driver registered for provider %s", p.Vendor))
	}

	// Construct driver
	d, err := fac(p)
	if err != nil {
		return preheatJobRunningError(err)
	}

	myLogger.Infof("Get preheat provider driver: %s", p.Vendor)

	// Start the preheat process
	// First, check the health of the provider
	h, err := d.GetHealth()
	if err != nil {
		return preheatJobRunningError(err)
	}

	if h.Status != pr.DriverStatusHealthy {
		err = errors.Errorf("unhealthy target preheat provider: %s", p.Vendor)
		return preheatJobRunningError(err)
	}

	myLogger.Infof("Check health of preheat provider instance: %s", pr.DriverStatusHealthy)

	if shouldStop() {
		return nil
	}

	// Then send the preheat requests to the target provider.
	st, err := d.Preheat(pi)
	if err != nil {
		return preheatJobRunningError(err)
	}

	myLogger.Info("Sending preheat request is successfully done")

	// For some of the drivers, e.g: Kraken, the returned status of preheating request contains the
	// final status info. No need to loop check the status.
	switch st.Status {
	case provider.PreheatingStatusSuccess:
		myLogger.Info("Preheating is completed")
		return nil
	case provider.PreheatingStatusFail:
		err = errors.New("preheating is failed")
		return preheatJobRunningError(err)
	case provider.PreheatingStatusPending,
		provider.PreheatingStatusRunning:
		// Not finished yet: fall through to the status check loop below.
	default:
		// Any other status is treated as an error rather than polled.
		err = errors.Errorf("unknown status '%s' returned by the preheat provider %s-%s:%s", st.Status, p.Vendor, p.Name, p.Endpoint)
		return preheatJobRunningError(err)
	}

	if shouldStop() {
		return nil
	}

	myLogger.Info("Start to loop check the preheating status until it's success or timeout(30m)")
	// If process is not completed, loop check the status until it's ready.
	tk := time.NewTicker(checkInterval)
	defer tk.Stop()

	tm := time.NewTimer(checkTimeout)
	defer tm.Stop()

	for {
		select {
		case <-tk.C:
			s, err := d.CheckProgress(st.TaskID)
			if err != nil {
				return preheatJobRunningError(err)
			}

			myLogger.Infof("Check preheat progress: %s", s)

			switch s.Status {
			case provider.PreheatingStatusFail:
				// Fail
				return preheatJobRunningError(errors.Errorf("preheat failed: %s", s))
			case provider.PreheatingStatusSuccess:
				// Finished
				return nil
			default:
				// do nothing, check again
			}

			// The stop command is only honored after a status check, so a
			// task that just finished is reported before stopping.
			if shouldStop() {
				return nil
			}
		case <-tm.C:
			return preheatJobRunningError(errors.Errorf("status check timeout: %v", checkTimeout))
		}
	}
}
// parseParamProvider reads the PreheatParamProvider job parameter, decodes
// it from JSON into a provider instance and checks the required fields.
//
// An error is returned when the parameter is missing or not a non-empty
// string, when the JSON cannot be decoded, or when — checked in this order —
// the vendor is empty, auth info is empty while the auth mode is not
// auth.AuthModeNone, or the endpoint is empty.
func parseParamProvider(params job.Parameters) (*provider.Instance, error) {
	raw, err := parseStrValue(params, PreheatParamProvider)
	if err != nil {
		return nil, err
	}

	inst := new(provider.Instance)
	if decodeErr := inst.FromJSON(raw); decodeErr != nil {
		return nil, errors.Wrap(decodeErr, "parse job parameter error")
	}

	// Validate required info; the first failing check decides the error.
	switch {
	case len(inst.Vendor) == 0:
		return nil, errors.New("missing vendor of preheat provider")
	case inst.AuthMode != auth.AuthModeNone && len(inst.AuthInfo) == 0:
		return nil, errors.Errorf("missing auth info for '%s' auth mode", inst.AuthMode)
	case len(inst.Endpoint) == 0:
		return nil, errors.Errorf("missing endpoint of preheat provider")
	}

	return inst, nil
}
// parseParamImage reads the PreheatParamImage job parameter, decodes it from
// JSON into a preheat image and runs the image's own validation.
//
// An error from reading the parameter is returned as is; decoding and
// validation errors are wrapped with "parse job parameter error".
func parseParamImage(params job.Parameters) (*pr.PreheatImage, error) {
	const wrapMsg = "parse job parameter error"

	raw, err := parseStrValue(params, PreheatParamImage)
	if err != nil {
		return nil, err
	}

	image := new(pr.PreheatImage)
	if decodeErr := image.FromJSON(raw); decodeErr != nil {
		return nil, errors.Wrap(decodeErr, wrapMsg)
	}
	if invalidErr := image.Validate(); invalidErr != nil {
		return nil, errors.Wrap(invalidErr, wrapMsg)
	}

	return image, nil
}
// parseStrValue looks up key in the job parameters and returns its value as
// a string.
//
// A "missing job parameter" error is returned when the key is absent or its
// value is nil; a "bad job parameter" error is returned when the value is
// not a string or is an empty string.
func parseStrValue(params job.Parameters, key string) (string, error) {
	raw, found := params[key]
	if !found || raw == nil {
		return "", errors.Errorf("missing job parameter '%s'", key)
	}

	if str, isStr := raw.(string); isStr && len(str) > 0 {
		return str, nil
	}

	return "", errors.Errorf("bad job parameter '%s'", key)
}