forked from coreos/fleet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
event.go
136 lines (110 loc) · 3.93 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package engine
import (
log "github.com/coreos/fleet/third_party/github.com/golang/glog"
"github.com/coreos/fleet/event"
"github.com/coreos/fleet/job"
"github.com/coreos/fleet/machine"
"github.com/coreos/fleet/registry"
)
// EventHandler groups the callbacks that react to events and commands on
// behalf of a single Engine.
type EventHandler struct {
	// engine is the Engine whose registry and cluster view the handlers use.
	engine *Engine
}
// NewEventHandler returns an EventHandler bound to the given Engine.
func NewEventHandler(engine *Engine) *EventHandler {
	handler := EventHandler{engine: engine}
	return &handler
}
// HandleCommandLoadJob reacts to a CommandLoadJob event by looking up the
// named job in the registry and publishing a JobOffer for it.
//
// ev.Payload must hold the job name as a string; any other dynamic type makes
// the type assertion panic. If the registry yields no job, the event is
// logged and dropped.
func (eh *EventHandler) HandleCommandLoadJob(ev event.Event) {
	jobName := ev.Payload.(string)

	// The second result of the lookup is ignored here; a nil job is handled
	// as "not found".
	j, _ := eh.engine.registry.GetJob(jobName)
	if j == nil {
		// The job name must be passed explicitly: the format string has a %s
		// verb and would otherwise render as %!s(MISSING).
		log.Infof("CommandLoadJob(%s): asked to offer job that could not be found", jobName)
		return
	}

	log.Infof("CommandLoadJob(%s): publishing JobOffer", jobName)
	eh.engine.OfferJob(*j)
}
// HandleCommandUnloadJob reacts to a CommandUnloadJob event by clearing the
// job's target in the registry.
//
// ev.Payload carries the job name and ev.Context the target; both are
// asserted to be strings and the assertion panics otherwise. An empty target
// results in no registry call.
func (eh *EventHandler) HandleCommandUnloadJob(ev event.Event) {
	name := ev.Payload.(string)
	machID := ev.Context.(string)

	// Nothing to clear when the event carries an empty target.
	if machID == "" {
		return
	}

	log.Infof("CommandUnloadJob(%s): clearing scheduling decision", name)
	eh.engine.registry.ClearJobTarget(name, machID)
}
// HandleEventJobScheduled records a scheduling decision in the engine's
// cluster view.
//
// ev.Payload carries the job name and ev.Context the target it was scheduled
// to; both are asserted to be strings and the assertion panics otherwise.
func (eh *EventHandler) HandleEventJobScheduled(ev event.Event) {
	name := ev.Payload.(string)
	machID := ev.Context.(string)

	log.V(1).Infof("EventJobScheduled(%s): updating cluster", name)
	eh.engine.clust.jobScheduled(name, machID)
}
// HandleEventJobUnscheduled handles EventJobUnscheduled, which is triggered
// when a scheduling decision has been rejected or can no longer be fulfilled
// because the cluster changed.
//
// The job is offered again unless its target state is missing or inactive.
// ev.Payload must hold the job name as a string; the type assertion panics
// otherwise.
func (eh *EventHandler) HandleEventJobUnscheduled(ev event.Event) {
	name := ev.Payload.(string)

	// Only jobs with a known, non-inactive target state are rescheduled.
	state, _ := eh.engine.registry.GetJobTargetState(name)
	if state == nil {
		return
	}
	if *state == job.JobStateInactive {
		return
	}

	found, _ := eh.engine.registry.GetJob(name)
	if found == nil {
		log.Errorf("EventJobUnscheduled(%s): unable to re-offer Job, as it could not be found in the Registry", name)
		return
	}

	log.Infof("EventJobUnscheduled(%s): publishing JobOffer", name)
	eh.engine.OfferJob(*found)
}
// HandleCommandStopJob reacts to a CommandStopJob event by marking the job as
// stopped in the engine's cluster view.
//
// ev.Payload must hold the job name as a string; the type assertion panics
// otherwise.
func (eh *EventHandler) HandleCommandStopJob(ev event.Event) {
	jobName := ev.Payload.(string)

	// The log prefix names this handler's own command, matching the
	// convention used by the other handlers in this file.
	log.V(1).Infof("CommandStopJob(%s): updating cluster", jobName)
	eh.engine.clust.jobStopped(jobName)
}
// HandleEventJobBidSubmitted reacts to a submitted bid by asking the engine
// to resolve the corresponding job offer in favour of the bidding machine,
// then logs the outcome.
//
// ev.Payload must hold a job.JobBid; the type assertion panics otherwise.
func (eh *EventHandler) HandleEventJobBidSubmitted(ev event.Event) {
	jb := ev.Payload.(job.JobBid)

	if err := eh.engine.ResolveJobOffer(jb.JobName, jb.MachineID); err != nil {
		// Report the underlying error so the failure cause is not lost.
		log.Infof("EventJobBidSubmitted(%s): failed to schedule Job to Machine(%s): %v", jb.JobName, jb.MachineID, err)
		return
	}

	log.Infof("EventJobBidSubmitted(%s): successfully scheduled Job to Machine(%s)", jb.JobName, jb.MachineID)
}
// HandleEventMachineCreated registers a newly created machine in the engine's
// cluster view.
//
// ev.Payload must hold a machine.MachineState; the type assertion panics
// otherwise. Only the state's ID is used.
func (eh *EventHandler) HandleEventMachineCreated(ev event.Event) {
	ms := ev.Payload.(machine.MachineState)

	log.V(1).Infof("EventMachineCreated(%s): updating cluster", ms.ID)
	eh.engine.clust.machineCreated(ms.ID)
}
// HandleEventMachineRemoved cleans up after a machine has left the cluster.
//
// It first tries to lock the removed machine in the registry on behalf of the
// local machine; if no lock is obtained the event is ignored. With the lock
// held, every job targeted at the removed machine has its UnitState removed
// and its target cleared. Once all of them are cleared, each job is offered
// again, and finally the machine is dropped from the engine's cluster view.
//
// ev.Payload must hold the machine ID as a string; the type assertion panics
// otherwise.
func (eh *EventHandler) HandleEventMachineRemoved(ev event.Event) {
	removedID := ev.Payload.(string)

	lock := eh.engine.registry.LockMachine(removedID, eh.engine.machine.State().ID)
	if lock == nil {
		log.V(1).Infof("EventMachineRemoved(%s): failed to lock Machine, ignoring event", removedID)
		return
	}
	defer lock.Unlock()

	orphaned := getJobsScheduledToMachine(eh.engine.registry, removedID)

	// First pass: wipe state and scheduling decisions for every affected job.
	for i := range orphaned {
		log.Infof("EventMachineRemoved(%s): clearing UnitState(%s)", removedID, orphaned[i].Name)
		if err := eh.engine.registry.RemoveUnitState(orphaned[i].Name); err != nil {
			// A failed removal is logged but does not stop the unscheduling.
			log.Errorf("Failed removing UnitState(%s) from Registry: %v", orphaned[i].Name, err)
		}

		log.Infof("EventMachineRemoved(%s): unscheduling Job(%s)", removedID, orphaned[i].Name)
		eh.engine.registry.ClearJobTarget(orphaned[i].Name, removedID)
	}

	// Second pass: re-offer the jobs only after all of them have been cleared.
	for i := range orphaned {
		log.Infof("EventMachineRemoved(%s): re-publishing JobOffer(%s)", removedID, orphaned[i].Name)
		eh.engine.OfferJob(orphaned[i])
	}

	eh.engine.clust.machineRemoved(removedID)
}
// getJobsScheduledToMachine returns the jobs known to r whose target is
// non-empty and equal to machID. The result is nil when no job matches.
//
// The secondary results of both registry calls are discarded, so a failed
// lookup simply contributes no jobs.
func getJobsScheduledToMachine(r registry.Registry, machID string) []job.Job {
	all, _ := r.GetAllJobs()

	var scheduled []job.Job
	for _, candidate := range all {
		target, _ := r.GetJobTarget(candidate.Name)
		if target != "" && target == machID {
			scheduled = append(scheduled, candidate)
		}
	}
	return scheduled
}