-
Notifications
You must be signed in to change notification settings - Fork 129
/
common.go
147 lines (135 loc) · 4.79 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package scheduler
import (
"fmt"
"strconv"
"github.com/pkg/errors"
"golang.org/x/exp/maps"
"github.com/armadaproject/armada/internal/armada/configuration"
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)
// SchedulerResult is returned by Rescheduler.Schedule().
type SchedulerResult struct {
// Running jobs that should be preempted.
PreemptedJobs []interfaces.LegacySchedulerJob
// Queued jobs that should be scheduled.
ScheduledJobs []interfaces.LegacySchedulerJob
// For each preempted job, maps the job id to the id of the node on which the job was running.
// For each scheduled job, maps the job id to the id of the node on which the job should be scheduled.
NodeIdByJobId map[string]string
}
func NewSchedulerResult[S ~[]T, T interfaces.LegacySchedulerJob](
preemptedJobs S,
scheduledJobs S,
nodeIdByJobId map[string]string,
) *SchedulerResult {
castPreemptedJobs := make([]interfaces.LegacySchedulerJob, len(preemptedJobs))
for i, job := range preemptedJobs {
castPreemptedJobs[i] = job
}
castScheduledJobs := make([]interfaces.LegacySchedulerJob, len(scheduledJobs))
for i, job := range scheduledJobs {
castScheduledJobs[i] = job
}
return &SchedulerResult{
PreemptedJobs: castPreemptedJobs,
ScheduledJobs: castScheduledJobs,
NodeIdByJobId: nodeIdByJobId,
}
}
// PreemptedJobsFromSchedulerResult returns the slice of preempted jobs in the result,
// cast to type T.
func PreemptedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T {
rv := make([]T, len(sr.PreemptedJobs))
for i, job := range sr.PreemptedJobs {
rv[i] = job.(T)
}
return rv
}
// ScheduledJobsFromScheduleResult returns the slice of scheduled jobs in the result,
// cast to type T.
func ScheduledJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T {
rv := make([]T, len(sr.ScheduledJobs))
for i, job := range sr.ScheduledJobs {
rv[i] = job.(T)
}
return rv
}
// JobsSummary returns a string giving an overview of the provided jobs meant for logging.
// For example: "affected queues [A, B]; resources {A: {cpu: 1}, B: {cpu: 2}}; jobs [jobAId, jobBId]".
func JobsSummary(jobs []interfaces.LegacySchedulerJob) string {
if len(jobs) == 0 {
return ""
}
jobsByQueue := armadaslices.GroupByFunc(
jobs,
func(job interfaces.LegacySchedulerJob) string { return job.GetQueue() },
)
resourcesByQueue := armadamaps.MapValues(
jobsByQueue,
func(jobs []interfaces.LegacySchedulerJob) schedulerobjects.ResourceList {
rv := schedulerobjects.NewResourceListWithDefaultSize()
for _, job := range jobs {
rv.AddV1ResourceList(job.GetResourceRequirements().Requests)
}
return rv
},
)
jobIdsByQueue := armadamaps.MapValues(
jobsByQueue,
func(jobs []interfaces.LegacySchedulerJob) []string {
rv := make([]string, len(jobs))
for i, job := range jobs {
rv[i] = job.GetId()
}
return rv
},
)
return fmt.Sprintf(
"affected queues %v; resources %v; jobs %v",
maps.Keys(jobsByQueue),
armadamaps.MapValues(
resourcesByQueue,
func(rl schedulerobjects.ResourceList) string {
return rl.CompactString()
},
),
jobIdsByQueue,
)
}
func isEvictedJob(job interfaces.LegacySchedulerJob) bool {
return job.GetAnnotations()[schedulerconfig.IsEvictedAnnotation] == "true"
}
func targetNodeIdFromNodeSelector(nodeSelector map[string]string) (string, bool) {
nodeId, ok := nodeSelector[schedulerconfig.NodeIdLabel]
return nodeId, ok
}
// GangIdAndCardinalityFromLegacySchedulerJob returns a tuple (gangId, gangCardinality, isGangJob, error).
func GangIdAndCardinalityFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) (string, int, bool, error) {
return GangIdAndCardinalityFromAnnotations(job.GetAnnotations())
}
// GangIdAndCardinalityFromAnnotations returns a tuple (gangId, gangCardinality, isGangJob, error).
func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string, int, bool, error) {
if annotations == nil {
return "", 0, false, nil
}
gangId, ok := annotations[configuration.GangIdAnnotation]
if !ok {
return "", 0, false, nil
}
gangCardinalityString, ok := annotations[configuration.GangCardinalityAnnotation]
if !ok {
return "", 0, false, errors.Errorf("missing annotation %s", configuration.GangCardinalityAnnotation)
}
gangCardinality, err := strconv.Atoi(gangCardinalityString)
if err != nil {
return "", 0, false, errors.WithStack(err)
}
if gangCardinality <= 0 {
return "", 0, false, errors.Errorf("gang cardinality is non-positive %d", gangCardinality)
}
return gangId, gangCardinality, true, nil
}