diff --git a/pkg/lib/k8s/pod.go b/pkg/lib/k8s/pod.go index 734a500a4d..3d92852770 100644 --- a/pkg/lib/k8s/pod.go +++ b/pkg/lib/k8s/pod.go @@ -20,7 +20,6 @@ import ( "bytes" "context" "regexp" - "strings" "time" "github.com/cortexlabs/cortex/pkg/lib/errors" @@ -38,7 +37,12 @@ var _podTypeMeta = kmeta.TypeMeta{ Kind: "Pod", } -const ReasonEvicted = "Evicted" +// pod termination reasons +// https://github.com/kubernetes/kube-state-metrics/blob/master/docs/pod-metrics.md +const ( + ReasonEvicted = "Evicted" + ReasonOOMKilled = "OOMKilled" +) type PodStatus string @@ -148,22 +152,14 @@ func WasPodOOMKilled(pod *kcore.Pod) bool { return true } for _, containerStatus := range pod.Status.ContainerStatuses { + var reason string if containerStatus.LastTerminationState.Terminated != nil { - exitCode := containerStatus.LastTerminationState.Terminated.ExitCode - reason := strings.ToLower(containerStatus.LastTerminationState.Terminated.Reason) - if _killStatuses[exitCode] { - if strings.Contains(reason, "oom") { - return true - } - } + reason = containerStatus.LastTerminationState.Terminated.Reason } else if containerStatus.State.Terminated != nil { - exitCode := containerStatus.State.Terminated.ExitCode - reason := strings.ToLower(containerStatus.State.Terminated.Reason) - if _killStatuses[exitCode] { - if strings.Contains(reason, "oom") { - return true - } - } + reason = containerStatus.State.Terminated.Reason + } + if reason == ReasonOOMKilled { + return true } } @@ -194,25 +190,21 @@ func GetPodStatus(pod *kcore.Pod) PodStatus { } for _, containerStatus := range pod.Status.ContainerStatuses { + var reason string + var exitCode int32 if containerStatus.LastTerminationState.Terminated != nil { - exitCode := containerStatus.LastTerminationState.Terminated.ExitCode - reason := strings.ToLower(containerStatus.LastTerminationState.Terminated.Reason) - if _killStatuses[exitCode] { - if strings.Contains(reason, "oom") { - return PodStatusKilledOOM - } - return PodStatusKilled - } + reason = containerStatus.LastTerminationState.Terminated.Reason + exitCode = containerStatus.LastTerminationState.Terminated.ExitCode } else if containerStatus.State.Terminated != nil { - exitCode := containerStatus.State.Terminated.ExitCode - reason := strings.ToLower(containerStatus.State.Terminated.Reason) - if _killStatuses[exitCode] { - if strings.Contains(reason, "oom") { - return PodStatusKilledOOM - } - return PodStatusKilled - } + reason = containerStatus.State.Terminated.Reason + exitCode = containerStatus.State.Terminated.ExitCode } + if reason == ReasonOOMKilled { + return PodStatusKilledOOM + } else if _killStatuses[exitCode] { + return PodStatusKilled + } + } return PodStatusFailed case kcore.PodRunning: @@ -245,29 +237,25 @@ func PodStatusFromContainerStatuses(containerStatuses []kcore.ContainerStatus) P numRunning++ } else if containerStatus.State.Terminated != nil { exitCode := containerStatus.State.Terminated.ExitCode - reason := strings.ToLower(containerStatus.State.Terminated.Reason) - if exitCode == 0 { + reason := containerStatus.State.Terminated.Reason + if reason == ReasonOOMKilled { + numKilledOOM++ + } else if exitCode == 0 { numSucceeded++ } else if _killStatuses[exitCode] { - if strings.Contains(reason, "oom") { - numKilledOOM++ - } else { - numKilled++ - } + numKilled++ } else { numFailed++ } } else if containerStatus.LastTerminationState.Terminated != nil { exitCode := containerStatus.LastTerminationState.Terminated.ExitCode - reason := strings.ToLower(containerStatus.LastTerminationState.Terminated.Reason) - if exitCode == 0 { + reason := containerStatus.LastTerminationState.Terminated.Reason + if reason == ReasonOOMKilled { + numKilledOOM++ + } else if exitCode == 0 { numSucceeded++ } else if _killStatuses[exitCode] { - if strings.Contains(reason, "oom") { - numKilledOOM++ - } else { - numKilled++ - } + numKilled++ } else { numFailed++ } diff --git a/pkg/operator/main.go b/pkg/operator/main.go index 85ea9621d5..8de49f28fa 100644 --- a/pkg/operator/main.go +++ b/pkg/operator/main.go @@ -47,7 +47,7 @@ func main() { telemetry.Event("operator.init", map[string]interface{}{"provider": config.Provider}) - cron.Run(operator.DeleteEvictedPods, operator.ErrorHandler("delete evicted pods"), 12*time.Hour) + cron.Run(operator.DeleteEvictedPods, operator.ErrorHandler("delete evicted pods"), time.Hour) switch config.Provider { case types.AWSProviderType: diff --git a/pkg/operator/operator/cron.go b/pkg/operator/operator/cron.go index 40600d2374..60d9c984b5 100644 --- a/pkg/operator/operator/cron.go +++ b/pkg/operator/operator/cron.go @@ -22,6 +22,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/aws" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/k8s" + "github.com/cortexlabs/cortex/pkg/lib/sets/strset" "github.com/cortexlabs/cortex/pkg/lib/telemetry" "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/lib/logging" @@ -30,6 +31,7 @@ import ( ) var operatorLogger = logging.GetOperatorLogger() +var previousListOfEvictedPods = strset.New() func DeleteEvictedPods() error { failedPods, err := config.K8s.ListPods(&kmeta.ListOptions{ @@ -40,14 +42,21 @@ func DeleteEvictedPods() error { } var errs []error + currentEvictedPods := strset.New() for _, pod := range failedPods { - if pod.Status.Reason == k8s.ReasonEvicted { + if pod.Status.Reason != k8s.ReasonEvicted { + continue + } + if previousListOfEvictedPods.Has(pod.Name) { _, err := config.K8s.DeletePod(pod.Name) if err != nil { errs = append(errs, err) } + continue } + currentEvictedPods.Add(pod.Name) } + previousListOfEvictedPods = currentEvictedPods if errors.HasError(errs) { return errors.FirstError(errs...) diff --git a/pkg/operator/resources/job/batchapi/cron.go b/pkg/operator/resources/job/batchapi/cron.go index 3ac97e486b..d2f6a5101a 100644 --- a/pkg/operator/resources/job/batchapi/cron.go +++ b/pkg/operator/resources/job/batchapi/cron.go @@ -284,8 +284,9 @@ func reconcileInProgressJob(jobState *job.State, queueURL *string, k8sJob *kbatc } func checkIfJobCompleted(jobKey spec.JobKey, queueURL string, k8sJob *kbatch.Job) error { - if int(k8sJob.Status.Failed) > 0 { - return investigateJobFailure(jobKey) + jobFailed, err := checkForJobFailure(jobKey, k8sJob) + if err != nil || jobFailed { + return err } queueMessages, err := getQueueMetricsFromURL(queueURL) @@ -349,19 +350,18 @@ func checkIfJobCompleted(jobKey spec.JobKey, queueURL string, k8sJob *kbatch.Job return nil } -func investigateJobFailure(jobKey spec.JobKey) error { - reasonFound := false - +func checkForJobFailure(jobKey spec.JobKey, k8sJob *kbatch.Job) (bool, error) { jobLogger, err := operator.GetJobLogger(jobKey) if err != nil { - return err + return false, err } + reasonFound := false pods, _ := config.K8s.ListPodsByLabel("jobID", jobKey.ID) for _, pod := range pods { if k8s.WasPodOOMKilled(&pod) { jobLogger.Error("at least one worker was killed because it ran out of out of memory") - return errors.FirstError( + return true, errors.FirstError( job.SetWorkerOOMStatus(jobKey), deleteJobRuntimeResources(jobKey), ) @@ -382,13 +382,21 @@ func investigateJobFailure(jobKey spec.JobKey) error { } } - if !reasonFound { - jobLogger.Error("workers were killed for unknown reason") + if int(k8sJob.Status.Failed) > 0 { + if !reasonFound { + jobLogger.Error("workers were killed for unknown reason") + } + return true, errors.FirstError( + job.SetWorkerErrorStatus(jobKey), + deleteJobRuntimeResources(jobKey), + ) + } else if int(k8sJob.Status.Succeeded) == 1 && len(pods) == 0 { + // really unexpected situation which doesn't hurt if we check + return true, errors.FirstError( + job.SetUnexpectedErrorStatus(jobKey), + deleteJobRuntimeResources(jobKey), + ) } - return errors.FirstError( - err, - job.SetWorkerErrorStatus(jobKey), - deleteJobRuntimeResources(jobKey), - ) + return false, nil } diff --git a/pkg/operator/resources/job/taskapi/cron.go b/pkg/operator/resources/job/taskapi/cron.go index ab2de9d895..6c895c39c7 100644 --- a/pkg/operator/resources/job/taskapi/cron.go +++ b/pkg/operator/resources/job/taskapi/cron.go @@ -202,27 +202,31 @@ func reconcileInProgressJob(jobState *job.State, k8sJob *kbatch.Job) (status.Job } func checkIfJobCompleted(jobKey spec.JobKey, k8sJob *kbatch.Job) error { - if int(k8sJob.Status.Failed) == 1 { - pods, _ := config.K8s.ListPodsByLabel("jobID", jobKey.ID) - for _, pod := range pods { - if k8s.WasPodOOMKilled(&pod) { - return errors.FirstError( - job.SetWorkerOOMStatus(jobKey), - deleteJobRuntimeResources(jobKey), - ) - } + pods, _ := config.K8s.ListPodsByLabel("jobID", jobKey.ID) + for _, pod := range pods { + if k8s.WasPodOOMKilled(&pod) { + return errors.FirstError( + job.SetWorkerOOMStatus(jobKey), + deleteJobRuntimeResources(jobKey), + ) } + } + if int(k8sJob.Status.Failed) == 1 { return errors.FirstError( job.SetWorkerErrorStatus(jobKey), deleteJobRuntimeResources(jobKey), ) - } - - if int(k8sJob.Status.Succeeded) == 1 { + } else if int(k8sJob.Status.Succeeded) == 1 && len(pods) > 0 { return errors.FirstError( job.SetSucceededStatus(jobKey), deleteJobRuntimeResources(jobKey), ) + } else if int(k8sJob.Status.Succeeded) == 1 && len(pods) == 0 { + // really unexpected situation which doesn't hurt if we check + return errors.FirstError( + job.SetUnexpectedErrorStatus(jobKey), + deleteJobRuntimeResources(jobKey), + ) } return nil