Skip to content

Commit

Permalink
Support setting global k8s affinity and toleration configuration in t…
Browse files Browse the repository at this point in the history
…he airflow config file.
  • Loading branch information
kppullin-nt committed Dec 2, 2018
1 parent 5c7e4e4 commit 77bde41
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 13 deletions.
10 changes: 10 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Expand Up @@ -648,6 +648,16 @@ gcp_service_account_keys =
# It will raise an exception if called from a process not running in a kubernetes environment.
in_cluster = True

# Affinity configuration, given as a JSON object written on a single line.
# See the affinity model for top-level key names (e.g. `nodeAffinity`, etc.):
# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#affinity-v1-core
affinity =

# A list of toleration objects, given as a JSON array written on a single line.
# See:
# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#toleration-v1-core
tolerations =

[kubernetes_node_selectors]
# The Key-value pairs to be given to worker pods.
# The worker pods will be scheduled to the nodes of the specified key-value pairs.
Expand Down
32 changes: 30 additions & 2 deletions airflow/contrib/example_dags/example_kubernetes_executor.py
Expand Up @@ -32,6 +32,31 @@
schedule_interval=None
)

# Example affinity definition handed to three_task's executor_config below.
# It holds one required pod anti-affinity term whose topology key is the node
# hostname and whose label selector matches ``app In ['airflow']``.
affinity = {
    'podAntiAffinity': {
        'requiredDuringSchedulingIgnoredDuringExecution': [
            {
                'topologyKey': 'kubernetes.io/hostname',
                'labelSelector': {
                    'matchExpressions': [
                        {
                            'key': 'app',
                            'operator': 'In',
                            'values': ['airflow']
                        }
                    ]
                }
            }
        ]
    }
}

# Example toleration list handed to three_task's executor_config below:
# a single entry with key ``dedicated``, operator ``Equal``, value ``airflow``.
tolerations = [{
    'key': 'dedicated',
    'operator': 'Equal',
    'value': 'airflow'
}]


def print_stuff():
print("stuff!")
Expand Down Expand Up @@ -59,11 +84,14 @@ def use_zip_binary():
executor_config={"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
)

# Limit resources on this operator/task
# Limit resources on this operator/task and apply pod anti-affinity & tolerations
three_task = PythonOperator(
task_id="three_task", python_callable=print_stuff, dag=dag,
executor_config={
"KubernetesExecutor": {"request_memory": "128Mi", "limit_memory": "128Mi"}}
"KubernetesExecutor": {"request_memory": "128Mi",
"limit_memory": "128Mi",
"tolerations": tolerations,
"affinity": affinity}}
)

start_task.set_downstream([one_task, two_task, three_task])
23 changes: 20 additions & 3 deletions airflow/contrib/executors/kubernetes_executor.py
Expand Up @@ -16,6 +16,7 @@
# under the License.

import base64
import json
import multiprocessing
from queue import Queue
from dateutil import parser
Expand All @@ -40,7 +41,7 @@ class KubernetesExecutorConfig:
def __init__(self, image=None, image_pull_policy=None, request_memory=None,
request_cpu=None, limit_memory=None, limit_cpu=None,
gcp_service_account_key=None, node_selectors=None, affinity=None,
annotations=None, volumes=None, volume_mounts=None):
annotations=None, volumes=None, volume_mounts=None, tolerations=None):
self.image = image
self.image_pull_policy = image_pull_policy
self.request_memory = request_memory
Expand All @@ -53,16 +54,18 @@ def __init__(self, image=None, image_pull_policy=None, request_memory=None,
self.annotations = annotations
self.volumes = volumes
self.volume_mounts = volume_mounts
self.tolerations = tolerations

def __repr__(self):
return "{}(image={}, image_pull_policy={}, request_memory={}, request_cpu={}, " \
"limit_memory={}, limit_cpu={}, gcp_service_account_key={}, " \
"node_selectors={}, affinity={}, annotations={}, volumes={}, " \
"volume_mounts={})" \
"volume_mounts={}, tolerations={})" \
.format(KubernetesExecutorConfig.__name__, self.image, self.image_pull_policy,
self.request_memory, self.request_cpu, self.limit_memory,
self.limit_cpu, self.gcp_service_account_key, self.node_selectors,
self.affinity, self.annotations, self.volumes, self.volume_mounts)
self.affinity, self.annotations, self.volumes, self.volume_mounts,
self.tolerations)

