Skip to content

Commit

Permalink
Uses k8s pod log as stdout/stderr.
Browse files Browse the repository at this point in the history
  • Loading branch information
pcm32 committed Apr 18, 2016
1 parent bdf283e commit 8ff8c1c
Showing 1 changed file with 21 additions and 1 deletion.
22 changes: 21 additions & 1 deletion lib/galaxy/jobs/runners/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from galaxy.jobs import JobDestination
from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner
from .util.cli import CliInterface, split_params
from os import sep as os_sep

# pykube imports:
try:
Expand Down Expand Up @@ -218,9 +219,13 @@ def check_watched_item(self, job_state):
if 'failed' in job.obj['status']:
failed = job.obj['status']['failed']

# This assumes jobs dependent on a single pod.
# This assumes jobs dependent on a single pod, single container
if succeeded > 0:
logs_file_path = self.__produce_log_file(job_state)
job_state.output_file = logs_file_path

self.mark_as_finished(job_state)

elif active > 0 or succeeded + active + failed == 0:
self.mark_as_queued(job_state)
elif failed > job_state.job_destination.params['max_pod_retrials']:
Expand All @@ -235,9 +240,24 @@ def check_watched_item(self, job_state):
# TODO: possibly some warning or message should be provided here to stderr
# TODO: of the job
# there is more than one job associated to the expected unique job id used as selector.
job_state.error_file = self.__produce_log_file(job_state)
self.mark_as_failed(job_state)
return job_state

def __produce_log_file(self, job_state):
    """Write the logs of the pods selected for this job to a file.

    Pods are looked up with the label selector ``app=<job_state.job_id>``.
    The logs of every returned pod are concatenated, each wrapped in
    "log start"/"log end" marker lines, and written to a single file
    inside ``job_state.files_dir``.

    The file is named after the last pod returned by the query; if the
    query returns no pods, the job id is used instead so that an (empty)
    log file is still produced rather than raising.

    :param job_state: job state object; ``job_id`` and ``files_dir`` are read.
    :returns: path of the log file that was written.
    """
    pod_r = Pod.objects(self._pykube_api).filter(selector="app=" + job_state.job_id)
    log_parts = []
    # Fallback file name for the case where no pod matches the selector.
    log_name = job_state.job_id
    for pod_obj in pod_r.response['items']:
        pod = Pod(self._pykube_api, pod_obj)
        log_parts.append("\n\n==== Pod " + pod.name + " log start ====\n\n")
        log_parts.append(pod.get_logs(timestamps=True))
        log_parts.append("\n\n==== Pod " + pod.name + " log end ====")
        log_name = pod.name
    logs_file_path = job_state.files_dir + os_sep + log_name + '.log'
    # The file must be opened for writing; the default mode is read-only.
    # The context manager closes the handle even if the write fails.
    with open(logs_file_path, 'w') as logs_file:
        logs_file.write("".join(log_parts))
    return logs_file_path

def stop_job( self, job ):
"""Attempts to delete a dispatched job to the k8s cluster"""
try:
Expand Down

0 comments on commit 8ff8c1c

Please sign in to comment.