Skip to content

Commit

Permalink
Databricks jobs 2.1 (#19544)
Browse files Browse the repository at this point in the history
  • Loading branch information
christophergrant committed Nov 14, 2021
1 parent 4212c49 commit 0a4a8bd
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 10 deletions.
10 changes: 5 additions & 5 deletions airflow/providers/databricks/hooks/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook

# Databricks REST API endpoints used by the hook, as (HTTP method, path) pairs.
# The Jobs API was migrated to api/2.1 (multi-task jobs support); the Clusters
# and Libraries APIs remain on api/2.0 — NOTE(review): confirmed by this
# commit's direction, which reverts clusters/libraries from 2.1 back to 2.0.
RESTART_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/restart")
START_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/start")
TERMINATE_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/delete")

RUN_NOW_ENDPOINT = ('POST', 'api/2.1/jobs/run-now')
SUBMIT_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/submit')
GET_RUN_ENDPOINT = ('GET', 'api/2.1/jobs/runs/get')
CANCEL_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/cancel')

INSTALL_LIBS_ENDPOINT = ('POST', 'api/2.0/libraries/install')
UNINSTALL_LIBS_ENDPOINT = ('POST', 'api/2.0/libraries/uninstall')

# Identify Airflow and its version to the Databricks API on every request.
# NOTE(review): relies on ``__version__`` being imported above this hunk — confirm it is in scope.
USER_AGENT_HEADER = {'user-agent': f'airflow-{__version__}'}

Expand Down
9 changes: 9 additions & 0 deletions airflow/providers/databricks/operators/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ def __init__(
self,
*,
json: Optional[Any] = None,
tasks: Optional[List[object]] = None,
spark_jar_task: Optional[Dict[str, str]] = None,
notebook_task: Optional[Dict[str, str]] = None,
spark_python_task: Optional[Dict[str, Union[str, List[str]]]] = None,
Expand All @@ -279,6 +280,8 @@ def __init__(
databricks_retry_limit: int = 3,
databricks_retry_delay: int = 1,
do_xcom_push: bool = False,
idempotency_token: Optional[str] = None,
access_control_list: Optional[List[Dict[str, str]]] = None,
**kwargs,
) -> None:
"""Creates a new ``DatabricksSubmitRunOperator``."""
Expand All @@ -288,6 +291,8 @@ def __init__(
self.polling_period_seconds = polling_period_seconds
self.databricks_retry_limit = databricks_retry_limit
self.databricks_retry_delay = databricks_retry_delay
if tasks is not None:
self.json['tasks'] = tasks
if spark_jar_task is not None:
self.json['spark_jar_task'] = spark_jar_task
if notebook_task is not None:
Expand All @@ -310,6 +315,10 @@ def __init__(
self.json['timeout_seconds'] = timeout_seconds
if 'run_name' not in self.json:
self.json['run_name'] = run_name or kwargs['task_id']
if idempotency_token is not None:
self.json['idempotency_token'] = idempotency_token
if access_control_list is not None:
self.json['access_control_list'] = access_control_list

self.json = _deep_string_coerce(self.json)
# This variable will be used in case our task gets killed.
Expand Down
10 changes: 5 additions & 5 deletions tests/providers/databricks/hooks/test_databricks.py
Original file line number Diff line number Diff line change
def start_cluster_endpoint(host):
    """
    Return the Clusters API 2.0 start-cluster URL for the given *host*.

    The Clusters API lives under ``api/2.0`` (only the Jobs API moved to 2.1),
    so this helper must match the hook's START_CLUSTER_ENDPOINT constant.
    """
    return f'https://{host}/api/2.0/clusters/start'


def restart_cluster_endpoint(host):
    """
    Return the Clusters API 2.0 restart-cluster URL for the given *host*.

    The original docstring was a copy-paste of the "get run endpoint" helper;
    corrected here. The duplicate, unreachable second ``return`` (diff residue)
    is removed, keeping the 2.0 path this commit introduces.
    """
    return f'https://{host}/api/2.0/clusters/restart'


def terminate_cluster_endpoint(host):
    """
    Return the Clusters API 2.0 delete-cluster URL for the given *host*.

    The original docstring was a copy-paste of the "get run endpoint" helper;
    corrected here. The duplicate, unreachable second ``return`` (diff residue)
    is removed, keeping the 2.0 path this commit introduces.
    """
    return f'https://{host}/api/2.0/clusters/delete'


def install_endpoint(host):
    """
    Return the Libraries API 2.0 install URL for the given *host*.

    The duplicate, unreachable second ``return`` (diff residue) is removed,
    keeping the 2.0 path this commit introduces — the Libraries API stays
    on api/2.0; only the Jobs API moved to 2.1.
    """
    return f'https://{host}/api/2.0/libraries/install'


def uninstall_endpoint(host):
    """
    Return the Libraries API 2.0 uninstall URL for the given *host*.

    The duplicate, unreachable second ``return`` (diff residue) is removed,
    keeping the 2.0 path this commit introduces — the Libraries API stays
    on api/2.0; only the Jobs API moved to 2.1.
    """
    return f'https://{host}/api/2.0/libraries/uninstall'


def create_valid_response_mock(content):
Expand Down
6 changes: 6 additions & 0 deletions tests/providers/databricks/operators/test_databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ def test_init_with_json(self):
)
assert expected == op.json

def test_init_with_tasks(self):
    """A ``tasks`` list passed to the operator ends up verbatim in the request JSON."""
    task_list = [
        {"task_key": 1, "new_cluster": NEW_CLUSTER, "notebook_task": NOTEBOOK_TASK}
    ]
    op = DatabricksSubmitRunOperator(task_id=TASK_ID, tasks=task_list)
    # _deep_string_coerce is what the operator applies internally, so the
    # expectation must be built through the same transformation.
    assert op.json == databricks_operator._deep_string_coerce(
        {'run_name': TASK_ID, "tasks": task_list}
    )

def test_init_with_specified_run_name(self):
"""
Test the initializer with a specified run_name.
Expand Down

0 comments on commit 0a4a8bd

Please sign in to comment.