Skip to content

Commit

Permalink
[AIRFLOW-5873] KubernetesPodOperator fixes and test (apache#6524)
Browse files Browse the repository at this point in the history
- `security_context` was missing from docs of `KubernetesPodOperator`
- `KubernetesPodOperator` kwarg `in_cluster` erroneously defaulted to
False, contradicting `default_args.py`; the default of `do_xcom_push`
 was also overridden to False, contradicting `BaseOperator`
- `KubernetesPodOperator` kwarg `resources` was erroneously passed to
 `base_operator`; it should instead only go to `PodGenerator`, since the
 two have different syntax. (both on `master` and `v1-10-test` branches)
- `kubernetes/pod.py`: classes do not have `__slots__`
 so they would accept arbitrary values in `setattr`
- Reduce the number of times the pod object is copied before execution
  • Loading branch information
ddelange authored and Emmanuel Garette committed Nov 13, 2019
1 parent e5bba9e commit dd28423
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 73 deletions.
105 changes: 61 additions & 44 deletions airflow/contrib/operators/kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
# specific language governing permissions and limitations
# under the License.
"""Executes task in a Kubernetes POD"""
import re

from airflow.exceptions import AirflowException
from airflow.kubernetes import kube_client, pod_generator, pod_launcher
from airflow.kubernetes.k8s_model import append_to_pod
from airflow.kubernetes.pod import Resources
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.helpers import validate_key
from airflow.utils.state import State


Expand All @@ -32,59 +36,62 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:class:`~airflow.gcp.operators.kubernetes_engine.GKEPodOperator`, which
simplifies the authorization process.
:param image: Docker image you wish to launch. Defaults to dockerhub.io,
but fully qualified URLS will point to custom repositories
:param image: Docker image you wish to launch. Defaults to hub.docker.com,
but fully qualified URLS will point to custom repositories.
:type image: str
:param namespace: the namespace to run within kubernetes
:param namespace: the namespace to run within kubernetes.
:type namespace: str
:param cmds: entrypoint of the container. (templated)
The docker images's entrypoint is used if this is not provide.
The docker image's entrypoint is used if this is not provided.
:type cmds: list[str]
:param arguments: arguments of the entrypoint. (templated)
The docker image's CMD is used if this is not provided.
:type arguments: list[str]
:param image_pull_policy: Specify a policy to cache or always pull an image
:param image_pull_policy: Specify a policy to cache or always pull an image.
:type image_pull_policy: str
:param image_pull_secrets: Any image pull secrets to be given to the pod.
If more than one secret is required, provide a
comma separated list: secret_a,secret_b
:type image_pull_secrets: str
:param ports: ports for launched pod
:type ports: list[airflow.kubernetes.models.port.Port]
:param volume_mounts: volumeMounts for launched pod
:type volume_mounts: list[airflow.kubernetes.models.volume_mount.VolumeMount]
:param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes
:type volumes: list[airflow.kubernetes.models.volume.Volume]
:param labels: labels to apply to the Pod
:param ports: ports for launched pod.
:type ports: list[airflow.kubernetes.pod.Port]
:param volume_mounts: volumeMounts for launched pod.
:type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
:param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
:type volumes: list[airflow.kubernetes.volume.Volume]
:param labels: labels to apply to the Pod.
:type labels: dict
:param startup_timeout_seconds: timeout in seconds to startup the pod
:param startup_timeout_seconds: timeout in seconds to startup the pod.
:type startup_timeout_seconds: int
:param name: name of the task you want to run,
will be used to generate a pod id
:param name: name of the pod in which the task will run, will be used to
generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).
:type name: str
:param env_vars: Environment variables initialized in the container. (templated)
:type env_vars: dict
:param secrets: Kubernetes secrets to inject in the container,
:param secrets: Kubernetes secrets to inject in the container.
They can be exposed as environment vars or files in a volume.
:type secrets: list[airflow.kubernetes.models.secret.Secret]
:param in_cluster: run kubernetes client with in_cluster configuration
:type secrets: list[airflow.kubernetes.secret.Secret]
:param in_cluster: run kubernetes client with in_cluster configuration.
:type in_cluster: bool
:param cluster_context: context that points to kubernetes cluster.
Ignored when in_cluster is True. If None, current-context is used.
:type cluster_context: str
:param get_logs: get the stdout of the container as logs of the tasks
:param get_logs: get the stdout of the container as logs of the tasks.
:type get_logs: bool
:param annotations: non-identifying metadata you can attach to the Pod.
Can be a large range of data, and can include characters
that are not permitted by labels.
:type annotations: dict
:param resources: A dict containing a group of resources requests and limits
:param resources: A dict containing resources requests and limits.
Possible keys are request_memory, request_cpu, limit_memory, limit_cpu,
and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources.
See also kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
:type resources: dict
:param affinity: A dict containing a group of affinity scheduling rules
:param affinity: A dict containing a group of affinity scheduling rules.
:type affinity: dict
:param node_selectors: A dict containing a group of scheduling rules
:param node_selectors: A dict containing a group of scheduling rules.
:type node_selectors: dict
:param config_file: The path to the Kubernetes config file.
:param config_file: The path to the Kubernetes config file. (templated)
If not specified, default value is ``~/.kube/config``
:type config_file: str
:param do_xcom_push: If True, the content of the file
Expand All @@ -95,17 +102,19 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
state, or the execution is interrupted.
If False (default): do nothing, If True: delete the pod
:type is_delete_operator_pod: bool
:param hostnetwork: If True enable host networking on the pod
:param hostnetwork: If True enable host networking on the pod.
:type hostnetwork: bool
:param tolerations: A list of kubernetes tolerations
:param tolerations: A list of kubernetes tolerations.
:type tolerations: list tolerations
:param configmaps: A list of configmap names objects that we
want mount as env variables
want to mount as env variables.
:type configmaps: list[str]
:param pod_runtime_info_envs: environment variables about
pod runtime information (ip, namespace, nodeName, podName)
:type pod_runtime_info_envs: list[airflow.kubernetes.models.pod_runtime_info_env.PodRuntimeInfoEnv]
:param dnspolicy: Specify a dnspolicy for the pod
pod runtime information (ip, namespace, nodeName, podName).
:type pod_runtime_info_envs: list[airflow.kubernetes.pod_runtime_info_env.PodRuntimeInfoEnv]
:param security_context: security options the pod should run with (PodSecurityContext).
:type security_context: dict
:param dnspolicy: dnspolicy for the pod.
:type dnspolicy: str
:param full_pod_spec: The complete podSpec
:type full_pod_spec: kubernetes.client.models.V1Pod
Expand Down Expand Up @@ -138,15 +147,18 @@ def execute(self, context):
configmaps=self.configmaps,
security_context=self.security_context,
dnspolicy=self.dnspolicy,
resources=self.resources,
pod=self.full_pod_spec,
).gen_pod()

