Skip to content

Commit

Permalink
Initial commit of queue_job plus required helper methods (not finished).
Browse files Browse the repository at this point in the history
  • Loading branch information
pcm32 committed Apr 7, 2016
1 parent 6384258 commit d5abfb6
Showing 1 changed file with 80 additions and 44 deletions.
124 changes: 80 additions & 44 deletions lib/galaxy/jobs/runners/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,71 +69,107 @@ def parse_destination_params( self, params ):
return split_params( params )

def queue_job( self, job_wrapper ):
# NOTE(review): two string literals follow back to back. Only the first one is
# the function's docstring; the second is a bare expression statement with no
# effect. Presumably one of them should be dropped -- confirm which.
"""Create job script and submit it to the DRM"""
"""Create job script and submit it to Kubernetes cluster"""
# prepare the job
# TODO understand whether we need include_metadata and include_work_dir_outputs
# Bail out (returning None) when prepare_job reports a falsy result.
if not self.prepare_job( job_wrapper, include_metadata=True ):
return

# Get shell and job execution interface
# NOTE(review): `shell` is not used again in the visible part of this method;
# only `job_interface` is (for job_script_kwargs below).
job_destination = job_wrapper.job_destination
shell_params, job_params = self.parse_destination_params(job_destination.params)
shell, job_interface = self.get_cli_plugins(shell_params, job_params)

# Determine the job's Kubernetes destination (context, namespace) and options from the job destination definition

# Kubernetes Job manifest, built as a plain dict: apiVersion, kind, metadata.name and spec.
k8s_job_obj = {
"apiVersion": "batch/v1",
"kind": "Job",
"metadata":
# metadata.name is the name of the pod resource created, and must be unique
# http://kubernetes.io/docs/user-guide/configuring-containers/
{"name": self.__produce_unique_k8s_job_name(job_wrapper)}
,
# NOTE(review): no `__produce_k8s_job_spec` is defined in the visible source; the
# helper that builds the spec there is named `__get_k8s_job_spec` -- confirm the name.
"spec": self.__produce_k8s_job_spec(job_wrapper)}


# wrapper.get_id_tag() instead of job_id for compatibility with TaskWrappers.
galaxy_id_tag = job_wrapper.get_id_tag()

# Submit the manifest through self._pykube_api. The returned value is bound to
# `k8s_job` but not read again in the visible part of this method.
# NOTE(review): the Job is created here, before the job script below is generated -- confirm the intended order.
k8s_job = Job(self._pykube_api, k8s_job_obj).create()


# define job attributes
ajs = AsynchronousJobState( files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper )

# Build the job script text, passing the output/error/exit-code paths taken from `ajs`.
job_file_kwargs = job_interface.job_script_kwargs(ajs.output_file, ajs.error_file, ajs.job_name)
script = self.get_job_file(
job_wrapper,
exit_code_path=ajs.exit_code_file,
**job_file_kwargs
)
# external_runJob_script can be None, in which case it's not used.
# NOTE(review): `external_runjob_script`, `script` and `galaxy_id_tag` are assigned
# but not used within the visible part of this method.
external_runjob_script = job_wrapper.get_destination_configuration("drmaa_external_runjob_script", None)

def __produce_unique_k8s_job_name(self, job_wrapper):
    """Return a unique name for the Kubernetes Job created for ``job_wrapper``.

    The name is ``<galaxy id tag>-<8 hex chars>``. The random suffix keeps the
    name unique even if the same Galaxy job is submitted more than once.

    :param job_wrapper: object exposing ``get_id_tag()``; its result is used
        as the name prefix.
    :returns: the generated job name as a string.
    """
    # Function-scope import so this method does not depend on the module's
    # top-of-file import block.
    import uuid

    # The original expression stopped after the "-" separator (a syntax
    # error); complete it with a short random suffix.
    # NOTE(review): Kubernetes object names must be lowercase DNS-compatible
    # strings -- confirm get_id_tag() never yields characters such as "_".
    return job_wrapper.get_id_tag() + "-" + uuid.uuid4().hex[:8]

def __get_k8s_job_spec(self, job_wrapper):
    """Build the ``spec`` section of the Kubernetes Job for ``job_wrapper``.

    The returned dict has a single ``template`` key whose value is produced
    by ``__get_k8s_job_spec_template``.
    """
    pod_template = self.__get_k8s_job_spec_template(job_wrapper)
    return {"template": pod_template}

def __get_k8s_job_spec_template(self, job_wrapper):
    """Build the pod template placed under the Job spec's ``template`` key.

    The spec template is nothing but a Pod spec, except that it is nested and
    does not have an apiVersion nor kind. In addition to required fields for a
    Pod, a pod template in a job must specify appropriate labels (see pod
    selector) and an appropriate restart policy.

    :param job_wrapper: forwarded unchanged to the helpers that build the
        volumes, containers and restart policy entries.
    :returns: dict with the keys ``volumes``, ``containers`` and
        ``restartPolicy``.
    """
    # The helpers are bound methods: `self` is supplied implicitly, so only
    # job_wrapper is passed. Passing `self` again raised a TypeError.
    k8s_spec_template = {
        "volumes": self.__get_k8s_mountable_volumes(job_wrapper),
        "containers": self.__get_k8s_containers(job_wrapper),
        "restartPolicy": self.__get_k8s_restart_policy(job_wrapper),
    }
    # TODO include other relevant elements that people might want to use from
    # TODO http://kubernetes.io/docs/api-reference/v1/definitions/#_v1_podspec

    return k8s_spec_template

