Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 34 additions & 46 deletions pkg/lib/k8s/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bytes"
"context"
"regexp"
"strings"
"time"

"github.com/cortexlabs/cortex/pkg/lib/errors"
Expand All @@ -38,7 +37,12 @@ var _podTypeMeta = kmeta.TypeMeta{
Kind: "Pod",
}

const ReasonEvicted = "Evicted"
// pod termination reasons
// https://github.com/kubernetes/kube-state-metrics/blob/master/docs/pod-metrics.md
const (
ReasonEvicted = "Evicted"
ReasonOOMKilled = "OOMKilled"
)

type PodStatus string

Expand Down Expand Up @@ -148,22 +152,14 @@ func WasPodOOMKilled(pod *kcore.Pod) bool {
return true
}
for _, containerStatus := range pod.Status.ContainerStatuses {
var reason string
if containerStatus.LastTerminationState.Terminated != nil {
exitCode := containerStatus.LastTerminationState.Terminated.ExitCode
reason := strings.ToLower(containerStatus.LastTerminationState.Terminated.Reason)
if _killStatuses[exitCode] {
if strings.Contains(reason, "oom") {
return true
}
}
reason = containerStatus.LastTerminationState.Terminated.Reason
} else if containerStatus.State.Terminated != nil {
exitCode := containerStatus.State.Terminated.ExitCode
reason := strings.ToLower(containerStatus.State.Terminated.Reason)
if _killStatuses[exitCode] {
if strings.Contains(reason, "oom") {
return true
}
}
reason = containerStatus.State.Terminated.Reason
}
if reason == ReasonOOMKilled {
return true
}
}

Expand Down Expand Up @@ -194,25 +190,21 @@ func GetPodStatus(pod *kcore.Pod) PodStatus {
}

