[AIRFLOW-6542] Add spark-on-k8s operator/hook/sensor (#7163)
roitvt committed Mar 10, 2020
1 parent 644554a commit 6c39a3b
Showing 24 changed files with 1,505 additions and 5 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -33,3 +33,4 @@ graft scripts/upstart
graft airflow/config_templates
recursive-exclude airflow/www/node_modules *
global-exclude __pycache__ *.pyc
include airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes_operator_spark_pi.yaml
2 changes: 2 additions & 0 deletions airflow/models/connection.py
@@ -61,6 +61,7 @@
"hiveserver2": ("airflow.providers.apache.hive.hooks.hive.HiveServer2Hook", "hiveserver2_conn_id"),
"jdbc": ("airflow.providers.jdbc.hooks.jdbc.JdbcHook", "jdbc_conn_id"),
"jira": ("airflow.providers.jira.hooks.jira.JiraHook", "jira_conn_id"),
"kubernetes": ("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook", "kubernetes_conn_id"),
"mongo": ("airflow.providers.mongo.hooks.mongo.MongoHook", "conn_id"),
"mssql": ("airflow.providers.microsoft.mssql.hooks.mssql.MsSqlHook", "mssql_conn_id"),
"mysql": ("airflow.providers.mysql.hooks.mysql.MySqlHook", "mysql_conn_id"),
@@ -161,6 +162,7 @@ class Connection(Base, LoggingMixin):
('yandexcloud', 'Yandex Cloud'),
('livy', 'Apache Livy'),
('tableau', 'Tableau'),
('kubernetes', 'Kubernetes cluster Connection'),
]

def __init__(
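With the "kubernetes" connection type registered above, a connection can be seeded like any other Airflow connection. A minimal sketch of creating one programmatically — the extra field names mirror what KubernetesHook reads further down in this commit; the session handling is an assumption about a standard Airflow deployment:

import json

from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id="kubernetes_default",
    conn_type="kubernetes",
    extra=json.dumps({
        # Leave in_cluster falsy to fall through to a kube_config file.
        "extra__kubernetes__in_cluster": False,
        # Optionally paste a kubeconfig document here; the hook writes it to
        # a temporary file before calling load_kube_config().
        # "extra__kubernetes__kube_config": "<kubeconfig YAML>",
        "extra__kubernetes__namespace": "default",
    }),
)

session = settings.Session()
session.add(conn)
session.commit()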
@@ -0,0 +1,79 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example DAG which uses SparkKubernetesOperator and SparkKubernetesSensor.
In this example, we create two tasks that execute sequentially.
The first task submits the sparkApplication to the Kubernetes cluster (the example uses the spark-pi application),
and the second task checks the final state of the sparkApplication submitted in the first task.
The spark-on-k8s operator must already be installed on the Kubernetes cluster:
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
"""

from datetime import timedelta

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.utils.dates import days_ago

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
}
# [END default_args]

# [START instantiate_dag]

dag = DAG(
'spark_pi',
default_args=default_args,
description='submit spark-pi as sparkApplication on kubernetes',
schedule_interval=timedelta(days=1),
max_active_runs=1,  # DAG-level argument; not valid inside default_args
)

t1 = SparkKubernetesOperator(
task_id='spark_pi_submit',
namespace="default",
application_file="example_spark_kubernetes_operator_spark_pi.yaml",
kubernetes_conn_id="kubernetes_default",
do_xcom_push=True,
dag=dag,
)

t2 = SparkKubernetesSensor(
task_id='spark_pi_monitor',
namespace="default",
application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
kubernetes_conn_id="kubernetes_default",
dag=dag
)
t1 >> t2
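Because do_xcom_push=True, the submit task's return value (the Kubernetes API response for the created object) is pushed to XCom, which is what the sensor's Jinja expression indexes into. An illustrative sketch of the shape involved — the field values here are assumptions, not output from this commit:

# Illustrative only: the XCom pushed by 'spark_pi_submit' is the dict returned
# by create_namespaced_custom_object(), which resembles:
xcom_value = {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {
        "name": "spark-pi-2020-03-10-1",  # rendered from the templated YAML below
        "namespace": "default",
    },
}

# The sensor's template "{{ task_instance.xcom_pull(...)['metadata']['name'] }}"
# therefore resolves to:
application_name = xcom_value["metadata"]["name"]  # "spark-pi-2020-03-10-1"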
57 changes: 57 additions & 0 deletions airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes_operator_spark_pi.yaml
@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: "spark-pi-{{ ds }}-{{ task_instance.try_number }}"
namespace: default
spec:
type: Scala
mode: cluster
image: "gcr.io/spark-operator/spark:v2.4.4"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar"
sparkVersion: "2.4.4"
restartPolicy:
type: Never
volumes:
- name: "test-volume"
hostPath:
path: "/tmp"
type: Directory
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 2.4.4
serviceAccount: default
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 2.4.4
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
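Note that metadata.name is itself templated: because application_file is one of the operator's template_fields, the whole YAML is rendered before submission. For an execution date of 2020-03-10 on the first try, the name would render as, for example, spark-pi-2020-03-10-1 (the date value is illustrative).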
16 changes: 16 additions & 0 deletions airflow/providers/cncf/kubernetes/hooks/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
150 changes: 150 additions & 0 deletions airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -0,0 +1,150 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import tempfile
from typing import Optional, Union

import yaml
from kubernetes import client, config

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook


def _load_body_to_dict(body):
try:
body_dict = yaml.safe_load(body)
except yaml.YAMLError as e:
raise AirflowException("Exception when loading resource definition: %s\n" % e)
return body_dict


class KubernetesHook(BaseHook):
"""
Creates a Kubernetes API connection.

:param conn_id: the connection to Kubernetes cluster
:type conn_id: str
"""

def __init__(
self,
conn_id: str = "kubernetes_default"
):
self.conn_id = conn_id

def get_conn(self):
"""
Returns a Kubernetes API client for making requests
"""
connection = self.get_connection(self.conn_id)
extras = connection.extra_dejson
if extras.get("extra__kubernetes__in_cluster"):
self.log.debug("loading kube_config from: in_cluster configuration")
config.load_incluster_config()
elif extras.get("extra__kubernetes__kube_config") is None:
self.log.debug("loading kube_config from: default file")
config.load_kube_config()
else:
with tempfile.NamedTemporaryFile() as temp_config:
self.log.debug("loading kube_config from: connection kube_config")
temp_config.write(extras.get("extra__kubernetes__kube_config").encode())
temp_config.flush()
config.load_kube_config(temp_config.name)
return client.ApiClient()

def create_custom_resource_definition(self,
group: str,
version: str,
plural: str,
body: Union[str, dict],
namespace: Optional[str] = None
):
"""
Creates custom resource definition object in Kubernetes.

:param group: api group
:type group: str
:param version: api version
:type version: str
:param plural: api plural
:type plural: str
:param body: crd object definition
:type body: Union[str, dict]
:param namespace: kubernetes namespace
:type namespace: str
"""
api = client.CustomObjectsApi(self.get_conn())
if namespace is None:
namespace = self.get_namespace()
if isinstance(body, str):
body = _load_body_to_dict(body)
try:
response = api.create_namespaced_custom_object(
group=group,
version=version,
namespace=namespace,
plural=plural,
body=body
)
self.log.debug("Response: %s", response)
return response
except client.rest.ApiException as e:
raise AirflowException("Exception when calling -> create_custom_resource_definition: %s\n" % e)

def get_custom_resource_definition(self,
group: str,
version: str,
plural: str,
name: str,
namespace: Optional[str] = None):
"""
Get custom resource definition object from Kubernetes.

:param group: api group
:type group: str
:param version: api version
:type version: str
:param plural: api plural
:type plural: str
:param name: crd object name
:type name: str
:param namespace: kubernetes namespace
:type namespace: str
"""
custom_resource_definition_api = client.CustomObjectsApi(self.get_conn())
if namespace is None:
namespace = self.get_namespace()
try:
response = custom_resource_definition_api.get_namespaced_custom_object(
group=group,
version=version,
namespace=namespace,
plural=plural,
name=name
)
return response
except client.rest.ApiException as e:
raise AirflowException("Exception when calling -> get_custom_resource_definition: %s\n" % e)

def get_namespace(self):
"""
Returns the namespace defined in the connection
"""
connection = self.get_connection(self.conn_id)
extras = connection.extra_dejson
namespace = extras.get("extra__kubernetes__namespace", "default")
return namespace
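A minimal usage sketch of the hook on its own — assuming a reachable cluster with the spark-on-k8s operator's CRD installed; the manifest filename is hypothetical, and any Jinja fields would need to be rendered first, since the hook itself does no templating:

from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook

hook = KubernetesHook(conn_id="kubernetes_default")

# Submit a SparkApplication from a YAML string; the hook converts it to a dict.
with open("spark_pi.yaml") as f:  # hypothetical, already-rendered manifest
    response = hook.create_custom_resource_definition(
        group="sparkoperator.k8s.io",
        version="v1beta2",
        plural="sparkapplications",
        body=f.read(),
        namespace="default",
    )

# Fetch the object back by name to inspect its state.
app = hook.get_custom_resource_definition(
    group="sparkoperator.k8s.io",
    version="v1beta2",
    plural="sparkapplications",
    name=response["metadata"]["name"],
    namespace="default",
)
print(app.get("status", {}))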
65 changes: 65 additions & 0 deletions airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -0,0 +1,65 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional

from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.utils.decorators import apply_defaults


class SparkKubernetesOperator(BaseOperator):
"""
Creates a sparkApplication object in a Kubernetes cluster.

.. seealso::
For more detail about the SparkApplication object, have a look at the reference:
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.1.0-2.4.5/docs/api-docs.md#sparkapplication

:param application_file: filepath to kubernetes custom_resource_definition of sparkApplication
:type application_file: str
:param namespace: kubernetes namespace to put sparkApplication
:type namespace: str
:param kubernetes_conn_id: the connection to Kubernetes cluster
:type kubernetes_conn_id: str
"""

template_fields = ['application_file', 'namespace']
template_ext = ('.yaml', '.yml', '.json')  # leading dots: matched as filename suffixes
ui_color = '#f4a460'

@apply_defaults
def __init__(self,
application_file: str,
namespace: Optional[str] = None,
kubernetes_conn_id: str = 'kubernetes_default',
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.application_file = application_file
self.namespace = namespace
self.kubernetes_conn_id = kubernetes_conn_id

def execute(self, context):
self.log.info("Creating sparkApplication")
hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
response = hook.create_custom_resource_definition(
group="sparkoperator.k8s.io",
version="v1beta2",
plural="sparkapplications",
body=self.application_file,
namespace=self.namespace)
return response
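The commit's remaining files (including the SparkKubernetesSensor used in the example DAG above, plus tests and docs) are not rendered here. Purely as a hypothetical sketch — not necessarily this commit's actual implementation — here is how such a sensor could be built on the hook's get_custom_resource_definition, with state names taken from the spark-on-k8s operator's API docs:

from typing import Optional

from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class SparkApplicationSensorSketch(BaseSensorOperator):  # hypothetical name
    template_fields = ('application_name', 'namespace')

    @apply_defaults
    def __init__(self,
                 application_name: str,
                 namespace: Optional[str] = None,
                 kubernetes_conn_id: str = 'kubernetes_default',
                 *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.application_name = application_name
        self.namespace = namespace
        self.kubernetes_conn_id = kubernetes_conn_id

    def poke(self, context):
        hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
        response = hook.get_custom_resource_definition(
            group="sparkoperator.k8s.io",
            version="v1beta2",
            plural="sparkapplications",
            name=self.application_name,
            namespace=self.namespace,
        )
        # SparkApplication status fields per the operator's API docs.
        state = response.get("status", {}).get("applicationState", {}).get("state")
        if state == "FAILED":
            raise AirflowException("SparkApplication failed: %s" % self.application_name)
        return state == "COMPLETED"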
