Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions airflow/kubernetes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ class Resources(K8SModel):
:type request_memory: str
:param request_cpu: requested CPU number
:type request_cpu: float | str
:param request_ephemeral_storage: requested ephermeral storage
:param request_ephemeral_storage: requested ephemeral storage
:type request_ephemeral_storage: str
:param limit_memory: limit for memory usage
:type limit_memory: str
:param limit_cpu: Limit for CPU used
:type limit_cpu: float | str
:param limit_gpu: Limits for GPU used
:type limit_gpu: int
:param limit_ephemeral_storage: Limit for ephermeral storage
:param limit_ephemeral_storage: Limit for ephemeral storage
:type limit_ephemeral_storage: float | str
"""
def __init__(
Expand Down
33 changes: 21 additions & 12 deletions airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,18 +344,26 @@ def from_obj(obj):
resources = namespaced.get('resources')

if resources is None:
requests = {
'cpu': namespaced.get('request_cpu'),
'memory': namespaced.get('request_memory'),
'ephemeral-storage': namespaced.get('ephemeral-storage')
}
limits = {
'cpu': namespaced.get('limit_cpu'),
'memory': namespaced.get('limit_memory'),
'ephemeral-storage': namespaced.get('ephemeral-storage')
}
all_resources = list(requests.values()) + list(limits.values())
if all(r is None for r in all_resources):
def extract(cpu, memory, ephemeral_storage, limit_gpu=None):
resources_obj = {
'cpu': namespaced.pop(cpu, None),
'memory': namespaced.pop(memory, None),
'ephemeral-storage': namespaced.pop(ephemeral_storage, None),
}
if limit_gpu is not None:
resources_obj['nvidia.com/gpu'] = namespaced.pop(limit_gpu, None)

resources_obj = {k: v for k, v in resources_obj.items() if v is not None}

if all(r is None for r in resources_obj):
resources_obj = None
return namespaced, resources_obj

namespaced, requests = extract('request_cpu', 'request_memory', 'request_ephemeral_storage')
namespaced, limits = extract('limit_cpu', 'limit_memory', 'limit_ephemeral_storage',
limit_gpu='limit_gpu')

if requests is None and limits is None:
resources = None
else:
resources = k8s.V1ResourceRequirements(
Expand All @@ -371,6 +379,7 @@ def from_obj(obj):
'iam.cloud.google.com/service-account': gcp_service_account_key
})

namespaced['resources'] = resources
return PodGenerator(**namespaced).gen_pod()

@staticmethod
Expand Down
132 changes: 132 additions & 0 deletions tests/kubernetes/test_pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,138 @@ def test_from_obj(self, mock_uuid):
}
}, result)

@mock.patch('uuid.uuid4')
def test_from_obj_with_resources(self, mock_uuid):
self.maxDiff = None

mock_uuid.return_value = self.static_uuid
result = PodGenerator.from_obj({
"KubernetesExecutor": {
"annotations": {"test": "annotation"},
"volumes": [
{
"name": "example-kubernetes-test-volume",
"hostPath": {"path": "/tmp/"},
},
],
"volume_mounts": [
{
"mountPath": "/foo/",
"name": "example-kubernetes-test-volume",
},
],
'request_cpu': "200m",
'limit_cpu': "400m",
'request_memory': "500Mi",
'limit_memory': "1000Mi",
'limit_gpu': "2",
'request_ephemeral_storage': '2Gi',
'limit_ephemeral_storage': '4Gi',
}
})
result = self.k8s_client.sanitize_for_serialization(result)

self.assertEqual({
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {
'annotations': {'test': 'annotation'},
},
'spec': {
'containers': [{
'args': [],
'command': [],
'env': [],
'envFrom': [],
'name': 'base',
'ports': [],
'resources': {
'limits': {
'cpu': '400m',
'ephemeral-storage': '4Gi',
'memory': '1000Mi',
'nvidia.com/gpu': "2",
},
'requests': {
'cpu': '200m',
'ephemeral-storage': '2Gi',
'memory': '500Mi',
},
},
'volumeMounts': [{
'mountPath': '/foo/',
'name': 'example-kubernetes-test-volume'
}],
}],
'hostNetwork': False,
'imagePullSecrets': [],
'volumes': [{
'hostPath': {'path': '/tmp/'},
'name': 'example-kubernetes-test-volume'
}],
}
}, result)

@mock.patch('uuid.uuid4')
def test_from_obj_with_only_request_resources(self, mock_uuid):
self.maxDiff = None

mock_uuid.return_value = self.static_uuid
result = PodGenerator.from_obj({
"KubernetesExecutor": {
"annotations": {"test": "annotation"},
"volumes": [
{
"name": "example-kubernetes-test-volume",
"hostPath": {"path": "/tmp/"},
},
],
"volume_mounts": [
{
"mountPath": "/foo/",
"name": "example-kubernetes-test-volume",
},
],
'request_cpu': "200m",
'request_memory': "500Mi",
}
})
result = self.k8s_client.sanitize_for_serialization(result)

self.assertEqual({
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {
'annotations': {'test': 'annotation'},
},
'spec': {
'containers': [{
'args': [],
'command': [],
'env': [],
'envFrom': [],
'name': 'base',
'ports': [],
'resources': {
'requests': {
'cpu': '200m',
'memory': '500Mi',
},
},
'volumeMounts': [{
'mountPath': '/foo/',
'name': 'example-kubernetes-test-volume'
}],
}],
'hostNetwork': False,
'imagePullSecrets': [],
'volumes': [{
'hostPath': {'path': '/tmp/'},
'name': 'example-kubernetes-test-volume'
}],
}
}, result)

@mock.patch('uuid.uuid4')
def test_reconcile_pods_empty_mutator_pod(self, mock_uuid):
mock_uuid.return_value = self.static_uuid
Expand Down