Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ env:
- CI="true"
python: "3.6"
stages:
- pre-test
# - pre-test
- test
jobs:
include:
Expand All @@ -54,15 +54,15 @@ jobs:
PYTHON_VERSION=3.6
python: "3.6"
stage: test
- name: "Tests postgres kubernetes python 3.6 (git)"
env: >-
BACKEND=postgres
ENV=kubernetes
KUBERNETES_VERSION=v1.15.0
KUBERNETES_MODE=git_mode
PYTHON_VERSION=3.6
python: "3.6"
stage: test
# - name: "Tests postgres kubernetes python 3.6 (git)"
# env: >-
# BACKEND=postgres
# ENV=kubernetes
# KUBERNETES_VERSION=v1.15.0
# KUBERNETES_MODE=git_mode
# PYTHON_VERSION=3.6
# python: "3.6"
# stage: test
- name: "Tests postgres python 3.6"
env: >-
BACKEND=postgres
Expand All @@ -77,13 +77,13 @@ jobs:
PYTHON_VERSION=3.5
python: "3.5"
stage: test
- name: "Tests mysql python 3.7"
env:
BACKEND=mysql
ENV=docker
PYTHON_VERSION=3.7
python: "3.7"
stage: test
# - name: "Tests mysql python 3.7"
# env:
# BACKEND=mysql
# ENV=docker
# PYTHON_VERSION=3.7
# python: "3.7"
# stage: test
services:
- docker
before_install:
Expand Down
56 changes: 55 additions & 1 deletion airflow/contrib/example_dags/example_kubernetes_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,51 @@
This is an example dag for using the KubernetesPodOperator.
"""
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log

BUSYBOX_SLEEP_YAML = """
apiVersion: v1
kind: Pod
metadata:
labels:
run: example-yaml-2
name: example-yaml-2
spec:
containers:
- args:
- sh
- -c
- echo 123; sleep 10
image: busybox
name: example-yaml-2
restartPolicy: Never
"""

ALPHINE_XCOM_YAML = """
apiVersion: v1
kind: Pod
metadata:
name: example-yaml-xcom
spec:
containers:
- args:
- sh
- -c
- mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json
image: alpine
name: example-yaml-xcom
restartPolicy: Never
"""

try:
# Kubernetes is optional, so not available in vanilla Airflow
# pip install 'apache-airflow[kubernetes]'
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator, \
KubernetesPodYamlOperator

default_args = {
'owner': 'Airflow',
Expand Down Expand Up @@ -63,6 +99,24 @@
tolerations=tolerations
)

start_pod = KubernetesPodYamlOperator(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a separate operator for this? Might a KubeernetesPodOperator(yaml=...) work?

Copy link
Copy Markdown
Member Author

@mik-laj mik-laj Oct 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid that it will be very problematic to combine these two ways of defining pods. All we can do is say that the selected parameters are mutually exclusive. However, it will be very difficult for the user to understand if which parameters work with each other and which do not work. A similar case is in the configuration of the worker, where when configuring the template it is said that all other fields are ignored. However, In this case, we will ignore all parameters, but a very long list of parameters that this will make this operator difficult to use.

Copy link
Copy Markdown
Member Author

@mik-laj mik-laj Oct 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following parameters from KubernetesPodOperator do not work with this operator:

  • affinity
  • annotations
  • args
  • cmds
  • configmaps
  • dnspolicy
  • envs
  • hostnetwork
  • image
  • image_pull_policy
  • image_pull_secrets
  • labels
  • name
  • namespace
  • node_selectors
  • pod
  • resources
  • security_context
  • service_account_name
  • tolerations

The following parameters are currently required by KubernetesPodOperator, but when these operators are combined, they will have to become optional.

  • namespace,
  • image,
  • name,

In my opinion, a certain improvement is to create one base class that will limit duplication of code, but for the benefit of users it is better to leave two classes.

task_id="start_pod",
yaml=BUSYBOX_SLEEP_YAML
)

start_pod_xcom = KubernetesPodYamlOperator(
task_id="start_pod_xcom",
yaml=ALPHINE_XCOM_YAML,
do_xcom_push=True
)

pod_task_xcom_result = BashOperator(
bash_command="echo \"{{ task_instance.xcom_pull('start_pod_xcom')[0] }}\"",
task_id="start_pod_xcom_result",
)

start_pod_xcom >> pod_task_xcom_result

except ImportError as e:
log.warning("Could not import KubernetesPodOperator: " + str(e))
log.warning("Install kubernetes dependencies with: "
Expand Down
109 changes: 107 additions & 2 deletions airflow/contrib/operators/kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
# specific language governing permissions and limitations
# under the License.
"""Executes task in a Kubernetes POD"""
from typing import Optional

import kubernetes.client.models as k8s
import yaml as yaml_deserializer

from airflow.exceptions import AirflowException
from airflow.kubernetes import kube_client, pod_generator, pod_launcher
from airflow.kubernetes import k8s_deserializer, kube_client, pod_generator, pod_launcher, pod_enricher
from airflow.kubernetes.k8s_model import append_to_pod
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
Expand Down Expand Up @@ -126,7 +131,6 @@ def execute(self, context):
labels=self.labels,
name=self.name,
envs=self.env_vars,
extract_xcom=self.do_xcom_push,
image_pull_policy=self.image_pull_policy,
node_selectors=self.node_selectors,
annotations=self.annotations,
Expand All @@ -148,6 +152,8 @@ def execute(self, context):
pod = append_to_pod(pod, self.volume_mounts)
pod = append_to_pod(pod, self.secrets)

pod = pod_enricher.refine_pod(pod, extract_xcom=self.do_xcom_push)

self.pod = pod

launcher = pod_launcher.PodLauncher(kube_client=client,
Expand Down Expand Up @@ -245,3 +251,102 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
self.pod_runtime_info_envs = pod_runtime_info_envs or []
self.dnspolicy = dnspolicy
self.full_pod_spec = full_pod_spec


class KubernetesPodYamlOperator(BaseOperator):
"""
Execute a pod using yaml definition in the Kubernetes cluster.

