-
Notifications
You must be signed in to change notification settings - Fork 85
/
daemon_job.go
125 lines (105 loc) · 3.88 KB
/
daemon_job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package scheduler
import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
"github.com/bacalhau-project/bacalhau/pkg/jobstore"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/orchestrator"
"github.com/bacalhau-project/bacalhau/pkg/util/idgen"
)
// DaemonJobScheduler is a scheduler for daemon jobs, which are placed on every
// node that matches the job. On each evaluation it:
//   - marks all non-terminal executions as stopped once the job is terminal;
//   - otherwise marks executions on unhealthy nodes as stopped, and creates a
//     new execution on each matching node that does not already have one.
type DaemonJobScheduler struct {
	// jobStore is used to load the evaluated job and its executions.
	jobStore jobstore.Store
	// planner receives the plan built for each evaluation.
	planner orchestrator.Planner
	// nodeSelector is used to look up node information and matching nodes.
	nodeSelector orchestrator.NodeSelector
}
// DaemonJobSchedulerParams groups the dependencies needed to build a
// DaemonJobScheduler.
type DaemonJobSchedulerParams struct {
	// JobStore is used to load jobs and their executions.
	JobStore jobstore.Store
	// Planner receives the plans produced by the scheduler.
	Planner orchestrator.Planner
	// NodeSelector is used to look up node information and matching nodes.
	NodeSelector orchestrator.NodeSelector
}

// NewDaemonJobScheduler returns a DaemonJobScheduler wired with the
// dependencies supplied in params.
func NewDaemonJobScheduler(params DaemonJobSchedulerParams) *DaemonJobScheduler {
	scheduler := new(DaemonJobScheduler)
	scheduler.jobStore = params.JobStore
	scheduler.planner = params.Planner
	scheduler.nodeSelector = params.NodeSelector
	return scheduler
}
// Process evaluates a daemon job and hands the planner a plan that reconciles
// the job's executions with its current state:
//   - if the job is terminal, every non-terminal execution is marked stopped;
//   - otherwise, non-terminal executions on unhealthy nodes are marked stopped,
//     a new execution is created on each matching node that does not already
//     have one, and the job is marked running if eligible.
//
// Failures to load the job, its executions, node information, or matching
// nodes are returned wrapped with context; the planner's error is returned
// as-is.
func (b *DaemonJobScheduler) Process(ctx context.Context, evaluation *models.Evaluation) error {
	ctx = log.Ctx(ctx).With().Str("JobID", evaluation.JobID).Str("EvalID", evaluation.ID).Logger().WithContext(ctx)

	job, err := b.jobStore.GetJob(ctx, evaluation.JobID)
	if err != nil {
		return fmt.Errorf("failed to retrieve job %s: %w", evaluation.JobID, err)
	}

	// Retrieve the job state
	jobExecutions, err := b.jobStore.GetExecutions(ctx, jobstore.GetExecutionsOptions{
		JobID: evaluation.JobID,
	})
	if err != nil {
		return fmt.Errorf("failed to retrieve job state for job %s when evaluating %s: %w",
			evaluation.JobID, evaluation, err)
	}

	// Plan to hold the actions to be taken
	plan := models.NewPlan(evaluation, &job)

	existingExecs := execSetFromSliceOfValues(jobExecutions)
	nonTerminalExecs := existingExecs.filterNonTerminal()

	// early exit if the job is stopped
	if job.IsTerminal() {
		nonTerminalExecs.markStopped(orchestrator.ExecStoppedByJobStopEvent(), plan)
		return b.planner.Process(ctx, plan)
	}

	// Retrieve the info for all the nodes that have executions for this job
	nodeInfos, err := existingNodeInfos(ctx, b.nodeSelector, nonTerminalExecs)
	if err != nil {
		// Wrapped like the other failure paths so callers can tell which job
		// failed; %w keeps errors.Is/errors.As working on the cause.
		return fmt.Errorf("failed to retrieve node info for job %s: %w", evaluation.JobID, err)
	}

	// Mark executions that are running on nodes that are not healthy as failed
	_, lost := nonTerminalExecs.filterByNodeHealth(nodeInfos)
	lost.markStopped(orchestrator.ExecStoppedByNodeUnhealthyEvent(), plan)

	// Look for new matching nodes and create new executions every time we evaluate the job.
	// NOTE(review): existingExecs contains every execution of the job, including
	// terminal ones and the ones just marked lost above, so a node that has ever
	// had an execution for this job never gets a new one. Confirm this is intended
	// (e.g. to avoid re-placing on a node after a failure).
	_, err = b.createMissingExecs(ctx, &job, plan, existingExecs)
	if err != nil {
		return fmt.Errorf("failed to find/create missing executions: %w", err)
	}

	plan.MarkJobRunningIfEligible()
	return b.planner.Process(ctx, plan)
}
// createMissingExecs creates one new execution for every node that matches job
// and is not already the NodeID of an execution in existingExecs, and appends
// each new execution to plan.
//
// Executions are appended in the order the node selector returns the matching
// nodes, so the plan is deterministic for a given selector result. A node listed
// more than once by the selector still receives at most one new execution.
//
// It returns the newly created executions keyed by execution ID. If the node
// selector fails, its error is returned together with an empty set and plan is
// left unchanged.
func (b *DaemonJobScheduler) createMissingExecs(
	ctx context.Context, job *models.Job, plan *models.Plan, existingExecs execSet) (execSet, error) {
	newExecs := execSet{}

	// Only the first result (the matching nodes) is used; the second result
	// (presumably the rejected nodes — confirm) is not needed here.
	nodes, _, err := b.nodeSelector.MatchingNodes(ctx, job)
	if err != nil {
		return newExecs, err
	}

	// Node IDs that already have an execution, for O(1) lookup. existingExecs is
	// not filtered here, so an execution in any state covers its node.
	coveredNodes := make(map[string]struct{}, len(existingExecs)+len(nodes))
	for _, exec := range existingExecs {
		coveredNodes[exec.NodeID] = struct{}{}
	}

	for _, node := range nodes {
		nodeID := node.NodeInfo.ID()
		if _, ok := coveredNodes[nodeID]; ok {
			// this node already has an execution for the job
			continue
		}

		execution := &models.Execution{
			JobID:        job.ID,
			Job:          job,
			EvalID:       plan.EvalID,
			ID:           idgen.ExecutionIDPrefix + uuid.NewString(),
			Namespace:    job.Namespace,
			ComputeState: models.NewExecutionState(models.ExecutionStateNew),
			DesiredState: models.NewExecutionDesiredState(models.ExecutionDesiredStateRunning),
			NodeID:       nodeID,
		}
		execution.Normalize()

		newExecs[execution.ID] = execution
		// Append here rather than by ranging over newExecs afterwards: map
		// iteration order is randomized, which would shuffle the plan.
		plan.AppendExecution(execution)
		// Guard against the selector reporting the same node twice.
		coveredNodes[nodeID] = struct{}{}
	}
	return newExecs, nil
}
// Compile-time check that *DaemonJobScheduler implements orchestrator.Scheduler.
var _ orchestrator.Scheduler = (*DaemonJobScheduler)(nil)