Skip to content

Commit

Permalink
Allow adding multiple volume claims
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Feb 19, 2019
1 parent 626657e commit 7cf161b
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 47 deletions.
16 changes: 7 additions & 9 deletions config/job_conf.xml.sample_advanced
Expand Up @@ -165,7 +165,7 @@
kubectl create -f <path/to/persistent_volume.yaml>
kubectl create -f <path/to/pv_claim.yaml>
pointing of course to the same Kubernetes cluster that you intend to use.
pointing to the Kubernetes cluster that you intend to use.
-->

<param id="k8s_config_path">/path/to/kubeconfig</param>
Expand All @@ -191,14 +191,12 @@
Kubernetes 1.2 and on. Changing this to a much newer version in the future might require changes to the
plugin runner code. Value extensions/v1beta1 is also supported for pre 1.2 legacy installations.
-->
<param id="k8s_persistent_volume_claim_name">galaxy_pvc</param>
<!-- The name of the Persisten Volume Claim (PVC) to be used, details above, needs to match the PVC's
metadata:name -->

<param id="k8s_persistent_volume_claim_mount_path">/scratch1/galaxy_data</param>
<!-- The mount path needs to be parent directory of the "file_path" and "new_file_path" paths
set in universe_wsgi.ini (or equivalent general galaxy config file). This is the mount path of the
PVC within the docker container that will be actually running the tool -->
<param id="k8s_persistent_volume_claims">galaxy_pvc1:/mount_point1,galaxy_pvc2:/mount_point2</param>
<!-- Comma-separated list of Persistent Volume Claim (PVC) to container mount point mappings, in the format
PVC:mount point
Typical mount paths are the file_path, job_working_directory, all paths containing tools and scripts
and all library paths set in galaxy.yml (or equivalent general galaxy config file). -->

<!-- <param id="k8s_namespace">default</param> -->
<!-- The namespace to be used on the Kubernetes cluster, if different from default, this needs to be set
Expand Down
33 changes: 11 additions & 22 deletions lib/galaxy/jobs/runners/kubernetes.py
Expand Up @@ -47,8 +47,7 @@ def __init__(self, app, nworkers, **kwargs):
runner_param_specs = dict(
k8s_config_path=dict(map=str, default=os.environ.get('KUBECONFIG', None)),
k8s_use_service_account=dict(map=bool, default=False),
k8s_persistent_volume_claim_name=dict(map=str),
k8s_persistent_volume_claim_mount_path=dict(map=str),
k8s_persistent_volume_claims=dict(map=str),
k8s_namespace=dict(map=str, default="default"),
k8s_galaxy_instance_id=dict(map=str),
k8s_timeout_seconds_job_deletion=dict(map=int, valid=lambda x: int > 0, default=30),
Expand All @@ -73,7 +72,6 @@ def __init__(self, app, nworkers, **kwargs):
self._pykube_api = HTTPClient(KubeConfig.from_service_account())
else:
self._pykube_api = HTTPClient(KubeConfig.from_file(self.runner_params["k8s_config_path"]))
self._galaxy_vol_name = "pvc-galaxy" # TODO this needs to be read from params!!

self._galaxy_instance_id = self.__get_galaxy_instance_id()

Expand All @@ -83,6 +81,14 @@ def __init__(self, app, nworkers, **kwargs):

self._init_monitor_thread()
self._init_worker_threads()
self.setup_volumes()

def setup_volumes(self):
    """Parse the ``k8s_persistent_volume_claims`` runner parameter into the
    volume and volume-mount definitions used by the Kubernetes job spec.

    The parameter is a comma-separated list of ``claim_name:mount_path``
    mappings, e.g. ``galaxy_pvc1:/mount_point1,galaxy_pvc2:/mount_point2``.
    The parsed results are stored back into ``self.runner_params`` under
    ``k8s_mountable_volumes`` (pod ``spec.volumes`` entries) and
    ``k8s_volume_mounts`` (container ``volumeMounts`` entries).
    """
    claim_spec = self.runner_params['k8s_persistent_volume_claims']
    # split(':', 1) splits only on the first colon so a mount path that
    # itself contains a colon does not break parsing; strip() tolerates
    # whitespace around the comma-separated entries.
    volume_claims = dict(
        entry.strip().split(':', 1) for entry in claim_spec.split(',')
    )
    self.runner_params['k8s_mountable_volumes'] = [
        {'name': claim_name, 'persistentVolumeClaim': {'claimName': claim_name}}
        for claim_name in volume_claims
    ]
    self.runner_params['k8s_volume_mounts'] = [
        {'name': claim_name, 'mountPath': mount_path}
        for claim_name, mount_path in volume_claims.items()
    ]

def queue_job(self, job_wrapper):
"""Create job script and submit it to Kubernetes cluster"""
Expand Down Expand Up @@ -224,7 +230,7 @@ def __get_k8s_job_spec_template(self, ajs):
"labels": {"app": self.__produce_unique_k8s_job_name(ajs.job_wrapper.get_id_tag())}
},
"spec": {
"volumes": self.__get_k8s_mountable_volumes(ajs.job_wrapper),
"volumes": self.runner_params['k8s_mountable_volumes'],
"restartPolicy": self.__get_k8s_restart_policy(ajs.job_wrapper),
"containers": self.__get_k8s_containers(ajs)
}
Expand All @@ -246,20 +252,6 @@ def __get_k8s_restart_policy(self, job_wrapper):
"""The default Kubernetes restart policy for Jobs"""
return "Never"