pod = append_to_pod(pod, self.ports)
pod = append_to_pod(pod, self.pod_runtime_info_envs)
pod = append_to_pod(pod, self.volumes)
pod = append_to_pod(pod, self.volume_mounts)
pod = append_to_pod(pod, self.secrets)
pod = append_to_pod(
pod,
self.pod_runtime_info_envs +
self.ports +
self.resources +
self.secrets +
self.volumes +
self.volume_mounts
)

self.pod = pod

Expand All @@ -171,6 +183,13 @@ def execute(self, context):
except AirflowException as ex:
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))

def _set_resources(self, resources):
    """Wrap the ``resources`` argument in a one-element list of ``Resources``.

    :param resources: mapping whose entries are forwarded to ``Resources`` as
        keyword arguments; any falsy value (``None``, empty dict) yields a
        ``Resources`` constructed without arguments.
    :return: a list containing exactly one ``Resources`` instance.
    """
    # Guard clause: nothing supplied, so build Resources with no arguments.
    if not resources:
        return [Resources()]
    return [Resources(**resources)]

def _set_name(self, name):
    """Check the requested pod name and return a sanitized form of it.

    The raw name is first passed to ``validate_key`` with ``max_length=63``
    (presumably raising on an invalid or over-long key; its body is not
    visible here). The name is then lower-cased and every run of characters
    outside ``[a-z0-9.-]`` is collapsed into a single ``-``.

    :param name: the pod name supplied to the operator.
    :return: the lower-cased, sanitized name.
    """
    validate_key(name, max_length=63)
    lowered = name.lower()
    # One '-' per run of disallowed characters, not one per character.
    return re.sub(r'[^a-z0-9.-]+', '-', lowered)

