[AIRFLOW-5873] KubernetesPodOperator fixes and test #6524

Merged
merged 1 commit into from
Nov 12, 2019
105 changes: 61 additions & 44 deletions airflow/contrib/operators/kubernetes_pod_operator.py
@@ -15,11 +15,15 @@
# specific language governing permissions and limitations
# under the License.
"""Executes task in a Kubernetes POD"""
import re

from airflow.exceptions import AirflowException
from airflow.kubernetes import kube_client, pod_generator, pod_launcher
from airflow.kubernetes.k8s_model import append_to_pod
from airflow.kubernetes.pod import Resources
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.helpers import validate_key
from airflow.utils.state import State


@@ -32,59 +36,62 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:class:`~airflow.gcp.operators.kubernetes_engine.GKEPodOperator`, which
simplifies the authorization process.

:param image: Docker image you wish to launch. Defaults to dockerhub.io,
but fully qualified URLS will point to custom repositories
:param image: Docker image you wish to launch. Defaults to hub.docker.com,
but fully qualified URLS will point to custom repositories.
:type image: str
:param namespace: the namespace to run within kubernetes
:param namespace: the namespace to run within kubernetes.
:type namespace: str
:param cmds: entrypoint of the container. (templated)
The docker images's entrypoint is used if this is not provide.
The docker images's entrypoint is used if this is not provided.
:type cmds: list[str]
:param arguments: arguments of the entrypoint. (templated)
The docker image's CMD is used if this is not provided.
:type arguments: list[str]
:param image_pull_policy: Specify a policy to cache or always pull an image
:param image_pull_policy: Specify a policy to cache or always pull an image.
:type image_pull_policy: str
:param image_pull_secrets: Any image pull secrets to be given to the pod.
If more than one secret is required, provide a
comma separated list: secret_a,secret_b
:type image_pull_secrets: str
:param ports: ports for launched pod
:type ports: list[airflow.kubernetes.models.port.Port]
:param volume_mounts: volumeMounts for launched pod
:type volume_mounts: list[airflow.kubernetes.models.volume_mount.VolumeMount]
:param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes
:type volumes: list[airflow.kubernetes.models.volume.Volume]
:param labels: labels to apply to the Pod
:param ports: ports for launched pod.
:type ports: list[airflow.kubernetes.pod.Port]
:param volume_mounts: volumeMounts for launched pod.
:type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
:param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
:type volumes: list[airflow.kubernetes.volume.Volume]
:param labels: labels to apply to the Pod.
:type labels: dict
:param startup_timeout_seconds: timeout in seconds to startup the pod
:param startup_timeout_seconds: timeout in seconds to startup the pod.
:type startup_timeout_seconds: int
:param name: name of the task you want to run,
will be used to generate a pod id
:param name: name of the pod in which the task will run, will be used to
generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).
:type name: str
:param env_vars: Environment variables initialized in the container. (templated)
:type env_vars: dict
:param secrets: Kubernetes secrets to inject in the container,
:param secrets: Kubernetes secrets to inject in the container.
They can be exposed as environment vars or files in a volume.
:type secrets: list[airflow.kubernetes.models.secret.Secret]
:param in_cluster: run kubernetes client with in_cluster configuration
:type secrets: list[airflow.kubernetes.secret.Secret]
:param in_cluster: run kubernetes client with in_cluster configuration.
:type in_cluster: bool
:param cluster_context: context that points to kubernetes cluster.
Ignored when in_cluster is True. If None, current-context is used.
:type cluster_context: str
:param get_logs: get the stdout of the container as logs of the tasks
:param get_logs: get the stdout of the container as logs of the tasks.
:type get_logs: bool
:param annotations: non-identifying metadata you can attach to the Pod.
Can be a large range of data, and can include characters
that are not permitted by labels.
:type annotations: dict
:param resources: A dict containing a group of resources requests and limits
:param resources: A dict containing resources requests and limits.
Possible keys are request_memory, request_cpu, limit_memory, limit_cpu,
and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources.
See also kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
:type resources: dict
:param affinity: A dict containing a group of affinity scheduling rules
:param affinity: A dict containing a group of affinity scheduling rules.
:type affinity: dict
:param node_selectors: A dict containing a group of scheduling rules
:param node_selectors: A dict containing a group of scheduling rules.
:type node_selectors: dict
:param config_file: The path to the Kubernetes config file.
:param config_file: The path to the Kubernetes config file. (templated)
If not specified, default value is ``~/.kube/config``
:type config_file: str
:param do_xcom_push: If True, the content of the file
@@ -95,17 +102,19 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
state, or the execution is interrupted.
If False (default): do nothing, If True: delete the pod
:type is_delete_operator_pod: bool
:param hostnetwork: If True enable host networking on the pod
:param hostnetwork: If True enable host networking on the pod.
:type hostnetwork: bool
:param tolerations: A list of kubernetes tolerations
:param tolerations: A list of kubernetes tolerations.
:type tolerations: list tolerations
:param configmaps: A list of configmap names objects that we
want mount as env variables
want mount as env variables.
:type configmaps: list[str]
:param pod_runtime_info_envs: environment variables about
pod runtime information (ip, namespace, nodeName, podName)
:type pod_runtime_info_envs: list[airflow.kubernetes.models.pod_runtime_info_env.PodRuntimeInfoEnv]
:param dnspolicy: Specify a dnspolicy for the pod
pod runtime information (ip, namespace, nodeName, podName).
:type pod_runtime_info_envs: list[airflow.kubernetes.pod_runtime_info_env.PodRuntimeInfoEnv]
:param security_context: security options the pod should run with (PodSecurityContext).
:type security_context: dict
:param dnspolicy: dnspolicy for the pod.
:type dnspolicy: str
:param full_pod_spec: The complete podSpec
:type full_pod_spec: kubernetes.client.models.V1Pod
@@ -138,15 +147,18 @@ def execute(self, context):
configmaps=self.configmaps,
security_context=self.security_context,
dnspolicy=self.dnspolicy,
resources=self.resources,
pod=self.full_pod_spec,
).gen_pod()

pod = append_to_pod(pod, self.ports)
pod = append_to_pod(pod, self.pod_runtime_info_envs)
pod = append_to_pod(pod, self.volumes)
pod = append_to_pod(pod, self.volume_mounts)
pod = append_to_pod(pod, self.secrets)
pod = append_to_pod(
pod,
self.pod_runtime_info_envs +
self.ports +
self.resources +
self.secrets +
self.volumes +
self.volume_mounts
)

self.pod = pod
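
The consolidated append_to_pod call in this hunk only works if every argument is a list of objects exposing attach_to_pod, which is why self.resources becomes a one-element list elsewhere in this change. Below is a minimal sketch of that folding pattern. FakePod, FakeModel and append_to_pod_sketch are hypothetical stand-ins written for illustration, and the reduce-based body is an assumption about how append_to_pod behaves, not a copy of Airflow's code.

```python
from functools import reduce


class FakePod:
    """Hypothetical stand-in for a pod object; not an Airflow or Kubernetes class."""

    def __init__(self):
        self.attached = []


class FakeModel:
    """Hypothetical stand-in for a model object that knows how to attach itself."""

    def __init__(self, label):
        self.label = label

    def attach_to_pod(self, pod):
        pod.attached.append(self.label)
        return pod


def append_to_pod_sketch(pod, k8s_objects):
    """Fold every object into the pod, left to right; an empty list is a no-op."""
    if not k8s_objects:
        return pod
    return reduce(lambda p, obj: obj.attach_to_pod(p), k8s_objects, pod)


ports = [FakeModel("port")]
resources = [FakeModel("resources")]  # one-element list, like _set_resources returns
secrets = []                          # empty lists concatenate harmlessly

pod = append_to_pod_sketch(FakePod(), ports + resources + secrets)
print(pod.attached)
```

Concatenating the lists first means the pod is walked once, and the order of the operands decides the order in which objects are attached.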

@@ -171,6 +183,13 @@ def execute(self, context):
except AirflowException as ex:
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))

def _set_resources(self, resources):
return [Resources(**resources) if resources else Resources()]

def _set_name(self, name):
validate_key(name, max_length=63)
return re.sub(r'[^a-z0-9.-]+', '-', name.lower())

@apply_defaults
def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
namespace,
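
To make the effect of _set_name concrete, here is a standalone sketch of the same two steps: validate the key with a 63-character limit, then lowercase it and replace each run of characters outside [a-z0-9.-] with a single dash. validate_key_sketch is a simplified, hypothetical stand-in for airflow.utils.helpers.validate_key (it raises ValueError rather than an Airflow exception), and set_name_sketch is likewise an illustrative name.

```python
import re

# Same pattern as the updated KEY_REGEX in this diff.
KEY_REGEX = re.compile(r'^[\w.-]+$')


def validate_key_sketch(key, max_length=250):
    """Simplified stand-in for validate_key: reject non-strings, long or odd keys."""
    if not isinstance(key, str):
        raise TypeError("key must be a string")
    if len(key) > max_length:
        raise ValueError("key is longer than {} characters".format(max_length))
    if not KEY_REGEX.match(key):
        raise ValueError("key may only contain word characters, dots and dashes")


def set_name_sketch(name):
    """Validate, lowercase, then collapse each run of disallowed characters to '-'."""
    validate_key_sketch(name, max_length=63)
    return re.sub(r'[^a-z0-9.-]+', '-', name.lower())


print(set_name_sketch("My_Task.Name"))  # my-task.name
print(set_name_sketch("load__data"))    # load-data
```

Underscores pass validation because \w accepts them, but they are not in the allowed output set, so they are the characters most likely to be rewritten.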
@@ -183,7 +202,7 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
volumes=None,
env_vars=None,
secrets=None,
in_cluster=False,
in_cluster=True,
cluster_context=None,
labels=None,
startup_timeout_seconds=120,
@@ -193,10 +212,9 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
resources=None,
affinity=None,
config_file=None,
do_xcom_push=False,
node_selectors=None,
image_pull_secrets=None,
service_account_name="default",
service_account_name='default',
is_delete_operator_pod=False,
hostnetwork=False,
tolerations=None,
@@ -207,7 +225,9 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
full_pod_spec=None,
*args,
**kwargs):
super().__init__(*args, **kwargs)
if kwargs.get('xcom_push') is not None:
raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
super().__init__(*args, resources=None, **kwargs)

self.pod = None

@@ -217,7 +237,7 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
self.arguments = arguments or []
self.labels = labels or {}
self.startup_timeout_seconds = startup_timeout_seconds
self.name = name
self.name = self._set_name(name)
self.env_vars = env_vars or {}
self.ports = ports or []
self.volume_mounts = volume_mounts or []
@@ -230,10 +250,7 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
self.node_selectors = node_selectors or {}
self.annotations = annotations or {}
self.affinity = affinity or {}
self.do_xcom_push = do_xcom_push
if kwargs.get('xcom_push') is not None:
raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
self.resources = resources
self.resources = self._set_resources(resources)
self.config_file = config_file
self.image_pull_secrets = image_pull_secrets
self.service_account_name = service_account_name
4 changes: 4 additions & 0 deletions airflow/kubernetes/pod.py
@@ -40,6 +40,8 @@ class Resources(K8SModel):
:param limit_gpu: Limits for GPU used
:type limit_gpu: int
"""
__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu')
Review comment (Member): Nice!


def __init__(
self,
request_memory=None,
@@ -82,6 +84,8 @@ def attach_to_pod(self, pod: k8s.V1Pod) -> k8s.V1Pod:

class Port(K8SModel):
"""POD port"""
__slots__ = ('name', 'container_port')

def __init__(
self,
name=None,
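
For readers unfamiliar with __slots__, the sketch below shows what the declaration does on a plain class: instances get no per-instance __dict__, and assigning an undeclared attribute raises AttributeError. PortSketch is a hypothetical stand-in, not the Airflow Port class. This behaviour only holds when no base class contributes a __dict__ of its own, and whether that is true for K8SModel is not visible in this diff.

```python
class PortSketch:
    """Hypothetical stand-in mirroring the __slots__ declaration added to Port."""
    __slots__ = ('name', 'container_port')

    def __init__(self, name=None, container_port=None):
        self.name = name
        self.container_port = container_port


port = PortSketch(name='http', container_port=8080)

try:
    port.containerport = 9090  # misspelled attribute, not declared in __slots__
except AttributeError as err:
    print("rejected:", err)

print(hasattr(port, '__dict__'))  # False for this plain class
```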
1 change: 0 additions & 1 deletion airflow/kubernetes/pod_generator.py
@@ -238,7 +238,6 @@ def from_obj(obj) -> k8s.V1Pod:
requests = {
'cpu': namespaced.get('request_cpu'),
'memory': namespaced.get('request_memory')

}
limits = {
'cpu': namespaced.get('limit_cpu'),
2 changes: 1 addition & 1 deletion airflow/utils/helpers.py
@@ -44,7 +44,7 @@
'core', 'KILLED_TASK_CLEANUP_TIME'
)

KEY_REGEX = re.compile(r'^[\w\-\.]+$')
KEY_REGEX = re.compile(r'^[\w.-]+$')


def validate_key(k, max_length=250):
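
The KEY_REGEX change looks cosmetic: inside a character class a dot is already literal, and a trailing hyphen needs no escape. A quick check along these lines can confirm that both patterns accept and reject the same keys. The sample strings are made up for illustration.

```python
import re

OLD_KEY_REGEX = re.compile(r'^[\w\-\.]+$')
NEW_KEY_REGEX = re.compile(r'^[\w.-]+$')

# Illustrative inputs: valid keys, keys with disallowed characters, and the empty string.
samples = ['my_task', 'my-task.v2', 'a.b-c_d', 'has space', 'slash/key', '']

results = {}
for sample in samples:
    old = bool(OLD_KEY_REGEX.match(sample))
    new = bool(NEW_KEY_REGEX.match(sample))
    assert old == new, sample  # both patterns must agree on every sample
    results[sample] = new

print(results)
```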
4 changes: 3 additions & 1 deletion scripts/ci/_utils.sh
@@ -863,7 +863,9 @@ function build_image_on_ci() {
echo "Finding changed file names ${TRAVIS_BRANCH}...HEAD"
echo

CHANGED_FILE_NAMES=$(git diff --name-only "${TRAVIS_BRANCH}...HEAD")
git config remote.origin.fetch "+refs/heads/*:refs/remotes/origin/*"
git fetch origin "${TRAVIS_BRANCH}"
CHANGED_FILE_NAMES=$(git diff --name-only "remotes/origin/${TRAVIS_BRANCH}...HEAD")
echo
echo "Changed file names in this commit"
echo "${CHANGED_FILE_NAMES}"