def __get_k8s_mountable_volumes(self, job_wrapper):
"""Provides the required volumes that the containers in the pod should be able to mount. This should be using
the new persistent volumes and persistent volumes claim objects. This requires that both a PersistentVolume and
a PersistentVolumeClaim are created before starting galaxy (starting a k8s job).
"""
# TODO on this initial version we only support a single volume to be mounted.
k8s_mountable_volume = {
"name": self._galaxy_vol_name,
"persistentVolumeClaim": {
"claimName": self.runner_params['k8s_persistent_volume_claim_name']
}
}
return [k8s_mountable_volume]

def __get_k8s_containers(self, ajs):
"""Fills in all required for setting up the docker containers to be used, including setting a pull policy if
this has been set.
Expand All @@ -285,10 +277,7 @@ def __get_k8s_containers(self, ajs):
"command": [ajs.job_wrapper.shell],
"args": ["-c", ajs.job_file],
"workingDir": ajs.job_wrapper.working_directory,
"volumeMounts": [{
"mountPath": self.runner_params['k8s_persistent_volume_claim_mount_path'],
"name": self._galaxy_vol_name
}]
"volumeMounts": self.runner_params['k8s_volume_mounts']
}

resources = self.__get_resources(ajs.job_wrapper)
Expand Down
46 changes: 30 additions & 16 deletions test/integration/test_kubernetes_runner.py
Expand Up @@ -16,6 +16,7 @@
PERSISTENT_VOLUME_NAME = 'pv-galaxy-integration-test'
PERSISTENT_VOLUME_CLAIM_NAME = 'galaxy-pvc-integration-test'
Config = collections.namedtuple('ConfigTuple', 'path')
TOOL_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, 'tools'))


class KubeSetupConfigTuple(Config):
Expand All @@ -27,7 +28,7 @@ def teardown(self):
subprocess.check_call(['kubectl', 'delete', '-f', self.path])


def persistent_volume(path):
def persistent_volume(path, persistent_volume_name):
volume_yaml = string.Template("""
kind: PersistentVolume
apiVersion: v1
Expand All @@ -53,13 +54,13 @@ def persistent_volume(path):
values:
- 'i-do-not-exist'
""").substitute(path=path,
persistent_volume_name=PERSISTENT_VOLUME_NAME)
persistent_volume_name=persistent_volume_name)
with tempfile.NamedTemporaryFile(suffix="_persistent_volume.yml", mode="w", delete=False) as volume:
volume.write(volume_yaml)
return KubeSetupConfigTuple(path=volume.name)


def persistent_volume_claim():
def persistent_volume_claim(persistent_volume_name, persistent_volum_claim_name):
peristent_volume_claim_yaml = string.Template("""
kind: PersistentVolumeClaim
apiVersion: v1
Expand All @@ -73,20 +74,19 @@ def persistent_volume_claim():
requests:
storage: 2Gi
storageClassName: manual
""").substitute(persistent_volume_name=PERSISTENT_VOLUME_NAME,
persistent_volume_claim_name=PERSISTENT_VOLUME_CLAIM_NAME)
""").substitute(persistent_volume_name=persistent_volume_name,
persistent_volume_claim_name=persistent_volum_claim_name)
with tempfile.NamedTemporaryFile(suffix="_persistent_volume_claim.yml", mode="w", delete=False) as volume_claim:
volume_claim.write(peristent_volume_claim_yaml)
return KubeSetupConfigTuple(path=volume_claim.name)


