This repository has been archived by the owner on Jan 30, 2020. It is now read-only.
/
job.go
179 lines (151 loc) · 4.84 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package job
import (
"errors"
"fmt"
"strings"
"github.com/coreos/fleet/unit"
)
// JobState names a state that a Job can be in.
type JobState string

// The job states recognized by ParseJobState.
const (
	JobStateInactive JobState = "inactive"
	JobStateLoaded   JobState = "loaded"
	JobStateLaunched JobState = "launched"
)
// fleet-specific requirement keys read from the [X-Fleet] section of a unit file.
// The "X-" prefix only appears in the unit file; Requirements strips it before
// the key is used, so the constants below are written without it.
const (
// Require the unit be scheduled to a specific machine identified by the given ID.
fleetXConditionMachineID = "ConditionMachineID"
// Legacy form of fleetXConditionMachineID; RequiredTarget falls back to it
// when no ConditionMachineID value is present.
fleetXConditionMachineBootID = "ConditionMachineBootID"
// Limit eligible machines to the one that hosts a specific unit.
fleetXConditionMachineOf = "ConditionMachineOf"
// Prevent a unit from being collocated with other units using glob-matching on the other unit names.
fleetXConflicts = "Conflicts"
// Machine metadata key in the unit file, without the X- prefix.
// Each of its values is expected to be a "name=value" pair.
fleetXConditionMachineMetadata = "ConditionMachineMetadata"
// Key prefix for machine metadata written by the deprecated `require` flag;
// the metadata name follows directly after this prefix.
fleetFlagMachineMetadata = "MachineMetadata"
)
// ParseJobState converts s into a JobState. It returns a pointer to the
// state when s names one of the known states (inactive, loaded or launched)
// and nil for any other input.
func ParseJobState(s string) *JobState {
	state := JobState(s)
	switch state {
	case JobStateInactive, JobStateLoaded, JobStateLaunched:
		return &state
	}
	return nil
}
// Job pairs a named unit with the state information kept alongside it.
type Job struct {
// Name identifies the Job; Type derives the unit type from its suffix.
Name string
// State is the Job's state. NewJob leaves it nil.
State *JobState
// Unit is the unit this Job wraps; its "X-Fleet" section supplies the
// Job's requirements.
Unit unit.Unit
// UnitHash is the hash of Unit, populated by NewJob from Unit.Hash.
UnitHash unit.Hash
// UnitState is the state of the unit. NewJob leaves it nil.
UnitState *unit.UnitState
}
// NewJob builds a Job from the given name and Unit.
// UnitHash is populated from the Unit's Hash method, while State and
// UnitState are left nil. The returned Job is never nil.
func NewJob(name string, unit unit.Unit) *Job {
	j := new(Job)
	j.Name = name
	j.Unit = unit
	j.UnitHash = unit.Hash()
	return j
}
// Requirements returns the options from the [X-Fleet] section of the Job's
// unit file that carry an `X-` prefix. Options without that prefix are
// skipped. The prefix is stripped from each key before it is returned, and
// every key maps to the values listed for it in the unit.
//
// The returned map is freshly allocated on each call, but its value slices
// are the ones stored in the unit, so modifying them modifies the unit's
// contents.
func (j *Job) Requirements() map[string][]string {
	const prefix = "X-"

	requirements := make(map[string][]string)
	for key, values := range j.Unit.Contents["X-Fleet"] {
		if !strings.HasPrefix(key, prefix) {
			continue
		}
		requirements[strings.TrimPrefix(key, prefix)] = values
	}
	return requirements
}
// Conflicts returns the values of the Conflicts requirement: the names of
// Jobs that cannot be scheduled to the same machine as this Job.
// When the requirement is absent an empty, non-nil slice is returned.
func (j *Job) Conflicts() []string {
	if names, found := j.Requirements()[fleetXConflicts]; found {
		return names
	}
	return []string{}
}
// Peers returns the values of the ConditionMachineOf requirement: the names
// of Jobs that must be scheduled to the same machine as this Job.
// When the requirement is absent an empty, non-nil slice is returned.
func (j *Job) Peers() []string {
	if names, found := j.Requirements()[fleetXConditionMachineOf]; found {
		return names
	}
	return make([]string, 0)
}
// RequiredTarget reports whether this Job must be scheduled to one specific
// machine. When it must, the ID of that machine and true are returned;
// otherwise the result is an empty string and false.
//
// Only the first value of a requirement is considered.
func (j *Job) RequiredTarget() (string, bool) {
	reqs := j.Requirements()

	// Keys are checked in priority order. The legacy boot ID option comes
	// last: it is unlikely to work as the user intends, but it is better to
	// prevent a job with a legacy requirement from starting than to ignore
	// the requirement and let it start.
	keys := []string{fleetXConditionMachineID, fleetXConditionMachineBootID}
	for _, key := range keys {
		// A missing key yields a nil slice, which has length zero.
		if ids := reqs[key]; len(ids) > 0 {
			return ids[0], true
		}
	}
	return "", false
}
// Type determines the type of systemd unit this Job contains by matching the
// suffix of the Job's name against each supported unit type. It returns the
// first supported type that matches, or an error when none does.
func (j *Job) Type() (string, error) {
	for _, kind := range unit.SupportedUnitTypes() {
		suffix := "." + kind
		if strings.HasSuffix(j.Name, suffix) {
			return kind, nil
		}
	}
	msg := fmt.Sprintf("Unrecognized systemd unit %s", j.Name)
	return "", errors.New(msg)
}
// RequiredTargetMetadata returns all machine-related metadata from the Job's
// requirements, keyed by metadata name, each with its list of values.
//
// Two syntaxes are collected:
//   - the deprecated form added via the old `--require` flag, where the
//     requirement key is "MachineMetadata<name>" and its values belong to
//     <name>; entries without values are ignored;
//   - the ConditionMachineMetadata form, where each value is a "name=value"
//     pair. Pairs that do not split on "=" into exactly two non-empty parts
//     are ignored.
//
// Deprecated entries are collected first and ConditionMachineMetadata pairs
// are appended afterwards, so the result does not depend on map iteration
// order when both forms name the same metadata key. The returned slices are
// not shared with the Job's requirements.
func (j *Job) RequiredTargetMetadata() map[string][]string {
	requirements := j.Requirements()
	metadata := make(map[string][]string)

	// Deprecated syntax added to the metadata via the old `--require` flag.
	for key, values := range requirements {
		if !strings.HasPrefix(key, fleetFlagMachineMetadata) || len(values) == 0 {
			continue
		}
		name := key[len(fleetFlagMachineMetadata):]
		// Copy the values so the appends below can never write into the
		// slice owned by the unit.
		metadata[name] = append([]string(nil), values...)
	}

	for _, valuePair := range requirements[fleetXConditionMachineMetadata] {
		s := strings.Split(valuePair, "=")
		if len(s) != 2 || len(s[0]) == 0 || len(s[1]) == 0 {
			continue
		}
		metadata[s[0]] = append(metadata[s[0]], s[1])
	}

	return metadata
}