Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
# Databricks Jobs API 2.2 run-management endpoints, as (HTTP method, relative path) pairs.
REPAIR_RUN_ENDPOINT = ("POST", "2.2/jobs/runs/repair")
OUTPUT_RUNS_JOB_ENDPOINT = ("GET", "2.2/jobs/runs/get-output")
CANCEL_ALL_RUNS_ENDPOINT = ("POST", "2.2/jobs/runs/cancel-all")
# Deletes a job definition (not a single run); the request payload carries the job_id.
# Named DELETE_ENDPOINT (no JOB infix) for consistency with CREATE_ENDPOINT / RESET_ENDPOINT / UPDATE_ENDPOINT.
DELETE_ENDPOINT = ("POST", "2.2/jobs/delete")
Copy link
Copy Markdown
Contributor Author

@ha2hi ha2hi Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally named it DELETE_ENDPOINT to maintain consistency with existing Job-related endpoints in this file, such as CREATE_ENDPOINT, RESET_ENDPOINT, and UPDATE_ENDPOINT, which do not include the JOB infix.

CREATE_ENDPOINT = ("POST", "2.2/jobs/create")
RESET_ENDPOINT = ("POST", "2.2/jobs/reset")
UPDATE_ENDPOINT = ("POST", "2.2/jobs/update")

To resolve this, I think we have three options:

  1. Change only this new endpoint: rename it to DELETE_JOB_ENDPOINT, as Copilot suggested.

  2. Rename all job endpoints: add the JOB infix to all existing legacy endpoints (e.g., CREATE_JOB_ENDPOINT, RESET_JOB_ENDPOINT) for overall clarity.

  3. Maintain the current convention: keep it as DELETE_ENDPOINT to match the existing names in this module.

I lean towards Option 3 to avoid scope creep or potential breaking changes if anyone is importing those constants directly, but I'm fully open to whichever direction the maintainers prefer. Please let me know what you think!


# Cluster library management endpoints (Libraries API 2.0), as (HTTP method, relative path) pairs.
INSTALL_LIBS_ENDPOINT = ("POST", "2.0/libraries/install")
UNINSTALL_LIBS_ENDPOINT = ("POST", "2.0/libraries/uninstall")
Expand Down Expand Up @@ -302,6 +303,14 @@ def create_job(self, json: dict) -> int:
response = self._do_api_call(CREATE_ENDPOINT, json)
return response["job_id"]

def delete_job(self, job_id: int) -> None:
    """
    Delete a job via the ``api/2.2/jobs/delete`` endpoint.

    :param job_id: The unique identifier of the job to be deleted.
    """
    payload = {"job_id": job_id}
    self._do_api_call(DELETE_ENDPOINT, payload)
Comment on lines +306 to +312

def reset_job(self, job_id: str, json: dict) -> None:
"""
Call the ``api/2.2/jobs/reset`` endpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,51 @@ def execute(self, context: Context) -> int:
return job_id


class DatabricksDeleteJobsOperator(BaseOperator):
    """
    Deletes a Databricks job using the ``api/2.2/jobs/delete`` API endpoint.

    :param job_id: The ID of the Databricks job to delete. (templated)
    :param databricks_conn_id: Reference to the Databricks connection.
        (default: ``databricks_default``)
    :param polling_period_seconds: Accepted for signature consistency with the
        other Databricks operators; the delete call is synchronous, so this
        value is not used by this operator.
    :param databricks_retry_limit: Amount of times to retry if the Databricks
        backend is unreachable.
    :param databricks_retry_delay: Number of seconds to wait between retries.
    :param databricks_retry_args: An optional dictionary of keyword arguments
        forwarded to the hook's retry machinery.
    """

    # job_id is templated so it can be injected via Jinja / XCom at runtime.
    template_fields: Sequence[str] = ("job_id",)

    def __init__(
        self,
        *,
        job_id: int,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        databricks_retry_args: dict[Any, Any] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.job_id = job_id
        self.databricks_conn_id = databricks_conn_id
        # Stored but never consumed here; kept only for interface parity with
        # the other Databricks operators (delete is a single synchronous call).
        self.polling_period_seconds = polling_period_seconds
        self.databricks_retry_limit = databricks_retry_limit
        self.databricks_retry_delay = databricks_retry_delay
        self.databricks_retry_args = databricks_retry_args

    @cached_property
    def _hook(self):
        """Lazily build the DatabricksHook configured with this operator's retry settings."""
        return DatabricksHook(
            self.databricks_conn_id,
            retry_limit=self.databricks_retry_limit,
            retry_delay=self.databricks_retry_delay,
            retry_args=self.databricks_retry_args,
            caller="DatabricksDeleteJobsOperator",
        )

    def execute(self, context: Context) -> None:
        """Delete the configured job and log the outcome."""
        self._hook.delete_job(self.job_id)

        self.log.info("Successfully deleted Databricks job ID: %s", self.job_id)


class DatabricksSubmitRunOperator(BaseOperator):
"""
Submits a Spark job run to Databricks using the api/2.2/jobs/runs/submit API endpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from airflow.providers.databricks.hooks.databricks import RunState, SQLStatementState
from airflow.providers.databricks.operators.databricks import (
DatabricksCreateJobsOperator,
DatabricksDeleteJobsOperator,
DatabricksNotebookOperator,
DatabricksRunNowOperator,
DatabricksSQLStatementsOperator,
Expand Down Expand Up @@ -603,6 +604,44 @@ def test_exec_update_job_permission_with_empty_acl(self, db_mock_class):
db_mock.update_job_permission.assert_not_called()


class TestDatabricksDeleteJobsOperator:
    @mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
    def test_execute(self, mock_hook_class):
        # Set up the mock hook instance
        mock_hook = mock_hook_class.return_value

        # Instantiate the DatabricksDeleteJobsOperator
        op = DatabricksDeleteJobsOperator(
            task_id="delete_job_task",
            job_id=12345,
            databricks_conn_id="databricks_default",
        )

        # Execute the operator
        op.execute({})

        # Verify that the DatabricksHook was initialized with the correct
        # connection ID, the default retry arguments, and the caller tag
        mock_hook_class.assert_called_once_with(
            "databricks_default",
            retry_limit=3,
            retry_delay=1,
            retry_args=None,
            caller="DatabricksDeleteJobsOperator",
        )

        # Verify that the 'delete_job' method of the hook was called directly with the correct job_id
        mock_hook.delete_job.assert_called_once_with(12345)

    def test_template_fields(self):
        # Ensure that 'job_id' is registered as a template field for dynamic injection (e.g., via Jinja or XCom)
        op = DatabricksDeleteJobsOperator(
            task_id="test_template",
            job_id="{{ task_instance.xcom_pull(task_ids='create_job') }}",
        )

        assert "job_id" in op.template_fields


class TestDatabricksSubmitRunOperator:
def test_init_with_notebook_task_named_parameters(self):
"""
Expand Down
Loading