Skip to content

Commit

Permalink
Move k8sexecutor out of contrib to closer match master (#8904)
Browse files Browse the repository at this point in the history
Considering that the k8s executor is now supported by core
committers, we should move it from contrib to the primary executor
directory.

Co-authored-by: Daniel Imberman <daniel@astronomer.io>
  • Loading branch information
dimberman and astro-sql-decorator committed May 29, 2020
1 parent 1e7616b commit e68a7d0
Show file tree
Hide file tree
Showing 33 changed files with 124 additions and 98 deletions.
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/spark_submit_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from airflow.utils.log.logging_mixin import LoggingMixin

try:
from airflow.contrib.kubernetes import kube_client
from airflow.kubernetes import kube_client
except ImportError:
pass

Expand Down
5 changes: 5 additions & 0 deletions airflow/contrib/kubernetes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand All @@ -14,3 +16,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from airflow.kubernetes import * # noqa
12 changes: 6 additions & 6 deletions airflow/contrib/operators/kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.kubernetes import kube_client, pod_generator, pod_launcher
from airflow.contrib.kubernetes.pod import Resources
from airflow.kubernetes import kube_client, pod_generator, pod_launcher
from airflow.kubernetes.pod import Resources
from airflow.utils.helpers import validate_key
from airflow.utils.state import State
from airflow.version import version as airflow_version
Expand Down Expand Up @@ -50,11 +50,11 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
comma separated list: secret_a,secret_b
:type image_pull_secrets: str
:param ports: ports for launched pod.
:type ports: list[airflow.contrib.kubernetes.pod.Port]
:type ports: list[airflow.kubernetes.pod.Port]
:param volume_mounts: volumeMounts for launched pod.
:type volume_mounts: list[airflow.contrib.kubernetes.volume_mount.VolumeMount]
:type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
:param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
:type volumes: list[airflow.contrib.kubernetes.volume.Volume]
:type volumes: list[airflow.kubernetes.volume.Volume]
:param labels: labels to apply to the Pod.
:type labels: dict
:param startup_timeout_seconds: timeout in seconds to startup the pod.
Expand All @@ -66,7 +66,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:type env_vars: dict
:param secrets: Kubernetes secrets to inject in the container.
They can be exposed as environment vars or files in a volume
:type secrets: list[airflow.contrib.kubernetes.secret.Secret]
:type secrets: list[airflow.kubernetes.secret.Secret]
:param in_cluster: run kubernetes client with in_cluster configuration.
:type in_cluster: bool
:param cluster_context: context that points to kubernetes cluster.
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _get_executor(executor_name):
from airflow.contrib.executors.mesos_executor import MesosExecutor
return MesosExecutor()
elif executor_name == Executors.KubernetesExecutor:
from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
from airflow.executors.kubernetes_executor import KubernetesExecutor
return KubernetesExecutor()
elif executor_name == Executors.DebugExecutor:
from airflow.executors.debug_executor import DebugExecutor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
from urllib3.exceptions import HTTPError, ReadTimeoutError

from airflow.configuration import conf
from airflow.contrib.kubernetes.pod_launcher import PodLauncher
from airflow.contrib.kubernetes.kube_client import get_kube_client
from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
from airflow.kubernetes.pod_launcher import PodLauncher
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.worker_configuration import WorkerConfiguration
from airflow.executors.base_executor import BaseExecutor
from airflow.executors import Executors
from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from kubernetes.client.rest import ApiException # pylint: disable=unused-import
from kubernetes.client.api_client import ApiClient
from kubernetes.client import Configuration
from airflow.contrib.kubernetes.refresh_config import ( # pylint: disable=ungrouped-imports
from airflow.kubernetes.refresh_config import ( # pylint: disable=ungrouped-imports
load_kube_config,
RefreshConfiguration,
)
Expand Down
16 changes: 16 additions & 0 deletions airflow/kubernetes/kubernetes_request_factory/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
# under the License.

import yaml
from airflow.contrib.kubernetes.pod import Pod
from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
from airflow.kubernetes.pod import Pod
from airflow.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
import KubernetesRequestFactory


class SimplePodRequestFactory(KubernetesRequestFactory):
"""
Request generator for a simple pod.
Request generator for a pod.
"""
_yaml = """apiVersion: v1
kind: Pod
Expand All @@ -38,6 +39,7 @@ class SimplePodRequestFactory(KubernetesRequestFactory):
"""

def __init__(self):

pass

def create(self, pod):
Expand Down Expand Up @@ -71,7 +73,9 @@ def create(self, pod):
class ExtractXcomPodRequestFactory(KubernetesRequestFactory):
"""
Request generator for a pod with sidecar container.
"""

XCOM_MOUNT_PATH = '/airflow/xcom'
SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
_yaml = """apiVersion: v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class Pod:
:param cmds: The command to be run on the pod
:type cmds: list[str]
:param secrets: Secrets to be launched to the pod
:type secrets: list[airflow.contrib.kubernetes.secret.Secret]
:type secrets: list[airflow.kubernetes.secret.Secret]
:param result: The result that will be returned to the operator after
successful execution of the pod
:type result: any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
# specific language governing permissions and limitations
# under the License.

from airflow.contrib.kubernetes.pod import Pod, Port
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount
from airflow.kubernetes.pod import Pod, Port
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount

import uuid


Expand Down Expand Up @@ -70,7 +71,7 @@ def add_port(self, port): # type: (Port) -> None
Adds a Port to the generator
:param port: ports for generated pod
:type port: airflow.contrib.kubernetes.pod.Port
:type port: airflow.kubernetes.pod.Port
"""
self.ports.append({'name': port.name, 'containerPort': port.container_port})

Expand All @@ -79,7 +80,7 @@ def add_volume(self, volume): # type: (Volume) -> None
Adds a Volume to the generator
:param volume: volume for generated pod
:type volume: airflow.contrib.kubernetes.volume.Volume
:type volume: airflow.kubernetes.volume.Volume
"""

self._add_volume(name=volume.name, configs=volume.configs)
Expand Down Expand Up @@ -140,7 +141,7 @@ def add_mount(self,
Adds a VolumeMount to the generator
:param volume_mount: volume for generated pod
:type volume_mount: airflow.contrib.kubernetes.volume_mount.VolumeMount
:type volume_mount: airflow.kubernetes.volume_mount.VolumeMount
"""
self._add_mount(
name=volume_mount.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
from airflow.utils.state import State
from airflow import AirflowException

from airflow.contrib.kubernetes.pod import Pod
from airflow.contrib.kubernetes.kubernetes_request_factory import \
from airflow.kubernetes.pod import Pod
from airflow.kubernetes.kubernetes_request_factory import \
pod_request_factory as pod_factory

from .kube_client import get_kube_client
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import six

from airflow.configuration import conf
from airflow.contrib.kubernetes.pod import Pod, Resources
from airflow.contrib.kubernetes.secret import Secret
from airflow.kubernetes.pod import Pod, Resources
from airflow.kubernetes.secret import Secret
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.version import version as airflow_version

Expand Down
3 changes: 0 additions & 3 deletions docs/autoapi_templates/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,6 @@ All executors are in the following packages:

airflow/executors/index

airflow/contrib/executors/index


Models
------
Models are built on top of the SQLAlchemy ORM Base class, and instances are
Expand Down
2 changes: 2 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@
'_api/airflow/sentry',
'_api/airflow/stats',
'_api/airflow/task',
'_api/airflow/kubernetes',
'_api/airflow/ti_deps',
'_api/airflow/utils',
'_api/airflow/version',
Expand Down Expand Up @@ -479,6 +480,7 @@
'*/airflow/contrib/operators/s3_to_gcs_transfer_operator.py',
'*/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py',
'*/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py',
'*/airflow/kubernetes/kubernetes_request_factory/*',

'*/node_modules/*',
'*/migrations/*',
Expand Down
8 changes: 4 additions & 4 deletions docs/kubernetes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ any type of executor.
.. code:: python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.kubernetes.secret import Secret
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount
from airflow.contrib.kubernetes.pod import Port
from airflow.kubernetes.secret import Secret
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
from airflow.kubernetes.pod import Port
secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
Expand Down
2 changes: 1 addition & 1 deletion tests/contrib/hooks/test_spark_submit_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ def test_standalone_cluster_process_on_kill(self):
self.assertEqual(kill_cmd[3], '--kill')
self.assertEqual(kill_cmd[4], 'driver-20171128111415-0001')

@patch('airflow.contrib.kubernetes.kube_client.get_kube_client')
@patch('airflow.kubernetes.kube_client.get_kube_client')
@patch('airflow.contrib.hooks.spark_submit_hook.subprocess.Popen')
def test_k8s_process_on_kill(self, mock_popen, mock_client_method):
# Given
Expand Down

0 comments on commit e68a7d0

Please sign in to comment.