/
job_kill.go
252 lines (220 loc) · 7.9 KB
/
job_kill.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package goalstate
import (
"context"
"time"
"github.com/uber/peloton/.gen/peloton/api/v0/job"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
"github.com/uber/peloton/pkg/common/goalstate"
"github.com/uber/peloton/pkg/common/util"
"github.com/uber/peloton/pkg/jobmgr/cached"
jobmgrcommon "github.com/uber/peloton/pkg/jobmgr/common"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.uber.org/yarpc/yarpcerrors"
)
// JobKill will stop all tasks in the job.
func JobKill(ctx context.Context, entity goalstate.Entity) error {
	jobID := &peloton.JobID{Value: entity.GetID()}
	driver := entity.(*jobEntity).driver

	// Nothing to do if the job is no longer present in the cache.
	cachedJob := driver.jobFactory.GetJob(jobID)
	if cachedJob == nil {
		return nil
	}

	jobState, nonTerminalTaskKilled, err := killJob(ctx, cachedJob, driver)
	if err != nil {
		return err
	}

	// Re-enqueue the job into the goal state engine only when:
	// 1. any non-terminated task had to be killed, or
	// 2. the job has already reached a terminal state.
	if nonTerminalTaskKilled || util.IsPelotonJobStateTerminal(jobState) {
		EnqueueJobWithDefaultDelay(jobID, driver, cachedJob)
	}

	log.WithField("job_id", jobID.GetValue()).
		Info("initiated kill of all tasks in the job")
	return nil
}
// createRuntimeDiffForKill builds the runtime diffs needed to kill every
// task in the job whose goal state is not already KILLED.
// It returns three maps keyed by instance ID:
//   - runtimeDiffNonTerminatedTasks: diffs for tasks not yet terminated,
//   - runtimeDiffTerminatedTasks: diffs for tasks already in a terminal
//     state (killed anyway so they cannot be restarted later),
//   - runtimeDiffAll: the union of the previous two.
func createRuntimeDiffForKill(
	ctx context.Context,
	cachedJob cached.Job,
) (
	runtimeDiffNonTerminatedTasks map[uint32]jobmgrcommon.RuntimeDiff,
	runtimeDiffTerminatedTasks map[uint32]jobmgrcommon.RuntimeDiff,
	runtimeDiffAll map[uint32]jobmgrcommon.RuntimeDiff,
	err error,
) {
	runtimeDiffNonTerminatedTasks = map[uint32]jobmgrcommon.RuntimeDiff{}
	runtimeDiffTerminatedTasks = map[uint32]jobmgrcommon.RuntimeDiff{}
	runtimeDiffAll = map[uint32]jobmgrcommon.RuntimeDiff{}

	for instID, cachedTask := range cachedJob.GetAllTasks() {
		taskRuntime, runtimeErr := cachedTask.GetRuntime(ctx)
		// The runtime has not been created yet; skip this task.
		if yarpcerrors.IsNotFound(runtimeErr) {
			continue
		}
		if runtimeErr != nil {
			log.WithError(runtimeErr).
				WithField("job_id", cachedJob.ID().Value).
				WithField("instance_id", instID).
				Info("failed to fetch task runtime to kill a job")
			return nil, nil, nil, runtimeErr
		}

		// Goal state is already KILLED; no diff is required.
		if taskRuntime.GetGoalState() == task.TaskState_KILLED {
			continue
		}

		// A task in terminal state can run again later due to failure
		// retry (batch job) or task restart (stateless job), so a kill
		// diff is produced even for terminal tasks as long as their
		// goal state is not KILLED.
		diff := jobmgrcommon.RuntimeDiff{
			jobmgrcommon.GoalStateField: task.TaskState_KILLED,
			jobmgrcommon.MessageField:   "Task stop API request",
			jobmgrcommon.ReasonField:    "",
			jobmgrcommon.TerminationStatusField: &task.TerminationStatus{
				Reason: task.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_ON_REQUEST,
			},
			jobmgrcommon.DesiredHostField: "",
		}

		runtimeDiffAll[instID] = diff
		if util.IsPelotonStateTerminal(taskRuntime.GetState()) {
			runtimeDiffTerminatedTasks[instID] = diff
		} else {
			runtimeDiffNonTerminatedTasks[instID] = diff
		}
	}
	return runtimeDiffNonTerminatedTasks, runtimeDiffTerminatedTasks, runtimeDiffAll, nil
}
// calculateJobState determines whether a job being killed should move to
// KILLING state or directly to KILLED state.
func calculateJobState(
	ctx context.Context,
	cachedJob cached.Job,
	jobRuntime *job.RuntimeInfo,
	runtimeDiffNonTerminatedTasks map[uint32]jobmgrcommon.RuntimeDiff) job.JobState {
	// Some non-terminal tasks still have to be killed, so the job is
	// still in the process of being killed.
	if len(runtimeDiffNonTerminatedTasks) != 0 {
		return job.JobState_KILLING
	}

	// No non-terminal task was killed; the job may be able to enter
	// KILLED directly. A batch job that has progressed past INITIALIZED
	// goes through KILLING regardless.
	if cachedJob.GetJobType() == job.JobType_BATCH &&
		jobRuntime.GetState() != job.JobState_INITIALIZED {
		return job.JobState_KILLING
	}

	// KILLED only when every task runtime is readable and already in a
	// terminal state; any fetch error conservatively yields KILLING.
	for _, cachedTask := range cachedJob.GetAllTasks() {
		taskRuntime, err := cachedTask.GetRuntime(ctx)
		if err != nil || !util.IsPelotonStateTerminal(taskRuntime.GetState()) {
			return job.JobState_KILLING
		}
	}
	return job.JobState_KILLED
}
// killJob kills all tasks in the job, and returns
// 1. the new job state after the kill
// 2. whether any non-terminated task is killed
func killJob(
	ctx context.Context,
	cachedJob cached.Job,
	goalStateDriver Driver,
) (newState job.JobState, taskKilled bool, err error) {
	// Read the job runtime first; it supplies both the current job state
	// and the desired state version used in the update below.
	jobRuntime, runtimeErr := cachedJob.GetRuntime(ctx)
	if runtimeErr != nil {
		return newState, taskKilled,
			errors.Wrap(runtimeErr, "failed to get job runtime during job kill")
	}

	runtimeDiffNonTerminatedTasks, allTasksMarked, stopErr :=
		stopTasks(ctx, cachedJob, goalStateDriver)
	if stopErr != nil {
		return newState, taskKilled,
			errors.Wrap(stopErr, "failed to update task runtimes to kill a job")
	}

	// If the goal state of some tasks could not be patched, leave the job
	// state unchanged. The job moves to KILLING only once the goal state
	// of every task has been set to KILLED.
	if !allTasksMarked {
		return jobRuntime.GetState(), false, nil
	}

	jobState := calculateJobState(
		ctx, cachedJob, jobRuntime, runtimeDiffNonTerminatedTasks)

	runtimeUpdate := &job.RuntimeInfo{
		State:        jobState,
		StateVersion: jobRuntime.DesiredStateVersion,
	}
	if util.IsPelotonJobStateTerminal(jobState) {
		runtimeUpdate.CompletionTime = time.Now().UTC().Format(time.RFC3339Nano)
	}

	// Persist the job state together with the state version; once
	// state version == desired state version, the goal state engine
	// knows that all the tasks have been handed to the task goal state
	// engine for kill and no further action is needed.
	if updateErr := cachedJob.Update(ctx, &job.JobInfo{
		Runtime: runtimeUpdate,
	}, nil, nil, cached.UpdateCacheAndDB); updateErr != nil {
		return newState, taskKilled,
			errors.Wrap(updateErr, "failed to update job runtime during job kill")
	}

	return jobState, len(runtimeDiffNonTerminatedTasks) > 0, nil
}
// stopTasks patches the goal state of every task in the job to KILLED and
// enqueues the non-terminated ones into the task goal state engine.
// It returns the runtime diffs of the non-terminated tasks, whether every
// task was successfully marked, and any error encountered while building
// or applying the diffs.
func stopTasks(
	ctx context.Context,
	cachedJob cached.Job,
	goalStateDriver Driver,
) (
	runtimeDiffNonTerminatedTasks map[uint32]jobmgrcommon.RuntimeDiff,
	allTasksMarked bool,
	err error,
) {
	// Build the diffs that flip every task's goal state to KILLED, in
	// both DB and cache.
	nonTerminated, _, all, diffErr := createRuntimeDiffForKill(ctx, cachedJob)
	if diffErr != nil {
		return nil, false, diffErr
	}

	_, instancesToBeRetried, patchErr := cachedJob.PatchTasks(ctx, all, false)

	// Enqueue the non-terminated tasks even when PatchTasks fails: any
	// tasks that were patched successfully can still be terminated, and
	// otherwise they would not be re-enqueued into the goal state engine
	// by a JobKill retry.
	for instanceID := range nonTerminated {
		goalStateDriver.EnqueueTask(cachedJob.ID(), instanceID, time.Now())
	}

	// Some non-terminal tasks could not be patched; re-enqueue the job so
	// the kill action is retried for them.
	if len(instancesToBeRetried) != 0 {
		EnqueueJobWithDefaultDelay(cachedJob.ID(), goalStateDriver, cachedJob)
		return nonTerminated, false, patchErr
	}
	return nonTerminated, true, patchErr
}