k8s runner: Adds ability to detect OOMKilled jobs and signals them for resubmission.
pcm32 committed Oct 16, 2017
1 parent ca2e121 · commit 7654935
1 changed file: lib/galaxy/jobs/runners/kubernetes.py (37 additions, 10 deletions)
@@ -11,7 +11,8 @@
 from galaxy import model
 from galaxy.jobs.runners import (
     AsynchronousJobRunner,
-    AsynchronousJobState
+    AsynchronousJobState,
+    JobState
 )
 
 # pykube imports:
@@ -394,19 +395,13 @@ def check_watched_item(self, job_state):
                 job_state.running = False
                 self.mark_as_finished(job_state)
                 return None
+            elif failed > 0 and self.__job_failed_due_to_low_memory(job_state):
+                return self._handle_job_failure(job, job_state, reason="OOM")
             elif active > 0 and failed <= max_pod_retrials:
                 job_state.running = True
                 return job_state
             elif failed > max_pod_retrials:
-                self.__produce_log_file(job_state)
-                error_file = open(job_state.error_file, 'w')
-                error_file.write("Exceeded max number of Kubernetes pod retrials allowed for job\n")
-                error_file.close()
-                job_state.running = False
-                job_state.fail_message = "More pods failed than allowed. See stdout for pods details."
-                self.mark_as_failed(job_state)
-                job.scale(replicas=0)
-                return None
+                return self._handle_job_failure(job, job_state)
             # We should not get here
             log.debug(
                 "Reaching unexpected point for Kubernetes job, where it is not classified as succ., active nor failed.")
@@ -430,6 +425,38 @@ def check_watched_item(self, job_state):
             self.mark_as_failed(job_state)
             return job_state
 
+    def _handle_job_failure(self, job, job_state, reason=None):
+        self.__produce_log_file(job_state)
+        error_file = open(job_state.error_file, 'w')
+        if reason == "OOM":
+            error_file.write("Job killed after running out of memory. Try with more memory.\n")
+            job_state.fail_message = "Tool failed due to insufficient memory. Try with more memory."
+            job_state.runner_state = JobState.runner_states.MEMORY_LIMIT_REACHED
+        else:
+            error_file.write("Exceeded max number of Kubernetes pod retrials allowed for job\n")
+            job_state.fail_message = "More pods failed than allowed. See stdout for pods details."
+        error_file.close()
+        job_state.running = False
+        self.mark_as_failed(job_state)
+        job.scale(replicas=0)
+        return None
+
+    def __job_failed_due_to_low_memory(self, job_state):
+        """
+        checks the state of the pod to see if it was killed
+        for being out of memory (pod status OOMKilled). If that is the case
+        marks the job for resubmission (resubmit logic is part of destinations).
+        """
+
+        pods = Pod.objects(self._pykube_api).filter(selector="app=" + job_state.job_id)
+        pod = Pod(self._pykube_api, pods.response['items'][0])
+
+        if pod.obj['status']['phase'] == "Failed" and \
+           pod.obj['status']['containerStatuses'][0]['state']['terminated']['reason'] == "OOMKilled":
+            return True
+
+        return False
+
     def fail_job(self, job_state):
         """
         Kubernetes runner overrides fail_job (called by mark_as_failed) to rescue the pod's log files which are left as
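For reference, the OOM detection added in __job_failed_due_to_low_memory relies only on the pykube Pod API the runner already imports. Below is a minimal, self-contained sketch of the same check outside Galaxy; it assumes pykube is installed, ~/.kube/config points at the cluster, and the job's pods carry the same "app=<job_id>" label selector used above. The job id in the example is hypothetical.

# Minimal standalone sketch (assumptions: pykube installed, ~/.kube/config
# reachable, pods labelled "app=<job_id>" as in the runner's selector).
from pykube import HTTPClient, KubeConfig, Pod


def job_failed_due_to_low_memory(api, job_id):
    """Return True if the job's first pod terminated with reason OOMKilled."""
    pods = Pod.objects(api).filter(selector="app=" + job_id)
    items = pods.response['items']
    if not items:
        return False
    pod = Pod(api, items[0])
    status = pod.obj.get('status', {})
    container_statuses = status.get('containerStatuses', [])
    if status.get('phase') != "Failed" or not container_statuses:
        return False
    terminated = container_statuses[0].get('state', {}).get('terminated', {})
    return terminated.get('reason') == "OOMKilled"


if __name__ == '__main__':
    api = HTTPClient(KubeConfig.from_file("~/.kube/config"))
    print(job_failed_due_to_low_memory(api, "galaxy-job-123"))  # hypothetical job id

Setting job_state.runner_state to JobState.runner_states.MEMORY_LIMIT_REACHED is what allows a per-destination resubmission rule (for example, a memory_limit_reached condition in job_conf.xml) to send the job to a larger-memory destination; the resubmit configuration itself is not part of this commit.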
