From d87bc25401138ba0c57e4e4fba6250549a723476 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 20 Jan 2021 02:16:23 +0200 Subject: [PATCH 1/2] Fix OOM status reports for Task/Batch APIs --- pkg/lib/k8s/pod.go | 80 ++++++++++----------- pkg/operator/main.go | 2 +- pkg/operator/operator/cron.go | 12 +++- pkg/operator/resources/job/batchapi/cron.go | 33 +++++---- pkg/operator/resources/job/taskapi/cron.go | 29 ++++---- 5 files changed, 86 insertions(+), 70 deletions(-) diff --git a/pkg/lib/k8s/pod.go b/pkg/lib/k8s/pod.go index 734a500a4d..fd11e52350 100644 --- a/pkg/lib/k8s/pod.go +++ b/pkg/lib/k8s/pod.go @@ -20,7 +20,6 @@ import ( "bytes" "context" "regexp" - "strings" "time" "github.com/cortexlabs/cortex/pkg/lib/errors" @@ -38,7 +37,12 @@ var _podTypeMeta = kmeta.TypeMeta{ Kind: "Pod", } -const ReasonEvicted = "Evicted" +// pod termination reasons +// https://github.com/kubernetes/kube-state-metrics/blob/master/docs/pod-metrics.md +const ( + ReasonEvicted = "Evicted" + ReasonOOMKilled = "OOMKilled" +) type PodStatus string @@ -148,22 +152,14 @@ func WasPodOOMKilled(pod *kcore.Pod) bool { return true } for _, containerStatus := range pod.Status.ContainerStatuses { + var reason string if containerStatus.LastTerminationState.Terminated != nil { - exitCode := containerStatus.LastTerminationState.Terminated.ExitCode - reason := strings.ToLower(containerStatus.LastTerminationState.Terminated.Reason) - if _killStatuses[exitCode] { - if strings.Contains(reason, "oom") { - return true - } - } + reason = containerStatus.LastTerminationState.Terminated.Reason } else if containerStatus.State.Terminated != nil { - exitCode := containerStatus.State.Terminated.ExitCode - reason := strings.ToLower(containerStatus.State.Terminated.Reason) - if _killStatuses[exitCode] { - if strings.Contains(reason, "oom") { - return true - } - } + reason = containerStatus.State.Terminated.Reason + } + if reason == ReasonOOMKilled { + return true } } @@ -194,25 +190,21 @@ func GetPodStatus(pod *kcore.Pod) PodStatus { } for _, containerStatus := range pod.Status.ContainerStatuses { + var reason string + var exitCode int32 if containerStatus.LastTerminationState.Terminated != nil { - exitCode := containerStatus.LastTerminationState.Terminated.ExitCode - reason := strings.ToLower(containerStatus.LastTerminationState.Terminated.Reason) - if _killStatuses[exitCode] { - if strings.Contains(reason, "oom") { - return PodStatusKilledOOM - } - return PodStatusKilled - } + reason = containerStatus.LastTerminationState.Terminated.Reason + exitCode = containerStatus.LastTerminationState.Terminated.ExitCode } else if containerStatus.State.Terminated != nil { - exitCode := containerStatus.State.Terminated.ExitCode - reason := strings.ToLower(containerStatus.State.Terminated.Reason) - if _killStatuses[exitCode] { - if strings.Contains(reason, "oom") { - return PodStatusKilledOOM - } - return PodStatusKilled - } + reason = containerStatus.State.Terminated.Reason + exitCode = containerStatus.State.Terminated.ExitCode + } + if reason == ReasonOOMKilled { + return PodStatusKilledOOM + } else if _killStatuses[exitCode] { + return PodStatusKilled } + } return PodStatusFailed case kcore.PodRunning: @@ -245,29 +237,29 @@ func PodStatusFromContainerStatuses(containerStatuses []kcore.ContainerStatus) P numRunning++ } else if containerStatus.State.Terminated != nil { exitCode := containerStatus.State.Terminated.ExitCode - reason := strings.ToLower(containerStatus.State.Terminated.Reason) + reason := containerStatus.State.Terminated.Reason + if reason == ReasonOOMKilled { + numKilledOOM++ + continue + } if exitCode == 0 { numSucceeded++ } else if _killStatuses[exitCode] { - if strings.Contains(reason, "oom") { - numKilledOOM++ - } else { - numKilled++ - } + numKilled++ } else { numFailed++ } } else if containerStatus.LastTerminationState.Terminated != nil { exitCode := containerStatus.LastTerminationState.Terminated.ExitCode - reason := strings.ToLower(containerStatus.LastTerminationState.Terminated.Reason) + reason := containerStatus.LastTerminationState.Terminated.Reason + if reason == ReasonOOMKilled { + numKilledOOM++ + continue + } if exitCode == 0 { numSucceeded++ } else if _killStatuses[exitCode] { - if strings.Contains(reason, "oom") { - numKilledOOM++ - } else { - numKilled++ - } + numKilled++ } else { numFailed++ } diff --git a/pkg/operator/main.go b/pkg/operator/main.go index 85ea9621d5..8de49f28fa 100644 --- a/pkg/operator/main.go +++ b/pkg/operator/main.go @@ -47,7 +47,7 @@ func main() { telemetry.Event("operator.init", map[string]interface{}{"provider": config.Provider}) - cron.Run(operator.DeleteEvictedPods, operator.ErrorHandler("delete evicted pods"), 12*time.Hour) + cron.Run(operator.DeleteEvictedPods, operator.ErrorHandler("delete evicted pods"), time.Hour) switch config.Provider { case types.AWSProviderType: diff --git a/pkg/operator/operator/cron.go b/pkg/operator/operator/cron.go index 40600d2374..d6d4f8ccde 100644 --- a/pkg/operator/operator/cron.go +++ b/pkg/operator/operator/cron.go @@ -22,6 +22,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/aws" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/k8s" + "github.com/cortexlabs/cortex/pkg/lib/sets/strset" "github.com/cortexlabs/cortex/pkg/lib/telemetry" "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/lib/logging" @@ -30,6 +31,7 @@ import ( ) var operatorLogger = logging.GetOperatorLogger() +var previousListOfEvictedPods = strset.New() func DeleteEvictedPods() error { failedPods, err := config.K8s.ListPods(&kmeta.ListOptions{ @@ -40,14 +42,22 @@ func DeleteEvictedPods() error { } var errs []error + currentEvictedPods := strset.New() for _, pod := range failedPods { - if pod.Status.Reason == k8s.ReasonEvicted { + if pod.Status.Reason != k8s.ReasonEvicted { + continue + } + if previousListOfEvictedPods.Has(pod.Name) { _, err := config.K8s.DeletePod(pod.Name) if err != nil { errs = append(errs, err) } + previousListOfEvictedPods.Remove(pod.Name) + continue } + currentEvictedPods.Add(pod.Name) } + previousListOfEvictedPods = currentEvictedPods if errors.HasError(errs) { return errors.FirstError(errs...) diff --git a/pkg/operator/resources/job/batchapi/cron.go b/pkg/operator/resources/job/batchapi/cron.go index 3ac97e486b..d01fa8fb5e 100644 --- a/pkg/operator/resources/job/batchapi/cron.go +++ b/pkg/operator/resources/job/batchapi/cron.go @@ -284,8 +284,9 @@ func reconcileInProgressJob(jobState *job.State, queueURL *string, k8sJob *kbatc } func checkIfJobCompleted(jobKey spec.JobKey, queueURL string, k8sJob *kbatch.Job) error { - if int(k8sJob.Status.Failed) > 0 { - return investigateJobFailure(jobKey) + err := investigatePossibleJobFailure(jobKey, k8sJob) + if err != nil { + return err } queueMessages, err := getQueueMetricsFromURL(queueURL) @@ -349,14 +350,13 @@ func checkIfJobCompleted(jobKey spec.JobKey, queueURL string, k8sJob *kbatch.Job return nil } -func investigateJobFailure(jobKey spec.JobKey) error { - reasonFound := false - +func investigatePossibleJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) error { jobLogger, err := operator.GetJobLogger(jobKey) if err != nil { return err } + reasonFound := false pods, _ := config.K8s.ListPodsByLabel("jobID", jobKey.ID) for _, pod := range pods { if k8s.WasPodOOMKilled(&pod) { @@ -382,13 +382,22 @@ func investigateJobFailure(jobKey spec.JobKey) error { } } - if !reasonFound { - jobLogger.Error("workers were killed for unknown reason") + if int(k8sJob.Status.Failed) > 0 { + if !reasonFound { + jobLogger.Error("workers were killed for unknown reason") + } + return errors.FirstError( + job.SetWorkerErrorStatus(jobKey), + deleteJobRuntimeResources(jobKey), + ) + } else if int(k8sJob.Status.Succeeded) == 1 && len(pods) == 0 { + // pods could have been marked as evicted and removed by the evicter cron + // not ideal, but we can at least mark it as errored + return errors.FirstError( + job.SetUnexpectedErrorStatus(jobKey), + deleteJobRuntimeResources(jobKey), + ) } - return errors.FirstError( - err, - job.SetWorkerErrorStatus(jobKey), - deleteJobRuntimeResources(jobKey), - ) + return nil } diff --git a/pkg/operator/resources/job/taskapi/cron.go b/pkg/operator/resources/job/taskapi/cron.go index ab2de9d895..2250a77837 100644 --- a/pkg/operator/resources/job/taskapi/cron.go +++ b/pkg/operator/resources/job/taskapi/cron.go @@ -202,27 +202,32 @@ func reconcileInProgressJob(jobState *job.State, k8sJob *kbatch.Job) (status.Job } func checkIfJobCompleted(jobKey spec.JobKey, k8sJob *kbatch.Job) error { - if int(k8sJob.Status.Failed) == 1 { - pods, _ := config.K8s.ListPodsByLabel("jobID", jobKey.ID) - for _, pod := range pods { - if k8s.WasPodOOMKilled(&pod) { - return errors.FirstError( - job.SetWorkerOOMStatus(jobKey), - deleteJobRuntimeResources(jobKey), - ) - } + pods, _ := config.K8s.ListPodsByLabel("jobID", jobKey.ID) + for _, pod := range pods { + if k8s.WasPodOOMKilled(&pod) { + return errors.FirstError( + job.SetWorkerOOMStatus(jobKey), + deleteJobRuntimeResources(jobKey), + ) } + } + if int(k8sJob.Status.Failed) == 1 { return errors.FirstError( job.SetWorkerErrorStatus(jobKey), deleteJobRuntimeResources(jobKey), ) - } - - if int(k8sJob.Status.Succeeded) == 1 { + } else if int(k8sJob.Status.Succeeded) == 1 && len(pods) > 0 { return errors.FirstError( job.SetSucceededStatus(jobKey), deleteJobRuntimeResources(jobKey), ) + } else if int(k8sJob.Status.Succeeded) == 1 && len(pods) == 0 { + // pods could have been marked as evicted and removed by the evicter cron + // not ideal, but we can at least mark it as errored + return errors.FirstError( + job.SetUnexpectedErrorStatus(jobKey), + deleteJobRuntimeResources(jobKey), + ) } return nil From e1ce8b6ac23826945cc88fb7caf96a2d04dda5c9 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 20 Jan 2021 04:05:16 +0200 Subject: [PATCH 2/2] Address PR comments --- pkg/lib/k8s/pod.go | 8 ++------ pkg/operator/operator/cron.go | 1 - pkg/operator/resources/job/batchapi/cron.go | 19 +++++++++---------- pkg/operator/resources/job/taskapi/cron.go | 3 +-- 4 files changed, 12 insertions(+), 19 deletions(-) diff --git a/pkg/lib/k8s/pod.go b/pkg/lib/k8s/pod.go index fd11e52350..3d92852770 100644 --- a/pkg/lib/k8s/pod.go +++ b/pkg/lib/k8s/pod.go @@ -240,9 +240,7 @@ func PodStatusFromContainerStatuses(containerStatuses []kcore.ContainerStatus) P reason := containerStatus.State.Terminated.Reason if reason == ReasonOOMKilled { numKilledOOM++ - continue - } - if exitCode == 0 { + } else if exitCode == 0 { numSucceeded++ } else if _killStatuses[exitCode] { numKilled++ @@ -254,9 +252,7 @@ func PodStatusFromContainerStatuses(containerStatuses []kcore.ContainerStatus) P reason := containerStatus.LastTerminationState.Terminated.Reason if reason == ReasonOOMKilled { numKilledOOM++ - continue - } - if exitCode == 0 { + } else if exitCode == 0 { numSucceeded++ } else if _killStatuses[exitCode] { numKilled++ diff --git a/pkg/operator/operator/cron.go b/pkg/operator/operator/cron.go index d6d4f8ccde..60d9c984b5 100644 --- a/pkg/operator/operator/cron.go +++ b/pkg/operator/operator/cron.go @@ -52,7 +52,6 @@ func DeleteEvictedPods() error { if err != nil { errs = append(errs, err) } - previousListOfEvictedPods.Remove(pod.Name) continue } currentEvictedPods.Add(pod.Name) diff --git a/pkg/operator/resources/job/batchapi/cron.go b/pkg/operator/resources/job/batchapi/cron.go index d01fa8fb5e..d2f6a5101a 100644 --- a/pkg/operator/resources/job/batchapi/cron.go +++ b/pkg/operator/resources/job/batchapi/cron.go @@ -284,8 +284,8 @@ func reconcileInProgressJob(jobState *job.State, queueURL *string, k8sJob *kbatc } func checkIfJobCompleted(jobKey spec.JobKey, queueURL string, k8sJob *kbatch.Job) error { - err := investigatePossibleJobFailure(jobKey, k8sJob) - if err != nil { + jobFailed, err := checkForJobFailure(jobKey, k8sJob) + if err != nil || jobFailed { return err } @@ -350,10 +350,10 @@ func checkIfJobCompleted(jobKey spec.JobKey, queueURL string, k8sJob *kbatch.Job return nil } -func investigatePossibleJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) error { +func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) { jobLogger, err := operator.GetJobLogger(jobKey) if err != nil { - return err + return false, err } reasonFound := false @@ -361,7 +361,7 @@ func investigatePossibleJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) error for _, pod := range pods { if k8s.WasPodOOMKilled(&pod) { jobLogger.Error("at least one worker was killed because it ran out of out of memory") - return errors.FirstError( + return true, errors.FirstError( job.SetWorkerOOMStatus(jobKey), deleteJobRuntimeResources(jobKey), ) @@ -386,18 +386,17 @@ func investigatePossibleJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) error if !reasonFound { jobLogger.Error("workers were killed for unknown reason") } - return errors.FirstError( + return true, errors.FirstError( job.SetWorkerErrorStatus(jobKey), deleteJobRuntimeResources(jobKey), ) } else if int(k8sJob.Status.Succeeded) == 1 && len(pods) == 0 { - // pods could have been marked as evicted and removed by the evicter cron - // not ideal, but we can at least mark it as errored - return errors.FirstError( + // really unexpected situation which doesn't hurt if we check + return true, errors.FirstError( job.SetUnexpectedErrorStatus(jobKey), deleteJobRuntimeResources(jobKey), ) } - return nil + return false, nil } diff --git a/pkg/operator/resources/job/taskapi/cron.go b/pkg/operator/resources/job/taskapi/cron.go index 2250a77837..6c895c39c7 100644 --- a/pkg/operator/resources/job/taskapi/cron.go +++ b/pkg/operator/resources/job/taskapi/cron.go @@ -222,8 +222,7 @@ func checkIfJobCompleted(jobKey spec.JobKey, k8sJob *kbatch.Job) error { deleteJobRuntimeResources(jobKey), ) } else if int(k8sJob.Status.Succeeded) == 1 && len(pods) == 0 { - // pods could have been marked as evicted and removed by the evicter cron - // not ideal, but we can at least mark it as errored + // really unexpected situation which doesn't hurt if we check return errors.FirstError( job.SetUnexpectedErrorStatus(jobKey), deleteJobRuntimeResources(jobKey),