-
Notifications
You must be signed in to change notification settings - Fork 85
/
exec.go
372 lines (307 loc) · 10.8 KB
/
exec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
package exec
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"github.com/spf13/cobra"
"gopkg.in/alessio/shellescape.v1"
"k8s.io/kubectl/pkg/util/i18n"
"github.com/bacalhau-project/bacalhau/cmd/util"
"github.com/bacalhau-project/bacalhau/cmd/util/flags/cliflags"
"github.com/bacalhau-project/bacalhau/cmd/util/hook"
"github.com/bacalhau-project/bacalhau/cmd/util/parse"
"github.com/bacalhau-project/bacalhau/cmd/util/printer"
"github.com/bacalhau-project/bacalhau/pkg/lib/template"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/migration/legacy"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
"github.com/bacalhau-project/bacalhau/pkg/storage/inline"
"github.com/bacalhau-project/bacalhau/pkg/userstrings"
"github.com/bacalhau-project/bacalhau/pkg/util/templates"
)
var (
// getLong is the long-form description used as the command's `Long` help
// text. It is built once, when the package-level variable is initialised,
// and embeds the list produced by supportedJobTypes().
getLong = templates.LongDesc(i18n.T(
fmt.Sprintf(`Execute a specific job type.
Allows for the execution of a job type with the given code,
without the need to create a container, or webassembly module.
By specifying the code with the '--code' flag you can ship the code
to the cluster for execution, specified by the remainder of the
command line. See examples below.
Supported job types:
%s
`, supportedJobTypes()),
))
// getExample holds the usage examples used as the command's `Example`
// help text.
//nolint:lll // Documentation
getExample = templates.Examples(i18n.T(`
# Execute the app.py script with Python
bacalhau exec --code app.py python app.py
# Run a duckdb query against a CSV file
bacalhau exec -i src=...,dst=/inputs/data.csv duckdb "select * from /inputs/data.csv"
`))
)
// ExecOptions collects the flag-backed settings used by the exec command.
type ExecOptions struct {
// SpecSettings holds the job specification flag values (inputs, publisher,
// output volumes, labels, selector, environment variables and timeout are
// all read from it when the job is prepared).
SpecSettings *cliflags.SpecFlagSettings
// RunTimeSettings holds the run-time flag values; it is handed to the
// printer when reporting on the submitted job.
RunTimeSettings *cliflags.RunTimeSettings
// Code is the value of the --code flag: the file, or directory of code to
// send with the request. Empty means no code is attached.
Code string
}
// NewExecOptions returns an ExecOptions whose spec and run-time settings are
// populated with the cliflags defaults. Code is left empty.
func NewExecOptions() *ExecOptions {
	opts := &ExecOptions{}
	opts.SpecSettings = cliflags.NewSpecFlagDefaultSettings()
	opts.RunTimeSettings = cliflags.DefaultRunTimeSettings()
	return opts
}
// NewCmd builds the exec command using default options.
func NewCmd() *cobra.Command {
	return NewCmdWithOptions(NewExecOptions())
}
// NewCmdWithOptions builds the exec cobra command and binds its flags to the
// supplied options.
//
// Unknown flags are whitelisted during parsing; they are recovered from the
// raw process arguments when the command runs and passed on to exec together
// with the positional arguments.
func NewCmdWithOptions(options *ExecOptions) *cobra.Command {
	run := func(cmd *cobra.Command, cmdArgs []string) {
		// Recover the flags cobra did not recognise from the original
		// arguments. Only the long form is supported for custom job types
		// because the names are used as keys in template completions.
		unknownArgs := ExtractUnknownArgs(cmd.Flags(), os.Args[1:])
		err := exec(cmd, cmdArgs, unknownArgs, options)
		if err != nil {
			util.Fatal(cmd, err, 1)
		}
	}

	execCmd := &cobra.Command{
		Use:                "exec [jobtype]",
		Short:              "Execute a specific job type",
		Long:               getLong,
		Example:            getExample,
		Args:               cobra.MinimumNArgs(1),
		PreRunE:            hook.RemoteCmdPreRunHooks,
		PostRunE:           hook.RemoteCmdPostRunHooks,
		FParseErrWhitelist: cobra.FParseErrWhitelist{UnknownFlags: true},
		Run:                run,
	}

	persistent := execCmd.PersistentFlags()
	persistent.AddFlagSet(cliflags.SpecFlags(options.SpecSettings))
	persistent.AddFlagSet(cliflags.NewRunTimeSettingsFlags(options.RunTimeSettings))

	execCmd.Flags().StringVar(&options.Code, "code", "", "Specifies the file, or directory of code to send with the request")

	return execCmd
}
// exec prepares a job from the command line, validates it, submits it through
// the v2 API client and then prints the progress/result of the execution.
//
// It returns the first error encountered: a preparation error unchanged, a
// validation error prefixed with userstrings.JobSpecBad, or a wrapped
// submission/printing error.
func exec(cmd *cobra.Command, cmdArgs []string, unknownArgs []string, options *ExecOptions) error {
	job, err := PrepareJob(cmd, cmdArgs, unknownArgs, options)
	if err != nil {
		return err
	}

	// Normalise before validating so validation sees the final shape.
	job.Normalize()
	if err = job.ValidateSubmission(); err != nil {
		return fmt.Errorf("%s: %w", userstrings.JobSpecBad, err)
	}

	apiClient := util.GetAPIClientV2(cmd)
	ctx := cmd.Context()

	request := &apimodels.PutJobRequest{Job: job}
	resp, err := apiClient.Jobs().Put(ctx, request)
	if err != nil {
		return fmt.Errorf("failed request: %w", err)
	}

	if printErr := printer.PrintJobExecution(ctx, resp.JobID, cmd, options.RunTimeSettings, apiClient); printErr != nil {
		return fmt.Errorf("failed to print job execution: %w", printErr)
	}
	return nil
}
// supportedJobTypes returns a string to display the currently available job
// types: one " * <name>" line per embedded template.
//
// The result is used while building the command's help text, so this is
// best-effort: if the embedded templates cannot be loaded an empty string is
// returned instead of using the template map from a failed load.
func supportedJobTypes() string {
	tpl, err := NewTemplateMap(embeddedFiles, "templates")
	if err != nil {
		// Nothing useful to list; PrepareJob reports the failure to the user
		// when a job is actually submitted.
		return ""
	}

	var sb strings.Builder
	for _, s := range tpl.AllTemplates() {
		fmt.Fprintf(&sb, " * %s\n", s)
	}
	return sb.String()
}
// PrepareJob builds the models.Job to submit for an `exec` invocation.
//
// cmdArgs are the positional arguments; cmdArgs[0] names the job type and
// selects the embedded template, the remainder become the engine's
// "Arguments" parameter (arguments containing a space are shell-quoted
// first). unknownArgs are the flags cobra did not recognise; they are
// converted to a map and used as template replacements. options supplies
// the inputs, code location, publisher, outputs, labels, constraints,
// environment variables and timeout to attach to the job's first task.
//
// cmdArgs is not modified. An error is returned when no job type is given,
// the templates cannot be loaded, the job type has no template, the template
// fails to render or decode, the decoded job has no task, or any of the
// option-derived settings cannot be applied.
//
//nolint:funlen
func PrepareJob(cmd *cobra.Command, cmdArgs []string, unknownArgs []string, options *ExecOptions) (*models.Job, error) {
	var err error
	var jobType, templateString string
	var job *models.Job

	// The cobra command enforces at least one argument, but this function is
	// exported and may be called directly.
	if len(cmdArgs) == 0 {
		return nil, fmt.Errorf("a job type must be specified")
	}

	// Determine the job type and lookup the template for that type. If we
	// don't have a template, then we don't know how to submit that job type.
	jobType = cmdArgs[0]

	// If any parameters were quoted, we should make sure we try and add
	// them back in after they were stripped for us. Quote into a copy so the
	// caller's slice is left untouched.
	args := make([]string, len(cmdArgs))
	for i, arg := range cmdArgs {
		if strings.Contains(arg, " ") {
			arg = shellescape.Quote(arg)
		}
		args[i] = arg
	}

	tpl, err := NewTemplateMap(embeddedFiles, "templates")
	if err != nil {
		return nil, fmt.Errorf("failed to find supported job types, templates missing: %w", err)
	}

	// Get the template string, or if we can't find one for this type, then
	// provide a list of ones we _do_ support.
	if templateString, err = tpl.Get(jobType); err != nil {
		var supportedTypes strings.Builder
		if knownTypes := tpl.AllTemplates(); len(knownTypes) > 0 {
			supportedTypes.WriteString("\nSupported types:\n")
			for _, kt := range knownTypes {
				fmt.Fprintf(&supportedTypes, " * %s\n", kt)
			}
		}
		// The list is passed as an argument, never as part of the format
		// string, so a '%' in a template name cannot corrupt the message.
		return nil, fmt.Errorf("the job type '%s' is not supported.%s", jobType, supportedTypes.String())
	}

	// Convert the unknown args to a map which we can use to fill in the template
	replacements := flagsToMap(unknownArgs)

	parser, err := template.NewParser(template.ParserParams{
		Replacements: replacements,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create %s job when parsing template: %+w", jobType, err)
	}

	tplResult, err := parser.ParseBytes([]byte(templateString))
	if err != nil {
		return nil, fmt.Errorf("%s: %w", userstrings.JobSpecBad, err)
	}

	// tplResult is now a []byte containing json for the job we will eventually submit.
	if err = json.Unmarshal(tplResult, &job); err != nil {
		return nil, fmt.Errorf("%s: %w", userstrings.JobSpecBad, err)
	}

	// Everything below addresses job.Tasks[0]; a template that decodes to
	// null or to a job without tasks would otherwise panic.
	if job == nil || len(job.Tasks) == 0 {
		return nil, fmt.Errorf("%s: template for job type '%s' did not produce a job with at least one task",
			userstrings.JobSpecBad, jobType)
	}

	// Attach the command line arguments that were provided to exec. These are passed through
	// to the template as Command/Arguments. e.g. `bacalhau exec python app.py` will set
	// Command -> python, and Arguments -> ["app.py"]
	job.Tasks[0].Engine.Params["Command"] = jobType
	job.Tasks[0].Engine.Params["Arguments"] = args[1:]

	// Attach any inputs the user specified to the job spec
	if err := prepareInputs(options, job); err != nil {
		return nil, err
	}

	// Process --code if anything was specified. In future we may want to try and determine this
	// ourselves where it is not specified, but it will likely be dependent on job type.
	if options.Code != "" {
		if err = addInlineContent(cmd.Context(), options.Code, job); err != nil {
			return nil, err
		}
	}

	// The publisher must be set before the outputs are prepared: the default
	// output is only kept when a publisher is configured.
	publisherSpec := options.SpecSettings.Publisher.Value()
	if publisherSpec != nil {
		job.Tasks[0].Publisher = &models.SpecConfig{
			Type:   publisherSpec.Type.String(),
			Params: publisherSpec.Params,
		}
	}

	// Handle ResultPaths by using the legacy parser and converting.
	if err := prepareJobOutputs(cmd.Context(), options, job); err != nil {
		return nil, err
	}

	// Parse labels from flag, we expect key=value for the non-legacy models.Job
	if err := prepareLabels(options, job); err != nil {
		return nil, err
	}

	// Constraints for node selection
	if err := prepareConstraints(options, job); err != nil {
		return nil, err
	}

	// Environment variables
	if err := prepareEnvVars(options, job); err != nil {
		return nil, err
	}

	// Set the execution timeouts
	job.Tasks[0].Timeouts = &models.TimeoutConfig{
		ExecutionTimeout: options.SpecSettings.Timeout,
	}

	// Unsupported in new job specifications (models.Job)
	// options.SpecSettings.DoNotTrack

	return job, nil
}
// prepareConstraints parses the node selector from the spec settings,
// converts it from the legacy label-selector form and stores the result on
// job.Constraints. Any parse or conversion error is returned unchanged.
func prepareConstraints(options *ExecOptions, job *models.Job) error {
	requirements, parseErr := parse.NodeSelector(options.SpecSettings.Selector)
	if parseErr != nil {
		return parseErr
	}

	constraints, convertErr := legacy.FromLegacyLabelSelector(requirements)
	if convertErr != nil {
		return convertErr
	}

	job.Constraints = constraints
	return nil
}
// prepareInputs converts every input given in the spec settings from its
// legacy storage-spec form and appends it to the first task's InputSources.
// It stops at, and reports, the first input that cannot be converted.
func prepareInputs(options *ExecOptions, job *models.Job) error {
	for _, storageSpec := range options.SpecSettings.Inputs.Values() {
		inputSource, convertErr := legacy.FromLegacyStorageSpecToInputSource(storageSpec)
		if convertErr != nil {
			return fmt.Errorf("failed to process input %s: %w", storageSpec.Name, convertErr)
		}

		job.Tasks[0].InputSources = append(job.Tasks[0].InputSources, inputSource)
	}

	return nil
}
// prepareLabels parses the label flag values into a map and assigns it to
// job.Labels. When no labels were supplied the job is left untouched.
func prepareLabels(options *ExecOptions, job *models.Job) error {
	if len(options.SpecSettings.Labels) == 0 {
		return nil
	}

	parsed, err := parse.StringSliceToMap(options.SpecSettings.Labels)
	if err != nil {
		return err
	}

	job.Labels = parsed
	return nil
}
// prepareEnvVars parses the environment-variable flag values into a map and
// assigns it to the first task's Env. When none were supplied the job is
// left untouched.
func prepareEnvVars(options *ExecOptions, job *models.Job) error {
	if len(options.SpecSettings.EnvVar) == 0 {
		return nil
	}

	parsed, err := parse.StringSliceToMap(options.SpecSettings.EnvVar)
	if err != nil {
		return err
	}

	job.Tasks[0].Env = parsed
	return nil
}
// prepareJobOutputs parses the output volumes from the spec settings with the
// legacy parser and records them as validated ResultPaths on the first task.
//
// Nothing is recorded when there are no outputs, or when the only output is
// the legacy default ("outputs" at "/outputs") and the task has no publisher.
// The first parse or validation error is returned unchanged.
func prepareJobOutputs(ctx context.Context, options *ExecOptions, job *models.Job) error {
	outputs, err := parse.JobOutputs(ctx, options.SpecSettings.OutputVolumes)
	switch {
	case err != nil:
		return err
	case len(outputs) == 0:
		return nil
	}

	// The single legacy default output is only worth keeping when there is a
	// publisher to send it to.
	onlyLegacyDefault := len(outputs) == 1 &&
		outputs[0].Name == "outputs" &&
		outputs[0].Path == "/outputs"
	if onlyLegacyDefault && job.Tasks[0].Publisher == nil {
		return nil
	}

	job.Tasks[0].ResultPaths = make([]*models.ResultPath, 0, len(outputs))
	for _, out := range outputs {
		resultPath := &models.ResultPath{Name: out.Name, Path: out.Path}
		if invalid := resultPath.Validate(); invalid != nil {
			return invalid
		}
		job.Tasks[0].ResultPaths = append(job.Tasks[0].ResultPaths, resultPath)
	}

	return nil
}
// addInlineContent will use codeLocation to determine if it is a single file or a
// directory and will attach to the job as an inline attachment.
//
// The location is resolved to an absolute path and uploaded through the
// inline storage provider. The resulting input source is appended to the
// first task with alias "code"; a directory is targeted at "/code" and a
// single file at "/code/<file name>".
//
// An error is returned if the path cannot be resolved, does not exist,
// cannot be inspected (for example because of permissions), or the upload
// fails.
func addInlineContent(ctx context.Context, codeLocation string, job *models.Job) error {
	absPath, err := filepath.Abs(codeLocation)
	if err != nil {
		return err
	}

	finfo, err := os.Stat(absPath)
	if err != nil {
		// Only claim the file is missing when that is really the cause;
		// anything else (permissions, I/O errors) is reported as-is.
		if os.IsNotExist(err) {
			return fmt.Errorf("file '%s' not found", codeLocation)
		}
		return fmt.Errorf("failed to access code '%s': %w", codeLocation, err)
	}

	target := "/code"
	if !finfo.IsDir() {
		target = fmt.Sprintf("/code/%s", finfo.Name())
	}

	specConfig, err := inline.NewStorage().Upload(ctx, absPath)
	if err != nil {
		return fmt.Errorf("failed to attach code '%s' to job submission: %w", codeLocation, err)
	}

	job.Tasks[0].InputSources = append(job.Tasks[0].InputSources, &models.InputSource{
		Source: &specConfig,
		Alias:  "code",
		Target: target,
	})

	return nil
}