forked from raystack/optimus
-
Notifications
You must be signed in to change notification settings - Fork 1
/
compiler.go
93 lines (77 loc) · 2.49 KB
/
compiler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package dag
import (
"bytes"
"fmt"
"github.com/goto/salt/log"
"github.com/goto/optimus/config"
"github.com/goto/optimus/core/scheduler"
"github.com/goto/optimus/core/tenant"
"github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/sdk/plugin"
)
// PluginRepo is the plugin lookup dependency held by Compiler.
type PluginRepo interface {
	// GetByName looks up a plugin by its name. When and why an error is
	// returned is defined by the implementation.
	GetByName(name string) (*plugin.Plugin, error)
}
// Compiler turns a job's details into rendered scheduler template output.
type Compiler struct {
	// hostname and grpcHost are copied verbatim into the template context;
	// hostname is additionally used when setting up upstreams.
	hostname, grpcHost string

	// log receives the warning emitted when the project's scheduler version
	// config cannot be read and the default version is used.
	log log.Logger

	// templates is the set from which one template is picked per scheduler
	// version.
	templates templates

	// pluginRepo is handed to the task and hook preparation steps.
	pluginRepo PluginRepo
}
// Compile renders the scheduler template for a single job and returns the
// rendered bytes.
//
// The task and hooks are prepared through the plugin repository, and the
// SLA-miss duration, runtime config and upstreams are derived from
// jobDetails. The template is selected by the project's
// tenant.ProjectSchedulerVersion config; when that config cannot be read, a
// warning is logged and defaultVersion is used instead.
//
// Errors:
//   - an InvalidArgument error when jobDetails is nil;
//   - the unmodified error from PrepareTask, PrepareHooksForJob or
//     SLAMissDuration;
//   - an InvalidArgument error carrying the job name and scheduler version
//     when executing the template fails.
//
// NOTE(review): project is assumed to be non-nil; whether GetConfig and Name
// tolerate a nil receiver is not visible from here — confirm against callers.
func (c *Compiler) Compile(project *tenant.Project, jobDetails *scheduler.JobWithDetails) ([]byte, error) {
	// Every step below reads through jobDetails, so a nil pointer is reported
	// as an error instead of panicking on the first field access.
	if jobDetails == nil {
		return nil, errors.InvalidArgument(EntitySchedulerAirflow, "job details must not be nil")
	}

	task, err := PrepareTask(jobDetails.Job, c.pluginRepo)
	if err != nil {
		return nil, err
	}

	hooks, err := PrepareHooksForJob(jobDetails.Job, c.pluginRepo)
	if err != nil {
		return nil, err
	}

	slaDuration, err := SLAMissDuration(jobDetails)
	if err != nil {
		return nil, err
	}

	runtimeConfig := SetupRuntimeConfig(jobDetails)
	upstreams := SetupUpstreams(jobDetails.Upstreams, c.hostname)

	templateContext := TemplateContext{
		JobDetails:      jobDetails,
		Tenant:          jobDetails.Job.Tenant,
		Version:         config.BuildVersion,
		SLAMissDuration: slaDuration,
		Hostname:        c.hostname,
		GRPCHostName:    c.grpcHost,
		ExecutorTask:    scheduler.ExecutorTask.String(),
		ExecutorHook:    scheduler.ExecutorHook.String(),
		Task:            task,
		Hooks:           hooks,
		RuntimeConfig:   runtimeConfig,
		Priority:        jobDetails.Priority,
		Upstreams:       upstreams,
	}

	// An unreadable scheduler version is not fatal: fall back to the default
	// and leave a trace in the log.
	airflowVersion, err := project.GetConfig(tenant.ProjectSchedulerVersion)
	if err != nil {
		c.log.Warn("%s is not provided in project %s, %s. Use default version %s instead", tenant.ProjectSchedulerVersion, project.Name(), err.Error(), defaultVersion)
		airflowVersion = defaultVersion
	}

	tmpl := c.templates.GetTemplate(airflowVersion)

	var buf bytes.Buffer
	if err = tmpl.Execute(&buf, templateContext); err != nil {
		msg := fmt.Sprintf("unable to compile template for job %s with airflow version %s, %s", jobDetails.Name.String(), airflowVersion, err.Error())
		return nil, errors.InvalidArgument(EntitySchedulerAirflow, msg)
	}
	return buf.Bytes(), nil
}
// NewDagCompiler assembles a Compiler from the given logger, host names and
// plugin repository, loading its template set through NewTemplates.
//
// If the template set cannot be created, no Compiler is returned and the
// cause is wrapped in an internal error for EntitySchedulerAirflow.
func NewDagCompiler(l log.Logger, hostname, grpcHost string, repo PluginRepo) (*Compiler, error) {
	tmplSet, err := NewTemplates()
	if err != nil {
		return nil, errors.InternalError(EntitySchedulerAirflow, "unable to instantiate templates", err)
	}

	compiler := &Compiler{
		hostname:   hostname,
		grpcHost:   grpcHost,
		log:        l,
		templates:  tmplSet,
		pluginRepo: repo,
	}
	return compiler, nil
}