Skip to content

Commit

Permalink
[AIRFLOW-5873] KubernetesPodOperator fixes and test (#6524)
Browse files Browse the repository at this point in the history
- `security_context` was missing from docs of `KubernetesPodOperator`
- `KubernetesPodOperator` kwarg `in_cluster` erroneously defaulted to
False, contradicting `default_args.py`; likewise, the default of
`do_xcom_push` was overridden to False, contradicting `BaseOperator`
- `KubernetesPodOperator` kwarg `resources` is erroneously passed to
`base_operator`, instead should only go to `PodGenerator`. The two
have different syntax. (both on `master` and `v1-10-test` branches)
- `kubernetes/pod.py`: classes do not have `__slots__`
so they would accept arbitrary values in `setattr`
- Reduce the number of times the pod object is copied before execution

(cherry picked from commit cf38ddc)
  • Loading branch information
ddelange authored and astro-sql-decorator committed Jun 4, 2020
1 parent 0e37b73 commit 4827a67
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 29 deletions.
65 changes: 44 additions & 21 deletions airflow/contrib/operators/kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
"""Executes task in a Kubernetes POD"""
import warnings

import re

from airflow.exceptions import AirflowException
from airflow.kubernetes import pod_generator, kube_client, pod_launcher
from airflow.kubernetes import kube_client, pod_generator, pod_launcher
from airflow.kubernetes.k8s_model import append_to_pod
from airflow.kubernetes.pod import Resources
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.helpers import validate_key
from airflow.utils.state import State
from airflow.version import version as airflow_version

Expand All @@ -30,6 +34,11 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
"""
Execute a task in a Kubernetes Pod
.. note::
If you use `Google Kubernetes Engine <https://cloud.google.com/kubernetes-engine/>`__, use
:class:`~airflow.gcp.operators.kubernetes_engine.GKEPodOperator`, which
simplifies the authorization process.
:param image: Docker image you wish to launch. Defaults to hub.docker.com,
but fully qualified URLS will point to custom repositories.
:type image: str
Expand All @@ -47,13 +56,13 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
If more than one secret is required, provide a
comma separated list: secret_a,secret_b
:type image_pull_secrets: str
:param ports: ports for launched pod
:type ports: list[airflow.kubernetes.models.port.Port]
:param volume_mounts: volumeMounts for launched pod
:type volume_mounts: list[airflow.kubernetes.models.volume_mount.VolumeMount]
:param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes
:type volumes: list[airflow.kubernetes.models.volume.Volume]
:param labels: labels to apply to the Pod
:param ports: ports for launched pod.
:type ports: list[airflow.kubernetes.pod.Port]
:param volume_mounts: volumeMounts for launched pod.
:type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
:param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
:type volumes: list[airflow.kubernetes.volume.Volume]
:param labels: labels to apply to the Pod.
:type labels: dict
:param startup_timeout_seconds: timeout in seconds to startup the pod.
:type startup_timeout_seconds: int
Expand All @@ -62,10 +71,10 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:type name: str
:param env_vars: Environment variables initialized in the container. (templated)
:type env_vars: dict
:param secrets: Kubernetes secrets to inject in the container,
:param secrets: Kubernetes secrets to inject in the container.
They can be exposed as environment vars or files in a volume.
:type secrets: list[airflow.kubernetes.models.secret.Secret]
:param in_cluster: run kubernetes client with in_cluster configuration
:type secrets: list[airflow.kubernetes.secret.Secret]
:param in_cluster: run kubernetes client with in_cluster configuration.
:type in_cluster: bool
:param cluster_context: context that points to kubernetes cluster.
Ignored when in_cluster is True. If None, current-context is used.
Expand All @@ -86,6 +95,8 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:param node_selectors: A dict containing a group of scheduling rules.
:type node_selectors: dict
:param config_file: The path to the Kubernetes config file. (templated)
:param config_file: The path to the Kubernetes config file. (templated)
If not specified, default value is ``~/.kube/config``
:type config_file: str
:param do_xcom_push: If do_xcom_push is True, the content of the file
/airflow/xcom/return.json in the container will also be pushed to an
Expand All @@ -103,9 +114,11 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
want mount as env variables.
:type configmaps: list[str]
:param pod_runtime_info_envs: environment variables about
pod runtime information (ip, namespace, nodeName, podName)
:type pod_runtime_info_envs: list[airflow.kubernetes.models.pod_runtime_info_env.PodRuntimeInfoEnv]
:param dnspolicy: Specify a dnspolicy for the pod
pod runtime information (ip, namespace, nodeName, podName).
:type pod_runtime_info_envs: list[airflow.kubernetes.pod_runtime_info_env.PodRuntimeInfoEnv]
:param security_context: security options the pod should run with (PodSecurityContext).
:type security_context: dict
:param dnspolicy: dnspolicy for the pod.
:type dnspolicy: str
:param full_pod_spec: The complete podSpec
:type full_pod_spec: kubernetes.client.models.V1Pod
Expand Down Expand Up @@ -146,15 +159,18 @@ def execute(self, context):
configmaps=self.configmaps,
security_context=self.security_context,
dnspolicy=self.dnspolicy,
resources=self.resources,
pod=self.full_pod_spec,
).gen_pod()

pod = append_to_pod(pod, self.ports)
pod = append_to_pod(pod, self.pod_runtime_info_envs)
pod = append_to_pod(pod, self.volumes)
pod = append_to_pod(pod, self.volume_mounts)
pod = append_to_pod(pod, self.secrets)
pod = append_to_pod(
pod,
self.pod_runtime_info_envs +
self.ports +
self.resources +
self.secrets +
self.volumes +
self.volume_mounts
)

self.pod = pod

Expand All @@ -179,6 +195,13 @@ def execute(self, context):
except AirflowException as ex:
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))

def _set_resources(self, resources):
    """
    Wrap the ``resources`` argument in a one-element list of ``Resources``.

    :param resources: keyword arguments forwarded to ``Resources``; any
        falsy value (e.g. ``None`` or an empty dict) yields a default
        ``Resources()`` instead.
    :type resources: dict
    :return: a list containing exactly one ``Resources`` instance
    :rtype: list
    """
    if resources:
        spec = Resources(**resources)
    else:
        spec = Resources()
    # Returned as a list so it can be concatenated with other list-valued
    # pod attachments.
    return [spec]

def _set_name(self, name):
    """
    Validate ``name`` and normalise it into a lower-case, dash-separated string.

    ``validate_key`` is called first with ``max_length=63``. The name is then
    lower-cased and every run of characters other than ``a-z``, ``0-9``,
    ``.`` and ``-`` is collapsed into a single ``-``.

    :param name: the requested name
    :type name: str
    :return: the normalised name
    :rtype: str
    """
    validate_key(name, max_length=63)
    lowered = name.lower()
    return re.sub(r'[^a-z0-9.-]+', '-', lowered)

@apply_defaults
def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
namespace,
Expand Down Expand Up @@ -244,7 +267,7 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
self.node_selectors = node_selectors or {}
self.annotations = annotations or {}
self.affinity = affinity or {}
self.resources = resources
self.resources = self._set_resources(resources)
self.config_file = config_file
self.image_pull_secrets = image_pull_secrets
self.service_account_name = service_account_name
Expand Down
21 changes: 21 additions & 0 deletions airflow/kubernetes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,27 @@
"""

import copy

import kubernetes.client.models as k8s

from airflow.kubernetes.k8s_model import K8SModel


class Resources(K8SModel):
__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu')

"""
:param request_memory: requested memory
:type request_memory: str
:param request_cpu: requested CPU number
:type request_cpu: float | str
:param limit_memory: limit for memory usage
:type limit_memory: str
:param limit_cpu: Limit for CPU used
:type limit_cpu: float | str
:param limit_gpu: Limits for GPU used
:type limit_gpu: int
"""
def __init__(
self,
request_memory=None,
Expand All @@ -40,12 +54,15 @@ def __init__(
self.limit_gpu = limit_gpu

def is_empty_resource_request(self):
    """Return True when neither limits nor requests are set on this resource."""
    # De Morgan form of "no limits and no requests"; has_requests() is still
    # only consulted when has_limits() is false.
    return not (self.has_limits() or self.has_requests())

def has_limits(self):
    """Return True when at least one of the CPU, memory or GPU limits is not None."""
    limit_fields = ('limit_cpu', 'limit_memory', 'limit_gpu')
    # Lazy generator: attributes are read in order and evaluation stops at
    # the first one that is set.
    return any(getattr(self, field) is not None for field in limit_fields)

def has_requests(self):
    """Return True when the CPU request or the memory request is not None."""
    request_fields = ('request_cpu', 'request_memory')
    # Lazy generator: attributes are read in order and evaluation stops at
    # the first one that is set.
    return any(getattr(self, field) is not None for field in request_fields)

def to_k8s_client_obj(self):
Expand All @@ -62,10 +79,14 @@ def attach_to_pod(self, pod):


class Port(K8SModel):
"""POD port"""
__slots__ = ('name', 'container_port')

def __init__(self, name=None, container_port=None):
    """
    Store the port's name and container port on the instance.

    :param name: name of the port
    :param container_port: port number on the container
    """
    self.container_port = container_port
    self.name = name

Expand Down
1 change: 0 additions & 1 deletion airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ def from_obj(obj):
requests = {
'cpu': namespaced.get('request_cpu'),
'memory': namespaced.get('request_memory')

}
limits = {
'cpu': namespaced.get('limit_cpu'),
Expand Down
4 changes: 2 additions & 2 deletions tests/runtime/kubernetes/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def test_integration_run_dag(self):
self.ensure_dag_expected_state(host=host,
execution_date=execution_date,
dag_id=dag_id,
expected_final_state='success', timeout=100)
expected_final_state='success', timeout=200)

def test_integration_run_dag_with_scheduler_failure(self):
host = KUBERNETES_HOST
Expand Down Expand Up @@ -229,7 +229,7 @@ def test_integration_run_dag_with_scheduler_failure(self):
self.ensure_dag_expected_state(host=host,
execution_date=execution_date,
dag_id=dag_id,
expected_final_state='success', timeout=100)
expected_final_state='success', timeout=200)

self.assertEqual(self._num_pods_in_namespace('test-namespace'),
0,
Expand Down
25 changes: 20 additions & 5 deletions tests/runtime/kubernetes/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException

from airflow.kubernetes.volume import Volume
from airflow import AirflowException
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.kubernetes.pod import Port
from airflow.kubernetes.pod_generator import PodDefaults
from airflow.kubernetes.pod_launcher import PodLauncher
from airflow.kubernetes.secret import Secret
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
from airflow.version import version as airflow_version
from tests.compat import mock
Expand Down Expand Up @@ -67,6 +67,11 @@ def setUp(self):
'envFrom': [],
'name': 'base',
'ports': [],
'resources': {'limits': {'cpu': None,
'memory': None,
'nvidia.com/gpu': None},
'requests': {'cpu': None,
'memory': None}},
'volumeMounts': [],
}],
'hostNetwork': False,
Expand Down Expand Up @@ -114,17 +119,17 @@ def test_config_path(self, client_mock, launcher_mock):
labels={"foo": "bar"},
name="test",
task_id="task",
config_file=file_path,
in_cluster=False,
do_xcom_push=False,
config_file=file_path,
cluster_context='default',
)
launcher_mock.return_value = (State.SUCCESS, None)
k.execute(None)
client_mock.assert_called_once_with(
in_cluster=False,
cluster_context='default',
config_file=file_path
config_file=file_path,
)

@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
Expand Down Expand Up @@ -289,7 +294,17 @@ def test_pod_resources(self):
)
k.execute(None)
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
self.expected_pod['spec']['containers'][0]['resources'] = resources
self.expected_pod['spec']['containers'][0]['resources'] = {
'requests': {
'memory': '64Mi',
'cpu': '250m'
},
'limits': {
'memory': '64Mi',
'cpu': 0.25,
'nvidia.com/gpu': None
}
}
self.assertEqual(self.expected_pod, actual_pod)

def test_pod_affinity(self):
Expand Down Expand Up @@ -500,7 +515,7 @@ def test_faulty_service_account(self):
in_cluster=False,
do_xcom_push=False,
startup_timeout_seconds=5,
service_account_name=bad_service_account_name
service_account_name=bad_service_account_name,
)
with self.assertRaises(ApiException):
k.execute(None)
Expand Down

0 comments on commit 4827a67

Please sign in to comment.