def job_config(path):
def job_config(jobs_directory):
job_conf_template = string.Template("""<job_conf>
<plugins>
<plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner" workers="2"/>
<plugin id="k8s" type="runner" load="galaxy.jobs.runners.kubernetes:KubernetesJobRunner">
<param id="k8s_persistent_volume_claim_name">galaxy-pvc-integration-test</param>
<param id="k8s_persistent_volume_claim_mount_path">$path</param>
<param id="k8s_persistent_volume_claims">jobs-directory-claim:$jobs_directory,tool-directory-claim:$tool_directory</param>
<param id="k8s_config_path">$k8s_config_path</param>
<param id="k8s_galaxy_instance_id">gx-short-id</param>
</plugin>
Expand All @@ -107,7 +107,10 @@ def job_config(path):
</tools>
</job_conf>
""")
job_conf_str = job_conf_template.substitute(path=path, k8s_config_path=os.environ.get('GALAXY_TEST_KUBE_CONFIG_PATH', '~/.kube/config'))
job_conf_str = job_conf_template.substitute(jobs_directory=jobs_directory,
tool_directory=TOOL_DIR,
k8s_config_path=os.environ.get('GALAXY_TEST_KUBE_CONFIG_PATH', '~/.kube/config'),
)
with tempfile.NamedTemporaryFile(suffix="_kubernetes_integration_job_conf", mode="w", delete=False) as job_conf:
job_conf.write(job_conf_str)
return Config(job_conf.name)
Expand All @@ -123,17 +126,28 @@ def setUp(self):
@classmethod
def setUpClass(cls):
    """Provision the shared test fixtures: a scratch jobs directory plus one
    PersistentVolume/PersistentVolumeClaim pair per entry in ``cls.volumes``,
    and the generated job configuration file.
    """
    cls.jobs_directory = tempfile.mkdtemp()
    # Each entry: [host path, PersistentVolume name, PersistentVolumeClaim name].
    cls.volumes = [
        [cls.jobs_directory, 'jobs-directory-volume', 'jobs-directory-claim'],
        [TOOL_DIR, 'tool-directory-volume', 'tool-directory-claim'],
    ]
    cls.persistent_volumes = []
    cls.persistent_volume_claims = []
    for host_path, volume_name, claim_name in cls.volumes:
        pv = persistent_volume(host_path, volume_name)
        pv.setup()
        cls.persistent_volumes.append(pv)
        pvc = persistent_volume_claim(volume_name, claim_name)
        pvc.setup()
        cls.persistent_volume_claims.append(pvc)
    cls.job_config = job_config(jobs_directory=cls.jobs_directory)
    super(BaseKubernetesIntegrationTestCase, cls).setUpClass()

@classmethod
def tearDownClass(cls):
    """Tear down the fixtures created in ``setUpClass``: claims first, then
    the volumes backing them, then the base-class teardown.
    """
    # Single pass over claims followed by volumes preserves teardown order.
    for resource in (*cls.persistent_volume_claims, *cls.persistent_volumes):
        resource.teardown()
    super(BaseKubernetesIntegrationTestCase, cls).tearDownClass()

@classmethod
Expand Down

0 comments on commit 7cf161b

Please sign in to comment.