@staticmethod
def from_dict(obj):
Expand All @@ -88,6 +91,7 @@ def from_dict(obj):
annotations=namespaced.get('annotations', {}),
volumes=namespaced.get('volumes', []),
volume_mounts=namespaced.get('volume_mounts', []),
tolerations=namespaced.get('tolerations', None),
)

def as_dict(self):
Expand All @@ -104,6 +108,7 @@ def as_dict(self):
'annotations': self.annotations,
'volumes': self.volumes,
'volume_mounts': self.volume_mounts,
'tolerations': self.tolerations,
}


Expand Down Expand Up @@ -201,6 +206,18 @@ def __init__(self):
# configmap
self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap')

affinity_json = conf.get(self.kubernetes_section, 'affinity')
if affinity_json:
self.kube_affinity = json.loads(affinity_json)
else:
self.kube_affinity = None

tolerations_json = conf.get(self.kubernetes_section, 'tolerations')
if tolerations_json:
self.kube_tolerations = json.loads(tolerations_json)
else:
self.kube_tolerations = None

self._validate()

def _validate(self):
Expand Down
4 changes: 4 additions & 0 deletions airflow/contrib/kubernetes/pod.py
Expand Up @@ -60,6 +60,10 @@ class Pod:
:type image_pull_secrets: str
:param affinity: A dict containing a group of affinity scheduling rules
:type affinity: dict
:param hostnetwork: If True enable host networking on the pod
:type hostnetwork: bool
:param tolerations: A list of kubernetes tolerations
:type tolerations: list
"""
def __init__(
self,
Expand Down
8 changes: 6 additions & 2 deletions airflow/contrib/kubernetes/worker_configuration.py
Expand Up @@ -197,10 +197,13 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
limit_cpu=kube_executor_config.limit_cpu
)
gcp_sa_key = kube_executor_config.gcp_service_account_key
annotations = kube_executor_config.annotations.copy()
# Copy the annotations into a fresh dict so the service-account entry below can
# be added by key without mutating the task's executor config. dict() also
# accepts an empty list, whereas copying with list() would make the keyed
# assignment raise TypeError (and would reduce a dict to just its keys).
annotations = dict(kube_executor_config.annotations)
if gcp_sa_key:
    annotations['iam.cloud.google.com/service-account'] = gcp_sa_key

affinity = kube_executor_config.affinity or self.kube_config.kube_affinity
tolerations = kube_executor_config.tolerations or self.kube_config.kube_tolerations

return Pod(
namespace=namespace,
name=pod_id,
Expand All @@ -225,5 +228,6 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
annotations=annotations,
node_selectors=(kube_executor_config.node_selectors or
self.kube_config.kube_node_selectors),
affinity=kube_executor_config.affinity
affinity=affinity,
tolerations=tolerations
)
6 changes: 4 additions & 2 deletions airflow/contrib/operators/kubernetes_pod_operator.py
Expand Up @@ -84,8 +84,10 @@ class KubernetesPodOperator(BaseOperator):
/airflow/xcom/return.json in the container will also be pushed to an
XCom when the container completes.
:type xcom_push: bool
:param tolerations: Kubernetes tolerations
:type list of tolerations
:param hostnetwork: If True enable host networking on the pod
:type hostnetwork: bool
:param tolerations: A list of kubernetes tolerations
:type tolerations: list
"""
template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')

Expand Down
4 changes: 4 additions & 0 deletions scripts/ci/kubernetes/kube/configmaps.yaml
Expand Up @@ -191,6 +191,10 @@ data:
namespace = default
gcp_service_account_keys =
# Example affinity and toleration definitions.
affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}}
tolerations = [{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }]
# For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
git_sync_container_tag = v2.0.5
Expand Down
88 changes: 84 additions & 4 deletions tests/contrib/executors/test_kubernetes_executor.py
Expand Up @@ -14,6 +14,8 @@
#

import unittest
import uuid

