[AIRFLOW-1314] Add executor_config and tests
* Added executor_config to the task_instance table and to the base operator

* Fix test; bump up number of examples

* Fix up comments from PR

* Exclude the kubernetes example dag from a test

* Fix dict -> KubernetesExecutorConfig

* fixed up executor_config comment and type hint
grantnicholas authored and Fokko Driesprong committed Apr 22, 2018
1 parent ad4e67c commit c0920ef
Showing 19 changed files with 288 additions and 29 deletions.
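For orientation before the diffs: the per-task API this commit introduces looks like the minimal sketch below, adapted from the example DAG added further down (the DAG id, task id, and image name are illustrative).

from __future__ import print_function
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='executor_config_sketch',
    default_args={'owner': 'airflow',
                  'start_date': airflow.utils.dates.days_ago(1)},
    schedule_interval=None
)

task = PythonOperator(
    task_id='resource_bounded_task',
    python_callable=lambda: print('hello'),
    dag=dag,
    # executor_config is namespaced by executor name, so executors that do
    # not understand a section simply ignore it.
    executor_config={'KubernetesExecutor': {'image': 'airflow/ci:latest',
                                            'request_memory': '128Mi',
                                            'limit_memory': '128Mi'}}
)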
67 changes: 59 additions & 8 deletions airflow/contrib/executors/kubernetes_executor.py
@@ -19,18 +19,65 @@
from queue import Queue
from dateutil import parser
from uuid import uuid4
import kubernetes
from kubernetes import watch, client
from kubernetes.client.rest import ApiException
from airflow.contrib.kubernetes.pod_launcher import PodLauncher
from airflow.contrib.kubernetes.kube_client import get_kube_client
from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
from airflow.executors.base_executor import BaseExecutor
from airflow.executors import Executors
from airflow.models import TaskInstance, KubeResourceVersion
from airflow.utils.state import State
from airflow import configuration, settings
from airflow.exceptions import AirflowConfigException
from airflow.contrib.kubernetes.pod import Pod, Resources
from airflow.utils.log.logging_mixin import LoggingMixin


class KubernetesExecutorConfig:

def __init__(self, image=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None):
self.image = image
self.request_memory = request_memory
self.request_cpu = request_cpu
self.limit_memory = limit_memory
self.limit_cpu = limit_cpu

def __repr__(self):
return "{}(image={}, request_memory={} ,request_cpu={}, limit_memory={}, limit_cpu={})".format(
KubernetesExecutorConfig.__name__,
self.image, self.request_memory, self.request_cpu, self.limit_memory,self.limit_cpu
)

@staticmethod
def from_dict(obj):
if obj is None:
return KubernetesExecutorConfig()

if not isinstance(obj, dict):
raise TypeError("Cannot convert a non-dictionary object into a KubernetesExecutorConfig")

namespaced = obj.get(Executors.KubernetesExecutor, {})

return KubernetesExecutorConfig(
image=namespaced.get("image", None),
request_memory=namespaced.get("request_memory", None),
request_cpu=namespaced.get("request_cpu", None),
limit_memory=namespaced.get("limit_memory", None),
limit_cpu=namespaced.get("limit_cpu", None)
)

def as_dict(self):
return {
"image": self.image,
"request_memory": self.request_memory,
"request_cpu": self.request_cpu,
"limit_memory": self.limit_memory,
"limit_cpu": self.limit_cpu
}


class KubeConfig:
core_section = "core"
kubernetes_section = "kubernetes"
@@ -219,15 +266,15 @@ def run_next(self, next_job):
:return:
"""
-self.log.debug('k8s: job is {}'.format(str(next_job)))
-key, command = next_job
+self.log.info('k8s: job is {}'.format(str(next_job)))
+key, command, kube_executor_config = next_job
dag_id, task_id, execution_date = key
self.log.debug("k8s: running for command {}".format(command))
self.log.debug("k8s: launching image {}".format(self.kube_config.kube_image))
pod = self.worker_configuration.make_pod(
namespace=self.namespace, pod_id=self._create_pod_id(dag_id, task_id),
dag_id=dag_id, task_id=task_id, execution_date=self._datetime_to_label_safe_datestring(execution_date),
-airflow_command=command
+airflow_command=command, kube_executor_config=kube_executor_config
)
# the watcher will monitor pods, so we do not block.
self.launcher.run_pod_async(pod)
@@ -405,9 +452,13 @@ def start(self):
self._inject_secrets()
self.clear_not_launched_queued_tasks()

-def execute_async(self, key, command, queue=None):
-self.log.info("k8s: adding task {} with command {}".format(key, command))
-self.task_queue.put((key, command))
+def execute_async(self, key, command, queue=None, executor_config=None):
+self.log.info("k8s: adding task {} with command {} with executor_config {}".format(
+key, command, executor_config
+))
+kube_executor_config = KubernetesExecutorConfig.from_dict(executor_config)
+self.task_queue.put((key, command, kube_executor_config))

def sync(self):
self.log.info("self.running: {}".format(self.running))
@@ -425,8 +476,8 @@ def sync(self):
KubeResourceVersion.checkpoint_resource_version(last_resource_version, session=self._session)

if not self.task_queue.empty():
-key, command = self.task_queue.get()
-self.kube_scheduler.run_next((key, command))
+key, command, kube_executor_config = self.task_queue.get()
+self.kube_scheduler.run_next((key, command, kube_executor_config))

def _change_state(self, key, state, pod_id):
if state != State.RUNNING:
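To make the config parsing above concrete, here is a small sketch of the KubernetesExecutorConfig.from_dict semantics (values illustrative):

from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig

# Only the "KubernetesExecutor" section of a task's executor_config is read;
# missing keys fall back to None.
config = KubernetesExecutorConfig.from_dict(
    {'KubernetesExecutor': {'image': 'airflow/ci:latest', 'request_cpu': '500m'}}
)
assert config.image == 'airflow/ci:latest'
assert config.limit_memory is None

# None yields an all-default config, so tasks that set no executor_config
# flow through the same execute_async code path.
assert KubernetesExecutorConfig.from_dict(None).as_dict()['image'] is None

# Any other non-dict value raises TypeError rather than being silently dropped.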
2 changes: 1 addition & 1 deletion airflow/contrib/executors/mesos_executor.py
@@ -285,7 +285,7 @@ def start(self):
self.mesos_driver = driver
self.mesos_driver.start()

-def execute_async(self, key, command, queue=None):
+def execute_async(self, key, command, queue=None, executor_config=None):
self.task_queue.put((key, command))

def sync(self):
@@ -126,6 +126,27 @@ def extract_env_and_secrets(pod, req):
KubernetesRequestFactory.add_secret_to_env(env, secret)
req['spec']['containers'][0]['env'] = env

@staticmethod
def extract_resources(pod, req):
if not pod.resources or pod.resources.is_empty_resource_request():
return

req['spec']['containers'][0]['resources'] = {}

if pod.resources.has_requests():
req['spec']['containers'][0]['resources']['requests'] = {}
if pod.resources.request_memory:
req['spec']['containers'][0]['resources']['requests']['memory'] = pod.resources.request_memory
if pod.resources.request_cpu:
req['spec']['containers'][0]['resources']['requests']['cpu'] = pod.resources.request_cpu

if pod.resources.has_limits():
req['spec']['containers'][0]['resources']['limits'] = {}
if pod.resources.limit_memory:
req['spec']['containers'][0]['resources']['limits']['memory'] = pod.resources.limit_memory
if pod.resources.limit_cpu:
req['spec']['containers'][0]['resources']['limits']['cpu'] = pod.resources.limit_cpu

@staticmethod
def extract_init_containers(pod, req):
if pod.init_containers:
@@ -52,6 +52,7 @@ def create(self, pod):
self.extract_volume_secrets(pod, req)
self.attach_volumes(pod, req)
self.attach_volume_mounts(pod, req)
self.extract_resources(pod, req)
self.extract_service_account_name(pod, req)
self.extract_init_containers(pod, req)
self.extract_image_pull_secrets(pod, req)
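As a sketch of the request shape extract_resources produces (the factory's import path is an assumption based on the class shown above; values are illustrative), the output follows the standard Kubernetes container resources schema:

from airflow.contrib.kubernetes.pod import Pod, Resources
# Import path for the factory is an assumption.
from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory import (
    KubernetesRequestFactory
)

pod = Pod(
    image='airflow/ci:latest', envs={}, cmds=['bash'],
    resources=Resources(request_memory='128Mi', request_cpu='500m',
                        limit_memory='256Mi', limit_cpu='1')
)
req = {'spec': {'containers': [{}]}}
KubernetesRequestFactory.extract_resources(pod, req)
# req['spec']['containers'][0]['resources'] now reads:
# {'requests': {'memory': '128Mi', 'cpu': '500m'},
#  'limits': {'memory': '256Mi', 'cpu': '1'}}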
22 changes: 21 additions & 1 deletion airflow/contrib/kubernetes/pod.py
@@ -13,6 +13,23 @@
# limitations under the License.


class Resources:
def __init__(self, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None):
self.request_memory = request_memory
self.request_cpu = request_cpu
self.limit_memory = limit_memory
self.limit_cpu = limit_cpu

def is_empty_resource_request(self):
return not self.has_limits() and not self.has_requests()

def has_limits(self):
return self.limit_cpu is not None or self.limit_memory is not None

def has_requests(self):
return self.request_cpu is not None or self.request_memory is not None


class Pod:
"""
Represents a kubernetes pod and manages execution of a single pod.
@@ -46,7 +63,9 @@ def __init__(
image_pull_policy="IfNotPresent",
image_pull_secrets=None,
init_containers=None,
-service_account_name=None):
+service_account_name=None,
+resources=None
+):
self.image = image
self.envs = envs if envs else {}
self.cmds = cmds
@@ -61,3 +80,4 @@ def __init__(
self.image_pull_secrets = image_pull_secrets
self.init_containers = init_containers
self.service_account_name = service_account_name
self.resources = resources or Resources()
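The predicate helpers on Resources are what keep extract_resources from emitting empty resources blocks; their semantics in brief:

from airflow.contrib.kubernetes.pod import Resources

r = Resources(request_cpu='500m')
assert r.has_requests()                   # a request is set
assert not r.has_limits()                 # no limit is set
assert not r.is_empty_resource_request()  # at least one of the two is present

# Pod defaults to an empty Resources(), which extract_resources skips entirely.
assert Resources().is_empty_resource_request()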
16 changes: 12 additions & 4 deletions airflow/contrib/kubernetes/worker_configuration.py
@@ -16,7 +16,7 @@
import os
import six

from airflow.contrib.kubernetes.pod import Pod
from airflow.contrib.kubernetes.pod import Pod, Resources
from airflow.contrib.kubernetes.secret import Secret


@@ -133,13 +133,20 @@ def _get_image_pull_secrets(self):
return []
return self.kube_config.image_pull_secrets.split(',')

-def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command):
+def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command, kube_executor_config):
volumes, volume_mounts = self._get_volumes_and_mounts()
worker_init_container_spec = self._get_init_containers(copy.deepcopy(volume_mounts))
resources = Resources(
request_memory=kube_executor_config.request_memory,
request_cpu=kube_executor_config.request_cpu,
limit_memory=kube_executor_config.limit_memory,
limit_cpu=kube_executor_config.limit_cpu
)

return Pod(
namespace=namespace,
name=pod_id,
-image=self.kube_config.kube_image,
+image=kube_executor_config.image or self.kube_config.kube_image,
cmds=["bash", "-cx", "--"],
args=[airflow_command],
labels={
@@ -154,5 +161,6 @@ def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_c
image_pull_secrets=self.kube_config.image_pull_secrets,
init_containers=worker_init_container_spec,
volumes=volumes,
-volume_mounts=volume_mounts
+volume_mounts=volume_mounts,
+resources=resources
)
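A sketch of the end-to-end flow through make_pod (the worker_configuration instance is assumed to exist, and the command string is illustrative): the per-task image from executor_config wins over the cluster-wide kube_image via the or fallback, and the resource fields are copied onto the pod.

kube_executor_config = KubernetesExecutorConfig.from_dict(
    {'KubernetesExecutor': {'image': 'airflow/ci_zip:latest', 'limit_cpu': '1'}}
)
pod = worker_configuration.make_pod(  # a WorkerConfiguration built from kube_config
    namespace='airflow', pod_id='example-pod-id',
    dag_id='example_dag', task_id='example_task',
    execution_date='2018-04-22T00_00_00', airflow_command='airflow run ...',
    kube_executor_config=kube_executor_config
)
assert pod.image == 'airflow/ci_zip:latest'  # per-task override wins over kube_image
assert pod.resources.limit_cpu == '1'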
66 changes: 66 additions & 0 deletions airflow/example_dags/example_kubernetes_executor.py
@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig
import os


args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
dag_id='example_kubernetes_executor', default_args=args,
schedule_interval=None
)


def print_stuff():
print("stuff!")


def use_zip_binary():
rc = os.system("zip")
assert rc == 0



# You don't have to use any special KubernetesExecutor configuration if you don't want to
start_task = PythonOperator(
task_id="start_task", python_callable=print_stuff, dag=dag
)

# But you can if you want to
one_task = PythonOperator(
task_id="one_task", python_callable=print_stuff, dag=dag,
executor_config={"KubernetesExecutor": {"image": "airflow/ci:latest"}}
)

# Use the zip binary, which is only found in this special docker image
two_task = PythonOperator(
task_id="two_task", python_callable=use_zip_binary, dag=dag,
executor_config={"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
)

# Limit resources on this operator/task
three_task = PythonOperator(
task_id="three_task", python_callable=print_stuff, dag=dag,
executor_config={"KubernetesExecutor": {"request_memory": "128Mi", "limit_memory": "128Mi"}}
)

start_task.set_downstream([one_task, two_task, three_task])
20 changes: 14 additions & 6 deletions airflow/executors/__init__.py
@@ -50,27 +50,35 @@ def GetDefaultExecutor():
return DEFAULT_EXECUTOR


class Executors:
LocalExecutor = "LocalExecutor"
SequentialExecutor = "SequentialExecutor"
CeleryExecutor = "CeleryExecutor"
DaskExecutor = "DaskExecutor"
MesosExecutor = "MesosExecutor"
KubernetesExecutor = "KubernetesExecutor"



def _get_executor(executor_name):
"""
Creates a new instance of the named executor. If the executor name is not known to Airflow,
look for it in the plugins.
"""
-if executor_name == 'LocalExecutor':
+if executor_name == Executors.LocalExecutor:
return LocalExecutor()
-elif executor_name == 'SequentialExecutor':
+elif executor_name == Executors.SequentialExecutor:
return SequentialExecutor()
-elif executor_name == 'CeleryExecutor':
+elif executor_name == Executors.CeleryExecutor:
from airflow.executors.celery_executor import CeleryExecutor
return CeleryExecutor()
-elif executor_name == 'DaskExecutor':
+elif executor_name == Executors.DaskExecutor:
from airflow.executors.dask_executor import DaskExecutor
return DaskExecutor()
-elif executor_name == 'MesosExecutor':
+elif executor_name == Executors.MesosExecutor:
from airflow.contrib.executors.mesos_executor import MesosExecutor
return MesosExecutor()
-elif executor_name == 'KubernetesExecutor':
+elif executor_name == Executors.KubernetesExecutor:
from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
return KubernetesExecutor()
else:
4 changes: 2 additions & 2 deletions airflow/executors/base_executor.py
@@ -133,7 +133,7 @@ def heartbeat(self):
ti.refresh_from_db()
if ti.state != State.RUNNING:
self.running[key] = command
-self.execute_async(key, command=command, queue=queue)
+self.execute_async(key, command=command, queue=queue, executor_config=ti.executor_config)
else:
self.logger.info(
'Task is already running, not sending to '
@@ -174,7 +174,7 @@ def get_event_buffer(self, dag_ids=None):

return cleared_events

-def execute_async(self, key, command, queue=None): # pragma: no cover
+def execute_async(self, key, command, queue=None, executor_config=None): # pragma: no cover
"""
This method will execute the command asynchronously.
"""
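Because heartbeat now threads ti.executor_config into execute_async, every executor must accept the new keyword even if it ignores it, as the Celery, Dask, and Local diffs below show. A sketch of the updated contract for a custom executor (the class is hypothetical):

from airflow.executors.base_executor import BaseExecutor


class NoopExecutor(BaseExecutor):
    """Sketch of a custom executor against the updated interface."""

    def start(self):
        pass

    def execute_async(self, key, command, queue=None, executor_config=None):
        # Executors without per-task config support simply ignore the kwarg.
        self.log.info("would run %s", command)

    def sync(self):
        pass

    def end(self):
        pass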
2 changes: 1 addition & 1 deletion airflow/executors/celery_executor.py
@@ -79,7 +79,7 @@ def start(self):
self.last_state = {}

def execute_async(self, key, command,
-queue=DEFAULT_CELERY_CONFIG['task_default_queue']):
+queue=DEFAULT_CELERY_CONFIG['task_default_queue'], executor_config=None):
self.log.info( "[celery] queuing {key} through celery, "
"queue={queue}".format(**locals()))
self.tasks[key] = execute_command.apply_async(
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/dask_executor.py
@@ -56,7 +56,7 @@ def start(self):
self.client = distributed.Client(self.cluster_address, security=security)
self.futures = {}

-def execute_async(self, key, command, queue=None):
+def execute_async(self, key, command, queue=None, executor_config=None):
if queue is not None:
warnings.warn(
'DaskExecutor does not support queues. All tasks will be run in the same cluster'
4 changes: 2 additions & 2 deletions airflow/executors/local_executor.py
@@ -221,8 +221,8 @@ def start(self):

self.impl.start()

-def execute_async(self, key, command, queue=None):
-self.impl.execute_async(key=key, command=command)
+def execute_async(self, key, command, queue=None, executor_config=None):
+self.queue.put((key, command))

def sync(self):
self.impl.sync()