@apply_defaults
def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
namespace,
Expand All @@ -183,7 +202,7 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
volumes=None,
env_vars=None,
secrets=None,
in_cluster=False,
in_cluster=True,
cluster_context=None,
labels=None,
startup_timeout_seconds=120,
Expand All @@ -193,10 +212,9 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
resources=None,
affinity=None,
config_file=None,
do_xcom_push=False,
node_selectors=None,
image_pull_secrets=None,
service_account_name="default",
service_account_name='default',
is_delete_operator_pod=False,
hostnetwork=False,
tolerations=None,
Expand All @@ -207,7 +225,9 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
full_pod_spec=None,
*args,
**kwargs):
super().__init__(*args, **kwargs)
if kwargs.get('xcom_push') is not None:
raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
super().__init__(*args, resources=None, **kwargs)

self.pod = None

Expand All @@ -217,7 +237,7 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
self.arguments = arguments or []
self.labels = labels or {}
self.startup_timeout_seconds = startup_timeout_seconds
self.name = name
self.name = self._set_name(name)
self.env_vars = env_vars or {}
self.ports = ports or []
self.volume_mounts = volume_mounts or []
Expand All @@ -230,10 +250,7 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
self.node_selectors = node_selectors or {}
self.annotations = annotations or {}
self.affinity = affinity or {}
self.do_xcom_push = do_xcom_push
if kwargs.get('xcom_push') is not None:
raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
self.resources = resources
self.resources = self._set_resources(resources)
self.config_file = config_file
self.image_pull_secrets = image_pull_secrets
self.service_account_name = service_account_name
Expand Down
4 changes: 4 additions & 0 deletions airflow/kubernetes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class Resources(K8SModel):
:param limit_gpu: Limits for GPU used
:type limit_gpu: int
"""
__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu')

def __init__(
self,
request_memory=None,
Expand Down Expand Up @@ -82,6 +84,8 @@ def attach_to_pod(self, pod: k8s.V1Pod) -> k8s.V1Pod:

class Port(K8SModel):
"""POD port"""
__slots__ = ('name', 'container_port')

def __init__(
self,
name=None,
Expand Down
1 change: 0 additions & 1 deletion airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ def from_obj(obj) -> k8s.V1Pod:
requests = {
'cpu': namespaced.get('request_cpu'),
'memory': namespaced.get('request_memory')

}
limits = {
'cpu': namespaced.get('limit_cpu'),
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
'core', 'KILLED_TASK_CLEANUP_TIME'
)

KEY_REGEX = re.compile(r'^[\w\-\.]+$')
KEY_REGEX = re.compile(r'^[\w.-]+$')


def validate_key(k, max_length=250):
Expand Down
4 changes: 3 additions & 1 deletion scripts/ci/_utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,9 @@ function build_image_on_ci() {
echo "Finding changed file names ${TRAVIS_BRANCH}...HEAD"
echo

CHANGED_FILE_NAMES=$(git diff --name-only "${TRAVIS_BRANCH}...HEAD")
git config remote.origin.fetch "+refs/heads/*:refs/remotes/origin/*"
git fetch origin "${TRAVIS_BRANCH}"
CHANGED_FILE_NAMES=$(git diff --name-only "remotes/origin/${TRAVIS_BRANCH}...HEAD")
echo
echo "Changed file names in this commit"
echo "${CHANGED_FILE_NAMES}"
Expand Down

0 comments on commit dd28423

Please sign in to comment.