:param pod_file: Path to yaml file.
:type str: str
:param do_xcom_push: If True, the content of the file
/airflow/xcom/return.json in the container will also be pushed to an
XCom when the container completes.
:type do_xcom_push: bool
:param is_delete_operator_pod: What to do when the pod reaches its final
state, or the execution is interrupted.
If False (default): do nothing, If True: delete the pod
:type is_delete_operator_pod: bool
:param in_cluster: run kubernetes client with in_cluster configuration
:type in_cluster: bool
:param cluster_context: context that points to kubernetes cluster.
Ignored when in_cluster is True. If None, current-context is used.
:type cluster_context: str
:param is_delete_operator_pod: What to do when the pod reaches its final
state, or the execution is interrupted.
If False (default): do nothing, If True: delete the pod
:type is_delete_operator_pod: bool
:param get_logs: get the stdout of the container as logs of the tasks
:type get_logs: bool
:param startup_timeout_seconds: timeout in seconds to startup the pod
:type startup_timeout_seconds: int
"""
template_fields = ('yaml', 'config_file')
template_ext = ['yaml', 'yml']

@apply_defaults
def __init__(self,
yaml: Optional[str] = None,
do_xcom_push: bool = False,
config_file: Optional[str] = None,
in_cluster: bool = False,
cluster_context: Optional[str] = None,
is_delete_operator_pod: bool = False,
get_logs: bool = False,
startup_timeout_seconds: int = 120,
*args,
**kwargs):
super().__init__(*args, **kwargs)

self.yaml = yaml
self.do_xcom_push = do_xcom_push
self.config_file = config_file
self.in_cluster = in_cluster
self.cluster_context = cluster_context
self.is_delete_operator_pod = is_delete_operator_pod
self.get_logs = get_logs
self.startup_timeout_seconds = startup_timeout_seconds

self.pod = None

def execute(self, context):
try:
client = kube_client.get_kube_client(in_cluster=self.in_cluster,
cluster_context=self.cluster_context,
config_file=self.config_file)

yaml_document_all = list(yaml_deserializer.safe_load_all(self.yaml))

if not yaml_document_all:
raise AirflowException(
"You must specify Pod resource definitions."
)

if len(yaml_document_all) > 1:
raise AirflowException(
"You can only run one Pod at a time. Please delete the other resource definitions "
"from the YAML file"
)

pod_dict = yaml_document_all[0]
pod = k8s_deserializer.deserialize(pod_dict, k8s.V1Pod)

pod = pod_enricher.refine_pod(pod, extract_xcom=self.do_xcom_push)
launcher = pod_launcher.PodLauncher(kube_client=client,
extract_xcom=self.do_xcom_push)
try:
run_pod = launcher.run_pod(pod=pod, startup_timeout=self.startup_timeout_seconds,
get_logs=self.get_logs)
final_state, result = run_pod
finally:
if self.is_delete_operator_pod:
launcher.delete_pod(pod)

if final_state != State.SUCCESS:
raise AirflowException(
'Pod returned a failure: {state}'.format(state=final_state)
)

return result
except AirflowException as ex:
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
Loading