for _, containerStatus := range pod.Status.ContainerStatuses {
var reason string
var exitCode int32
if containerStatus.LastTerminationState.Terminated != nil {
exitCode := containerStatus.LastTerminationState.Terminated.ExitCode
reason := strings.ToLower(containerStatus.LastTerminationState.Terminated.Reason)
if _killStatuses[exitCode] {
if strings.Contains(reason, "oom") {
return PodStatusKilledOOM
}
return PodStatusKilled
}
reason = containerStatus.LastTerminationState.Terminated.Reason
exitCode = containerStatus.LastTerminationState.Terminated.ExitCode
} else if containerStatus.State.Terminated != nil {
exitCode := containerStatus.State.Terminated.ExitCode
reason := strings.ToLower(containerStatus.State.Terminated.Reason)
if _killStatuses[exitCode] {
if strings.Contains(reason, "oom") {
return PodStatusKilledOOM
}
return PodStatusKilled
}
reason = containerStatus.State.Terminated.Reason
exitCode = containerStatus.State.Terminated.ExitCode
}
if reason == ReasonOOMKilled {
return PodStatusKilledOOM
} else if _killStatuses[exitCode] {
return PodStatusKilled
}

}
return PodStatusFailed
case kcore.PodRunning:
Expand Down Expand Up @@ -245,29 +237,25 @@ func PodStatusFromContainerStatuses(containerStatuses []kcore.ContainerStatus) P
numRunning++
} else if containerStatus.State.Terminated != nil {
exitCode := containerStatus.State.Terminated.ExitCode
reason := strings.ToLower(containerStatus.State.Terminated.Reason)
if exitCode == 0 {
reason := containerStatus.State.Terminated.Reason
if reason == ReasonOOMKilled {
numKilledOOM++
} else if exitCode == 0 {
numSucceeded++
} else if _killStatuses[exitCode] {
if strings.Contains(reason, "oom") {
numKilledOOM++
} else {
numKilled++
}
numKilled++
} else {
numFailed++
}
} else if containerStatus.LastTerminationState.Terminated != nil {
exitCode := containerStatus.LastTerminationState.Terminated.ExitCode
reason := strings.ToLower(containerStatus.LastTerminationState.Terminated.Reason)
if exitCode == 0 {
reason := containerStatus.LastTerminationState.Terminated.Reason
if reason == ReasonOOMKilled {
numKilledOOM++
} else if exitCode == 0 {
numSucceeded++
} else if _killStatuses[exitCode] {
if strings.Contains(reason, "oom") {
numKilledOOM++
} else {
numKilled++
}
numKilled++
} else {
numFailed++
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {

telemetry.Event("operator.init", map[string]interface{}{"provider": config.Provider})

cron.Run(operator.DeleteEvictedPods, operator.ErrorHandler("delete evicted pods"), 12*time.Hour)
cron.Run(operator.DeleteEvictedPods, operator.ErrorHandler("delete evicted pods"), time.Hour)

switch config.Provider {
case types.AWSProviderType:
Expand Down
11 changes: 10 additions & 1 deletion pkg/operator/operator/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/k8s"
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
"github.com/cortexlabs/cortex/pkg/operator/config"
"github.com/cortexlabs/cortex/pkg/operator/lib/logging"
Expand All @@ -30,6 +31,7 @@ import (
)

var operatorLogger = logging.GetOperatorLogger()
var previousListOfEvictedPods = strset.New()

func DeleteEvictedPods() error {
failedPods, err := config.K8s.ListPods(&kmeta.ListOptions{
Expand All @@ -40,14 +42,21 @@ func DeleteEvictedPods() error {
}

var errs []error
currentEvictedPods := strset.New()
for _, pod := range failedPods {
if pod.Status.Reason == k8s.ReasonEvicted {
if pod.Status.Reason != k8s.ReasonEvicted {
continue
}
if previousListOfEvictedPods.Has(pod.Name) {
_, err := config.K8s.DeletePod(pod.Name)
if err != nil {
errs = append(errs, err)
}
continue
}
currentEvictedPods.Add(pod.Name)
}
previousListOfEvictedPods = currentEvictedPods

if errors.HasError(errs) {
return errors.FirstError(errs...)
Expand Down
36 changes: 22 additions & 14 deletions pkg/operator/resources/job/batchapi/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,9 @@ func reconcileInProgressJob(jobState *job.State, queueURL *string, k8sJob *kbatc
}

func checkIfJobCompleted(jobKey spec.JobKey, queueURL string, k8sJob *kbatch.Job) error {
if int(k8sJob.Status.Failed) > 0 {
return investigateJobFailure(jobKey)
jobFailed, err := checkForJobFailure(jobKey, k8sJob)
if err != nil || jobFailed {
return err
}

queueMessages, err := getQueueMetricsFromURL(queueURL)
Expand Down Expand Up @@ -349,19 +350,18 @@ func checkIfJobCompleted(jobKey spec.JobKey, queueURL string, k8sJob *kbatch.Job
return nil
}

func investigateJobFailure(jobKey spec.JobKey) error {
reasonFound := false

func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) {
jobLogger, err := operator.GetJobLogger(jobKey)
if err != nil {
return err
return false, err
}

reasonFound := false
pods, _ := config.K8s.ListPodsByLabel("jobID", jobKey.ID)
for _, pod := range pods {
if k8s.WasPodOOMKilled(&pod) {
jobLogger.Error("at least one worker was killed because it ran out of out of memory")
return errors.FirstError(
return true, errors.FirstError(
job.SetWorkerOOMStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
Expand All @@ -382,13 +382,21 @@ func investigateJobFailure(jobKey spec.JobKey) error {
}
}

if !reasonFound {
jobLogger.Error("workers were killed for unknown reason")
if int(k8sJob.Status.Failed) > 0 {
if !reasonFound {
jobLogger.Error("workers were killed for unknown reason")
}
return true, errors.FirstError(
job.SetWorkerErrorStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
} else if int(k8sJob.Status.Succeeded) == 1 && len(pods) == 0 {
// really unexpected situation which doesn't hurt if we check
return true, errors.FirstError(
job.SetUnexpectedErrorStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
}

return errors.FirstError(
err,
job.SetWorkerErrorStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
return false, nil
}
28 changes: 16 additions & 12 deletions pkg/operator/resources/job/taskapi/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,27 +202,31 @@ func reconcileInProgressJob(jobState *job.State, k8sJob *kbatch.Job) (status.Job
}

func checkIfJobCompleted(jobKey spec.JobKey, k8sJob *kbatch.Job) error {
if int(k8sJob.Status.Failed) == 1 {
pods, _ := config.K8s.ListPodsByLabel("jobID", jobKey.ID)
for _, pod := range pods {
if k8s.WasPodOOMKilled(&pod) {
return errors.FirstError(
job.SetWorkerOOMStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
}
pods, _ := config.K8s.ListPodsByLabel("jobID", jobKey.ID)
for _, pod := range pods {
if k8s.WasPodOOMKilled(&pod) {
return errors.FirstError(
job.SetWorkerOOMStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
}
}
if int(k8sJob.Status.Failed) == 1 {
return errors.FirstError(
job.SetWorkerErrorStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
}

if int(k8sJob.Status.Succeeded) == 1 {
} else if int(k8sJob.Status.Succeeded) == 1 && len(pods) > 0 {
return errors.FirstError(
job.SetSucceededStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
} else if int(k8sJob.Status.Succeeded) == 1 && len(pods) == 0 {
// really unexpected situation which doesn't hurt if we check
return errors.FirstError(
job.SetUnexpectedErrorStatus(jobKey),
deleteJobRuntimeResources(jobKey),
)
}

return nil
Expand Down