Skip to content

Commit

Permalink
refactor(databricks): divest from databricks_api in favor of databric…
Browse files Browse the repository at this point in the history
…ks-cli (#12153)

### Summary & Motivation
Resolves #9871.

The layer of indirection provided by `databricks_api` is not needed:
begin to divest from it, in favor of `databricks-cli`, which is the
recommended approach to accessing the Databricks Python API.

A simple way to support arbitrary use cases with the
`dagster-databricks` integration is just to give a very simple way to
access the API client using our resources abstraction. This does just
that.

### How I Tested These Changes
bk, vercel docs
  • Loading branch information
rexledesma committed Feb 15, 2023
1 parent 840fda0 commit 9732826
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 12 deletions.
2 changes: 1 addition & 1 deletion docs/content/api/modules.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/searchindex.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/sections.json

Large diffs are not rendered by default.

Binary file modified docs/next/public/objects.inv
Binary file not shown.
1 change: 1 addition & 0 deletions docs/sphinx/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
"croniter",
"dask",
"databricks_api",
"databricks_cli",
"datadog",
"docker",
"docker_image",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,12 @@ APIs
:annotation: ResourceDefinition

.. autoclass:: dagster_databricks.DatabricksError

Resources
=========

.. autoclass:: DatabricksClient
:members:

.. autoconfigurable:: databricks_client
:annotation: ResourceDefinition
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from dagster._core.utils import check_dagster_package_version

from .databricks import DatabricksError, DatabricksJobRunner
from .databricks import DatabricksClient, DatabricksError, DatabricksJobRunner
from .databricks_pyspark_step_launcher import (
DatabricksConfig,
DatabricksPySparkStepLauncher,
Expand All @@ -30,6 +30,7 @@
__all__ = [
"create_databricks_job_op",
"databricks_client",
"DatabricksClient",
"DatabricksConfig",
"DatabricksError",
"DatabricksJobRunner",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import dagster._check as check
import dagster_pyspark
import requests.exceptions
from dagster._annotations import public
from databricks_api import DatabricksAPI
from databricks_cli.sdk.api_client import ApiClient

import dagster_databricks

Expand All @@ -29,8 +31,52 @@ class DatabricksClient:
def __init__(self, host, token, workspace_id=None):
self.host = host
self.workspace_id = workspace_id

# TODO: This is the old shim client that we were previously using. Arguably this is
# confusing for users to use since this is an unofficial wrapper around the documented
# Databricks REST API. We should consider removing this in the future.
self.client = DatabricksAPI(host=host, token=token)

# Expose an interface directly to the official Databricks API client.
self._api_client = ApiClient(host=host, token=token)

@public
@property
def api_client(self) -> ApiClient:
"""Retrieve a reference to the underlying Databricks API client. For more information,
see the `Databricks Python API <https://docs.databricks.com/dev-tools/python-api.html>`_.
**Examples:**
.. code-block:: python
from dagster import op
from databricks_cli.jobs.api import JobsApi
from databricks_cli.runs.api import RunsApi
@op(required_resource_keys={"databricks_client"})
def op1(context):
# Initialize the Databricks Jobs API
jobs_client = JobsApi(context.resources.databricks_client.api_client)
runs_client = RunsApi(context.resources.databricks_client.api_client)
# Example 1: Run a Databricks job with some parameters.
jobs_client.run_now(...)
# Example 2: Trigger a one-time run of a Databricks workload.
runs_client.submit_run(...)
# Example 3: Get an existing run.
runs_client.get_run(...)
# Example 4: Cancel a run.
runs_client.cancel_run(...)
Returns:
ApiClient: The authenticated Databricks API client.
"""
return self._api_client

def submit_run(self, *args, **kwargs):
"""Submit a run directly to the 'Runs Submit' API."""
return self.client.jobs.submit_run(*args, **kwargs)["run_id"] # pylint: disable=no-member
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
"workspace_id": Field(
StringSource,
description=(
"The Databricks workspace ID, as described"
" inhttps://docs.databricks.com/workspace/workspace-details.html#workspace-instance-names-urls-and-ids.This"
" is used to log a URL for accessing the job in the Databricks UI."
"The Databricks workspace ID, as described in"
" https://docs.databricks.com/workspace/workspace-details.html#workspace-instance-names-urls-and-ids."
" This is used to log a URL for accessing the job in the Databricks UI."
),
is_required=False,
),
}
)
def databricks_client(init_context):
def databricks_client(init_context) -> DatabricksClient:
return DatabricksClient(
host=init_context.resource_config["host"],
token=init_context.resource_config["token"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@


BASE_DATABRICKS_PYSPARK_STEP_LAUNCHER_CONFIG: Dict[str, object] = {
"databricks_host": os.environ.get("DATABRICKS_HOST"),
"databricks_host": os.environ.get("DATABRICKS_HOST") or "https://",
"databricks_token": os.environ.get("DATABRICKS_TOKEN"),
"local_pipeline_package_path": os.path.abspath(os.path.dirname(__file__)),
"staging_prefix": "/dagster-databricks-tests",
Expand Down Expand Up @@ -210,7 +210,7 @@ def test_pyspark_databricks(
"config": deep_merge_dicts(
config,
{
"databricks_host": "",
"databricks_host": "https://",
"databricks_token": "",
"poll_interval_sec": 0.1,
"local_dagster_job_package_path": os.path.abspath(
Expand Down Expand Up @@ -248,7 +248,7 @@ def test_pyspark_databricks(
"config": deep_merge_dicts(
config,
{
"databricks_host": "",
"databricks_host": "https://",
"databricks_token": "",
"poll_interval_sec": 0.1,
"local_dagster_job_package_path": os.path.abspath(
Expand Down
3 changes: 2 additions & 1 deletion python_modules/libraries/dagster-databricks/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ def get_version() -> str:
install_requires=[
f"dagster{pin}",
f"dagster-pyspark{pin}",
"databricks_api",
"databricks-cli~=0.17",
"databricks_api", # Divest from this library in the future since it is unnecessary indirection.
],
zip_safe=False,
)

1 comment on commit 9732826

@vercel
Copy link

@vercel vercel bot commented on 9732826 Feb 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.