-
Notifications
You must be signed in to change notification settings - Fork 134
/
preempt_runs.go
103 lines (89 loc) · 3.06 KB
/
preempt_runs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package processors
import (
"fmt"
"time"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"github.com/armadaproject/armada/internal/common/armadacontext"
executorContext "github.com/armadaproject/armada/internal/executor/context"
"github.com/armadaproject/armada/internal/executor/domain"
"github.com/armadaproject/armada/internal/executor/job"
"github.com/armadaproject/armada/internal/executor/reporter"
"github.com/armadaproject/armada/internal/executor/util"
"github.com/armadaproject/armada/pkg/armadaevents"
)
// RunPreemptedProcessor handles job runs that have been marked for preemption:
// it reports the preemption (preempted + failed events) and deletes the run's pod.
type RunPreemptedProcessor struct {
clusterContext executorContext.ClusterContext // access to the cluster's batch pods (list, delete, annotate)
jobRunStateStore job.RunStateStore // source of run state, used to find runs with PreemptionRequested set
eventReporter reporter.EventReporter // sink for the preempted/failed event messages
}
// NewRunPreemptedProcessor constructs a RunPreemptedProcessor wired to the
// given cluster context, run state store and event reporter.
func NewRunPreemptedProcessor(
clusterContext executorContext.ClusterContext,
jobRunStateStore job.RunStateStore,
eventReporter reporter.EventReporter,
) *RunPreemptedProcessor {
processor := &RunPreemptedProcessor{}
processor.clusterContext = clusterContext
processor.jobRunStateStore = jobRunStateStore
processor.eventReporter = eventReporter
return processor
}
// Run finds all job runs with a pending preemption request and, for each run
// that still has a live (non-terminal) pod, reports the preemption (unless
// already reported) and then deletes the pod. Runs are processed concurrently
// on a bounded worker pool; each run is handled independently.
func (j *RunPreemptedProcessor) Run() {
managedPods, err := j.clusterContext.GetBatchPods()
if err != nil {
// NOTE: previous message said "cancel" — this is the preemption processor.
log.Errorf("Failed to preempt runs because unable to get current managed pods due to %s", err)
return
}
runsToPreempt := j.jobRunStateStore.GetAllWithFilter(func(state *job.RunState) bool {
return state.PreemptionRequested
})
runPodInfos := createRunPodInfos(runsToPreempt, managedPods)
util.ProcessItemsWithThreadPool(armadacontext.Background(), 20, runPodInfos,
func(runInfo *runPodInfo) {
pod := runInfo.Pod
if pod == nil {
// No pod to preempt
return
}
if util.IsInTerminalState(pod) {
// If pod is already finished, nothing to preempt
return
}
// Report exactly once before deletion; reportPodPreempted annotates
// the pod, which is what IsReportedPreempted checks.
if !util.IsReportedPreempted(pod) {
if err := j.reportPodPreempted(runInfo.Run, runInfo.Pod); err != nil {
log.Errorf("failed to report run (runId = %s, jobId = %s) preempted because %s",
runInfo.Run.Meta.RunId, runInfo.Run.Meta.JobId, err)
return
}
}
j.clusterContext.DeletePods([]*v1.Pod{pod})
},
)
}
// reportPodPreempted emits a preempted event followed by a failed event for
// the given run/pod pair, then annotates the pod so the preemption is not
// reported again on a later pass (the annotation is what marks the pod as
// already-reported).
//
// Returns an error if event creation, event reporting or pod annotation fails;
// errors are wrapped with %w so callers can inspect the underlying cause.
func (j *RunPreemptedProcessor) reportPodPreempted(run *job.RunState, pod *v1.Pod) error {
preemptedEvent, err := reporter.CreateSimpleJobPreemptedEvent(pod)
if err != nil {
return fmt.Errorf("failed creating preempted event because - %w", err)
}
failedEvent, err := reporter.CreateSimpleJobFailedEvent(pod, "Run preempted", "", j.clusterContext.GetClusterId(), armadaevents.KubernetesReason_AppError)
if err != nil {
return fmt.Errorf("failed creating failed event because - %w", err)
}
events := []reporter.EventMessage{
{Event: preemptedEvent, JobRunId: run.Meta.RunId},
{Event: failedEvent, JobRunId: run.Meta.RunId},
}
if err := j.eventReporter.Report(events); err != nil {
return fmt.Errorf("failed reporting preempted events because - %w", err)
}
// Capture a single timestamp so both annotation values agree exactly
// (previously time.Now() was called twice and could differ).
now := time.Now().String()
err = j.clusterContext.AddAnnotation(pod, map[string]string{
domain.JobPreemptedAnnotation: now,
string(v1.PodFailed): now,
})
if err != nil {
return fmt.Errorf("failed to annotate pod as preempted - %w", err)
}
return nil
}