Skip to content

Commit

Permalink
Adds k8s support for CPU and memory resource settings.
Browse files Browse the repository at this point in the history
  • Loading branch information
pcm32 committed Oct 4, 2017
1 parent c49adb1 commit 1253eff
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 0 deletions.
33 changes: 33 additions & 0 deletions config/job_conf.xml.sample_advanced
Expand Up @@ -248,6 +248,39 @@
a requirement of the other.
-->
<!-- <param id="k8s_default_requests_cpu">"500m"</param> -->
<!-- <param id="k8s_default_requests_memory">"500Mi"</param> -->
<!-- <param id="k8s_default_limits_cpu">"2"</param> -->
<!-- <param id="k8s_default_limits_memory">"2Gi"</param> -->
<!-- Kubernetes resource Requests and Limits
Parameters above (k8s_default_requests_* and k8s_default_limits_*) set default minimal (requests) and
maximal (limits) CPU and memory resources to be allocated to all containers in Kubernetes jobs by
default.
Limits and requests for CPU resources are measured in cpu units. Fractional requests are allowed. A
Container with requests_cpu of 0.5 is guaranteed half as much CPU as one that asks for 1 CPU. The
expression 0.1 is equivalent to the expression 100m, which can be read as “one hundred millicpu”. Some
people say “one hundred millicores”, and this is understood to mean the same thing. A request with a
decimal point, like 0.1, is converted to 100m by the API, and precision finer than 1m is not allowed.
For this reason, the form 100m might be preferred.
You can express memory as a plain integer or as a fixed-point integer using one of these suffixes: E,
P, T, G, M, K. You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki.
For instance, "1Gi" stands for 1 Gigabyte, and "500Mi" stands for 500 megabytes. Using other formats
will make the jobs fail as Kubernetes won't recognize the assignment.
For more details on the Kubernetes resources management, see:
https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/
It is the responsability of the tool developer or administrator to override these limits, either at the
destination or the tool itself. If not set, current defaults set on the code are:
requests_cpu = "500m"
limits_cpu = "1"
requests_memory = "500Mi"
limits_memory = "1Gi"
-->

</plugin>
<plugin id="godocker" type="runner" load="galaxy.jobs.runners.godocker:GodockerJobRunner">
<!-- Go-Docker is a batch computing/cluster management tool using Docker
Expand Down
102 changes: 102 additions & 0 deletions lib/galaxy/jobs/runners/kubernetes.py
Expand Up @@ -52,6 +52,10 @@ def __init__(self, app, nworkers, **kwargs):
k8s_supplemental_group_id=dict(map=str),
k8s_pull_policy=dict(map=str, default="Default"),
k8s_fs_group_id=dict(map=int),
k8s_default_requests_cpu=dict(map=str, default=None),
k8s_default_requests_memory=dict(map=str, default=None),
k8s_default_limits_cpu=dict(map=str, default=None),
k8s_default_limits_memory=dict(map=str, default=None),
k8s_pod_retrials=dict(map=int, valid=lambda x: int > 0, default=3))

if 'runner_param_specs' not in kwargs:
Expand Down Expand Up @@ -220,6 +224,10 @@ def __get_k8s_containers(self, job_wrapper):
}]
}

resources = self.__get_resources(job_wrapper)
if resources:
k8s_container['resources'] = resources

if self._default_pull_policy:
k8s_container["imagePullPolicy"] = self._default_pull_policy
# if self.__requires_ports(job_wrapper):
Expand All @@ -232,6 +240,100 @@ def __get_k8s_containers(self, job_wrapper):
# for k,v self.runner_params:
# if k.startswith("container_port_"):

def __get_resources(self, job_wrapper):
mem_request = self.__get_memory_request(job_wrapper)
cpu_request = self.__get_cpu_request(job_wrapper)

mem_limit = self.__get_memory_limit(job_wrapper)
cpu_limit = self.__get_cpu_limit(job_wrapper)

requests = {}
limits = {}

if mem_request:
requests['memory'] = mem_request
if cpu_request:
requests['cpu'] = cpu_request

if mem_limit:
limits['memory'] = mem_limit
if cpu_limit:
limits['cpu'] = cpu_limit

resources = {}
if requests:
resources['requests'] = requests
if limits:
resources['limits'] = limits

return resources


def __get_memory_request(self, job_wrapper):
"""Obtains memory requests for job, checking if available on the destination, otherwise using the default"""
job_destinantion = job_wrapper.job_destination

if 'requests_memory' in job_destinantion.params:
return self.__transform_memory_value(job_destinantion.params['requests_memory'])
return self.runner_params['k8s_default_requests_memory']

def __get_memory_limit(self, job_wrapper):
"""Obtains memory limits for job, checking if available on the destination, otherwise using the default"""
job_destinantion = job_wrapper.job_destination

if 'limits_memory' in job_destinantion.params:
return self.__transform_memory_value(job_destinantion.params['limits_memory'])
return self.runner_params['k8s_default_limits_memory']

def __get_cpu_request(self, job_wrapper):
"""Obtains cpu requests for job, checking if available on the destination, otherwise using the default"""
job_destinantion = job_wrapper.job_destination

if 'requests_cpu' in job_destinantion.params:
return self.__transform_cpu_value(job_destinantion.params['requests_cpu'])
return self.runner_params['k8s_default_requests_cpu']

def __get_cpu_limit(self, job_wrapper):
"""Obtains cpu requests for job, checking if available on the destination, otherwise using the default"""
job_destinantion = job_wrapper.job_destination

if 'limits_cpu' in job_destinantion.params:
return self.__transform_cpu_value(job_destinantion.params['limits_cpu'])
return self.runner_params['k8s_default_limits_cpu']

def __transform_cpu_value(self, cpu_value):
"""Transforms cpu value
If the value is 0 and not a string, then None is returned.
If the value is a float, then it is multiplied by 1000 and expressed as mili cpus.
If the value is an integer, then it is and expressed as CPUs (no unit).
If it is an already formatted string, it is returned as it was.
"""
if not isinstance(cpu_value, str) and float(cpu_value) == 0:
return None
if isinstance(cpu_value, float):
return str(int(cpu_value*1000))+"m"
elif isinstance(cpu_value, int):
return str(cpu_value)
return cpu_value

def __transform_memory_value(self, mem_value):
"""Transforms memory value
If the value is 0 and not a string, then None is returned.
If the value has a decimal part, then it is multiplied by 1000 and expressed as Megabytes.
If the value is an integer, then it is truncated and expressed as Gigabytes.
If it is an already formatted string, it is returned as it was.
"""
if not isinstance(mem_value, str) and float(mem_value) == 0:
return None
if isinstance(mem_value, float):
return str(int(mem_value*1000))+"M"
elif isinstance(mem_value, int):
return str(mem_value)+"G"
return mem_value


def __assemble_k8s_container_image_name(self, job_wrapper):
"""Assembles the container image name as repo/owner/image:tag, where repo, owner and tag are optional"""
job_destination = job_wrapper.job_destination
Expand Down

0 comments on commit 1253eff

Please sign in to comment.