def __get_k8s_restart_policy(self, job_wrapper):
    """Return the restart policy used for Job pods, which is always ``"Never"``.

    ``job_wrapper`` is accepted but not consulted.
    """
    restart_policy = "Never"
    return restart_policy

def __get_k8s_mountable_volumes(self, job_wrapper):
"""Provides the required volumes that the containers in the pod should be able to mount. This should be using
the new persistent volumes and persistent volumes claim objects.
"""
# NOTE(review): nothing below builds or returns a volumes structure. Both the
# success path (falling off the end) and the failure path (bare `return`)
# yield None -- confirm this method is still a stub.


# NOTE(review): `ajs`, `script` and `galaxy_id_tag` are not assigned anywhere in
# this method as shown -- confirm where they are expected to come from.
try:
self.write_executable_script( ajs.job_file, script )
# Bare `except`: any exception raised inside the `try` body ends up here; the
# failure is logged with traceback and the job is marked as failed.
except:
log.exception("(%s) failure writing job script" % galaxy_id_tag )
job_wrapper.fail("failure preparing job script", exception=True)
return
def __get_k8s_containers(self, job_wrapper):
    """Describe the docker container to be used for the job's pod.

    :param job_wrapper: forwarded to the helpers that provide the container
        name, the image name and (when required) the ports.
    :returns: dict with ``name`` and ``image`` keys, plus ``ports`` when
        ``__requires_ports`` reports that the job needs them.
    """
    # The job-deletion check and the "submitting file" debug log that were
    # interleaved here used names never assigned in this method
    # (galaxy_id_tag, ajs) and made every call fail; they have been removed.
    k8s_container = {
        "name": self.__get_k8s_container_name(job_wrapper),
        "image": self.__assemble_k8s_container_image_name(job_wrapper),
    }

    # NOTE(review): `__requires_ports` and `__get_k8s_containers_ports` are not
    # defined in the visible source -- confirm they exist on the class.
    if self.__requires_ports(job_wrapper):
        k8s_container['ports'] = self.__get_k8s_containers_ports(job_wrapper)

    # NOTE(review): the Kubernetes PodSpec defines `containers` as a list of
    # containers; a single dict is returned here, as before -- confirm the
    # caller wraps it.
    return k8s_container

cmd_out = shell.execute(job_interface.submit(ajs.job_file))
if cmd_out.returncode != 0:
log.error('(%s) submission failed (stdout): %s' % (galaxy_id_tag, cmd_out.stdout))
log.error('(%s) submission failed (stderr): %s' % (galaxy_id_tag, cmd_out.stderr))
job_wrapper.fail("failure submitting job")
return
# Some job runners return something like 'Submitted batch job XXXX'
# Strip and split to get job ID.
external_job_id = cmd_out.stdout.strip().split()[-1]
if not external_job_id:
log.error('(%s) submission did not return a job identifier, failing job' % galaxy_id_tag)
job_wrapper.fail("failure submitting job")
return
def __assemble_k8s_container_image_name(self, job_wrapper):
    """Assemble the container image name as ``repo/owner/image:tag``.

    ``repo``, ``owner`` and ``tag`` are optional entries of the job
    destination params; ``image`` is required.

    :param job_wrapper: object whose ``job_destination.params`` mapping holds
        the ``image`` entry and the optional ``repo``/``owner``/``tag`` ones.
    :returns: the assembled image name as a string.
    :raises KeyError: if the params do not contain ``image``.
    """
    # The job-tracking statements that were interleaved here used names never
    # assigned in this method (external_job_id, ajs, galaxy_id_tag) or sat
    # after the return; they have been removed.
    params = job_wrapper.job_destination.params

    k8s_cont_image = ""
    if 'repo' in params:
        k8s_cont_image += params['repo'] + "/"
    if 'owner' in params:
        k8s_cont_image += params['owner'] + "/"

    k8s_cont_image += params['image']

    if 'tag' in params:
        k8s_cont_image += ":" + params['tag']

    return k8s_cont_image
def __get_k8s_container_name(self, job_wrapper):
    """Return the ``id`` of the job's destination, used as the container name.

    :param job_wrapper: object exposing ``job_destination.id``.
    """
    # The `self.monitor_queue.put( ajs )` statement that followed the return
    # was unreachable and used a name never assigned here; it has been removed.
    # TODO check if this is correct
    return job_wrapper.job_destination.id

def check_watched_items( self ):
"""
Expand Down

0 comments on commit d5abfb6

Please sign in to comment.