import mock
import re
import string
Expand All @@ -25,6 +27,7 @@
from kubernetes.client.rest import ApiException
from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler
from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig
from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
except ImportError:
AirflowKubernetesScheduler = None
Expand Down Expand Up @@ -85,21 +88,49 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
options are passed to worker pod config
"""

affinity_config = {
'podAntiAffinity': {
'requiredDuringSchedulingIgnoredDuringExecution': [
{
'topologyKey': 'kubernetes.io/hostname',
'labelSelector': {
'matchExpressions': [
{
'key': 'app',
'operator': 'In',
'values': ['airflow']
}
]
}
}
]
}
}

tolerations_config = [
{
'key': 'dedicated',
'operator': 'Equal',
'value': 'airflow'
},
{
'key': 'prod',
'operator': 'Exists'
}
]

def setUp(self):
if AirflowKubernetesScheduler is None:
self.skipTest("kubernetes python package is not installed")

self.pod = mock.patch(
'airflow.contrib.kubernetes.worker_configuration.Pod'
)
self.resources = mock.patch(
'airflow.contrib.kubernetes.worker_configuration.Resources'
)
self.secret = mock.patch(
'airflow.contrib.kubernetes.worker_configuration.Secret'
)

for patcher in [self.pod, self.resources, self.secret]:
for patcher in [self.resources, self.secret]:
self.mock_foo = patcher.start()
self.addCleanup(patcher.stop)

Expand Down Expand Up @@ -155,6 +186,55 @@ def test_worker_environment_when_dags_folder_specified(self):

self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])

def test_make_pod_with_empty_executor_config(self):
    """The pod uses the global affinity/tolerations when the executor config sets none."""
    # Scheduling constraints are supplied only through the global kube config.
    self.kube_config.kube_affinity = self.affinity_config
    self.kube_config.kube_tolerations = self.tolerations_config

    builder = WorkerConfiguration(self.kube_config)
    empty_executor_config = KubernetesExecutorConfig(
        annotations=[], volumes=[], volume_mounts=[])

    worker_uuid = str(uuid.uuid4())
    execution_date = str(datetime.utcnow())
    pod = builder.make_pod(
        "default", worker_uuid, "test_pod_id", "test_dag_id", "test_task_id",
        execution_date, "bash -c 'ls /'", empty_executor_config)

    # Walk the affinity structure step by step down to the first match expression.
    anti_affinity = pod.affinity['podAntiAffinity']
    self.assertTrue(anti_affinity is not None)
    required_terms = anti_affinity['requiredDuringSchedulingIgnoredDuringExecution']
    first_expression = required_terms[0]['labelSelector']['matchExpressions'][0]
    self.assertEqual('app', first_expression['key'])

    pod_tolerations = pod.tolerations
    self.assertEqual(2, len(pod_tolerations))
    self.assertEqual('prod', pod_tolerations[1]['key'])

def test_make_pod_with_executor_config(self):
    """The pod carries the affinity/tolerations given in the task's executor config."""
    builder = WorkerConfiguration(self.kube_config)
    # Scheduling constraints are supplied only through the per-task executor config.
    executor_config = KubernetesExecutorConfig(
        affinity=self.affinity_config,
        tolerations=self.tolerations_config,
        annotations=[],
        volumes=[],
        volume_mounts=[])

    worker_uuid = str(uuid.uuid4())
    execution_date = str(datetime.utcnow())
    pod = builder.make_pod(
        "default", worker_uuid, "test_pod_id", "test_dag_id", "test_task_id",
        execution_date, "bash -c 'ls /'", executor_config)

    # Walk the affinity structure step by step down to the first match expression.
    anti_affinity = pod.affinity['podAntiAffinity']
    self.assertTrue(anti_affinity is not None)
    required_terms = anti_affinity['requiredDuringSchedulingIgnoredDuringExecution']
    first_expression = required_terms[0]['labelSelector']['matchExpressions'][0]
    self.assertEqual('app', first_expression['key'])

    pod_tolerations = pod.tolerations
    self.assertEqual(2, len(pod_tolerations))
    self.assertEqual('prod', pod_tolerations[1]['key'])


class TestKubernetesExecutor(unittest.TestCase):
"""
Expand Down

0 comments on commit 77bde41

Please sign in to comment.