From e1d965c2c287980548b3bff71ecf1a96959ab273 Mon Sep 17 00:00:00 2001 From: deepinsight coder Date: Mon, 11 May 2026 03:07:27 +0000 Subject: [PATCH 1/3] Lock in DatabricksTaskBaseOperator depends_on parent-key behavior The runtime fix for issue #47614 shipped in PR #48492; this PR adds end-to-end regression coverage so the bug cannot silently regress, plus small type-hint and constructor-clarity follow-ups in the same area. Tests build a real DAG + DatabricksWorkflowTaskGroup with DatabricksNotebookOperator tasks and assert depends_on payloads for the default-key, custom-key, >100-char-key, diamond, fan-out, root-task, and external-upstream cases. Also fixes the existing test_convert_to_databricks_workflow_task to pass strings (not mocks) so the depends_on branch is actually exercised, and adds a one-line check that _generate_databricks_task_key raises when called with a parent task_id but no task_dict. closes: #47614 --- providers/databricks/docs/changelog.rst | 1 + .../databricks/operators/databricks.py | 4 +- .../operators/databricks_workflow.py | 2 +- .../databricks/operators/test_databricks.py | 26 ++- .../operators/test_databricks_workflow.py | 159 ++++++++++++++++++ 5 files changed, 186 insertions(+), 6 deletions(-) diff --git a/providers/databricks/docs/changelog.rst b/providers/databricks/docs/changelog.rst index 523ea7a1f54c3..0b75782f47339 100644 --- a/providers/databricks/docs/changelog.rst +++ b/providers/databricks/docs/changelog.rst @@ -59,6 +59,7 @@ Bug Fixes ~~~~~~~~~ * ``Add 'task_config' to 'template_fields' for 'DatabricksTaskOperator' (#65858)`` +* ``Lock in 'DatabricksTaskBaseOperator.depends_on' to reference the parent task's 'task_key' instead of its own (#47614)`` .. Below changes are excluded from the changelog. Move them to appropriate section above if needed. Do not delete the lines(!): diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py index 4c581215be463..8cd77c695059c 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py @@ -1367,7 +1367,7 @@ def _get_current_databricks_task(self) -> dict[str, Any]: def _convert_to_databricks_workflow_task( self, - relevant_upstreams: list[BaseOperator], + relevant_upstreams: list[str], task_dict: dict[str, BaseOperator], context: Context | None = None, ) -> dict[str, object]: @@ -1621,7 +1621,7 @@ def _extend_workflow_notebook_packages( def _convert_to_databricks_workflow_task( self, - relevant_upstreams: list[BaseOperator], + relevant_upstreams: list[str], task_dict: dict[str, BaseOperator], context: Context | None = None, ) -> dict[str, object]: diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py index 5bbf9d3c78a3e..779c2fc9f1528 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py @@ -158,7 +158,7 @@ def __init__( self.python_params = python_params or [] self.spark_submit_params = spark_submit_params or [] self.tasks_to_convert = tasks_to_convert or {} - self.relevant_upstreams = [task_id] + self.relevant_upstreams: list[str] = [] self.workflow_run_metadata: WorkflowRunMetadata | None = None super().__init__(task_id=task_id, **kwargs) diff --git a/providers/databricks/tests/unit/databricks/operators/test_databricks.py b/providers/databricks/tests/unit/databricks/operators/test_databricks.py index 84c625cddb793..1b85fc7eecf8c 100644 --- a/providers/databricks/tests/unit/databricks/operators/test_databricks.py +++ b/providers/databricks/tests/unit/databricks/operators/test_databricks.py @@ -2576,15 +2576,22 @@ def test_convert_to_databricks_workflow_task(self): operator.task_group = databricks_workflow_task_group operator.task_id = "test_task" operator.upstream_task_ids = ["upstream_task"] - relevant_upstreams = [MagicMock(task_id="upstream_task")] - task_dict = {"upstream_task": MagicMock(task_id="upstream_task")} + upstream_task = DatabricksNotebookOperator( + notebook_path="/path/to/upstream", + source="WORKSPACE", + task_id="upstream_task", + dag=dag, + ) + relevant_upstreams = ["upstream_task"] + task_dict = {"upstream_task": upstream_task} task_json = operator._convert_to_databricks_workflow_task(relevant_upstreams, task_dict) task_key = hashlib.md5(b"example_dag__test_task").hexdigest() + upstream_task_key = hashlib.md5(b"example_dag__upstream_task").hexdigest() expected_json = { "task_key": task_key, - "depends_on": [], + "depends_on": [{"task_key": upstream_task_key}], "timeout_seconds": 0, "email_notifications": {}, "notebook_task": { @@ -2755,6 +2762,19 @@ def test_generate_databricks_task_key(self): expected_task_key = hashlib.md5(task_key).hexdigest() assert expected_task_key == operator.databricks_task_key + def test_generate_databricks_task_key_requires_task_dict_when_task_id_passed(self): + """Looking up a parent task's key without a ``task_dict`` is a programmer error.""" + operator = DatabricksTaskOperator( + task_id="test_task", + databricks_conn_id="test_conn_id", + task_config={}, + ) + with pytest.raises( + ValueError, + match="Must pass task_dict if task_id is provided in _generate_databricks_task_key.", + ): + operator._generate_databricks_task_key(task_id="upstream_task") + def test_user_databricks_task_key(self): task_config = {} operator = DatabricksTaskOperator( diff --git a/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py b/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py index 9cfa7e91ae384..0f52b2797c9ad 100644 --- a/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py +++ b/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py @@ -17,6 +17,7 @@ from __future__ import annotations +import hashlib from unittest.mock import MagicMock, patch import pytest @@ -30,6 +31,7 @@ from airflow.models.baseoperator import BaseOperator from airflow.providers.common.compat.sdk import AirflowException from airflow.providers.databricks.hooks.databricks import RunLifeCycleState +from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator from airflow.providers.databricks.operators.databricks_workflow import ( DatabricksWorkflowTaskGroup, WorkflowRunMetadata, @@ -333,3 +335,160 @@ def test_on_kill(mock_databricks_hook, context, mock_workflow_run_metadata): operator.on_kill() operator._hook.cancel_run.assert_called_once_with(RUN_ID) + + +class TestWorkflowDependsOn: + """End-to-end coverage that ``depends_on`` references the *parent's* ``task_key``. + + Regression coverage for issue apache/airflow#47614 (root cause fixed by #48492). + Each test builds a real ``DAG`` + ``DatabricksWorkflowTaskGroup`` populated with + real ``DatabricksNotebookOperator`` tasks (no operator mocks), then drives + ``_CreateDatabricksWorkflowOperator.create_workflow_json`` and asserts the + resulting ``tasks[*]['depends_on']`` payload. + """ + + DAG_ID = "test_depends_on_dag" + GROUP_ID = "wf_group" + CONN_ID = "databricks_conn" + + @staticmethod + def _build_notebook(task_id: str, **kwargs) -> DatabricksNotebookOperator: + return DatabricksNotebookOperator( + task_id=task_id, + notebook_path=f"/path/{task_id}", + source="WORKSPACE", + **kwargs, + ) + + def _expected_default_key(self, group_task_id: str) -> str: + full_task_id = f"{self.GROUP_ID}.{group_task_id}" + return hashlib.md5(f"{self.DAG_ID}__{full_task_id}".encode()).hexdigest() + + def _launch_task(self, dag: DAG) -> _CreateDatabricksWorkflowOperator: + launch = dag.task_dict[f"{self.GROUP_ID}.launch"] + assert isinstance(launch, _CreateDatabricksWorkflowOperator) + return launch + + @staticmethod + def _tasks_by_key(workflow_json: dict) -> dict: + return {t["task_key"]: t for t in workflow_json["tasks"]} + + def test_depends_on_uses_parent_key_default_keys(self): + """``task_A >> task_B`` — ``task_B.depends_on`` references ``task_A``'s key.""" + with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag: + with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID): + task_a = self._build_notebook("task_a") + task_b = self._build_notebook("task_b") + task_a >> task_b + + tasks_by_key = self._tasks_by_key(self._launch_task(dag).create_workflow_json()) + a_key = self._expected_default_key("task_a") + b_key = self._expected_default_key("task_b") + + assert set(tasks_by_key) == {a_key, b_key} + assert tasks_by_key[a_key]["depends_on"] == [] + assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}] + + def test_depends_on_uses_parent_key_custom_parent_key(self): + """An explicit ``databricks_task_key`` on the parent flows into ``depends_on``.""" + with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag: + with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID): + task_a = self._build_notebook("task_a", databricks_task_key="custom_a") + task_b = self._build_notebook("task_b") + task_a >> task_b + + tasks_by_key = self._tasks_by_key(self._launch_task(dag).create_workflow_json()) + b_key = self._expected_default_key("task_b") + + assert "custom_a" in tasks_by_key + assert tasks_by_key[b_key]["depends_on"] == [{"task_key": "custom_a"}] + + def test_depends_on_falls_back_to_hash_when_parent_key_too_long(self): + """A >100-char explicit key is rejected; both task and ``depends_on`` use the hash.""" + too_long_key = "x" * 101 + with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag: + with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID): + task_a = self._build_notebook("task_a", databricks_task_key=too_long_key) + task_b = self._build_notebook("task_b") + task_a >> task_b + + tasks_by_key = self._tasks_by_key(self._launch_task(dag).create_workflow_json()) + a_key = self._expected_default_key("task_a") + b_key = self._expected_default_key("task_b") + + assert too_long_key not in tasks_by_key + assert a_key in tasks_by_key + assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}] + + def test_depends_on_diamond_dependency(self): + """``A >> [B, C] >> D`` — D depends on both B and C; B and C each depend only on A.""" + with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag: + with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID): + task_a = self._build_notebook("task_a") + task_b = self._build_notebook("task_b") + task_c = self._build_notebook("task_c") + task_d = self._build_notebook("task_d") + task_a >> [task_b, task_c] >> task_d + + tasks_by_key = self._tasks_by_key(self._launch_task(dag).create_workflow_json()) + a_key = self._expected_default_key("task_a") + b_key = self._expected_default_key("task_b") + c_key = self._expected_default_key("task_c") + d_key = self._expected_default_key("task_d") + + assert tasks_by_key[a_key]["depends_on"] == [] + assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}] + assert tasks_by_key[c_key]["depends_on"] == [{"task_key": a_key}] + d_parent_keys = {entry["task_key"] for entry in tasks_by_key[d_key]["depends_on"]} + assert d_parent_keys == {b_key, c_key} + + def test_depends_on_fan_out_dependency(self): + """``A >> [B, C]`` — both downstreams reference A's key only.""" + with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag: + with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID): + task_a = self._build_notebook("task_a") + task_b = self._build_notebook("task_b") + task_c = self._build_notebook("task_c") + task_a >> [task_b, task_c] + + tasks_by_key = self._tasks_by_key(self._launch_task(dag).create_workflow_json()) + a_key = self._expected_default_key("task_a") + b_key = self._expected_default_key("task_b") + c_key = self._expected_default_key("task_c") + + assert tasks_by_key[a_key]["depends_on"] == [] + assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}] + assert tasks_by_key[c_key]["depends_on"] == [{"task_key": a_key}] + + def test_root_tasks_have_empty_depends_on(self): + """Root tasks' Airflow upstream is the launch task; that must never appear in ``depends_on``.""" + with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag: + with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID): + root_a = self._build_notebook("root_a") + root_b = self._build_notebook("root_b") + self._build_notebook("downstream").set_upstream([root_a, root_b]) + + launch_task = self._launch_task(dag) + # Sanity: both roots actually have the launch task as an Airflow upstream. + for root_task_id in (f"{self.GROUP_ID}.root_a", f"{self.GROUP_ID}.root_b"): + assert launch_task.task_id in dag.task_dict[root_task_id].upstream_task_ids + + tasks_by_key = self._tasks_by_key(launch_task.create_workflow_json()) + root_a_key = self._expected_default_key("root_a") + root_b_key = self._expected_default_key("root_b") + + assert tasks_by_key[root_a_key]["depends_on"] == [] + assert tasks_by_key[root_b_key]["depends_on"] == [] + + def test_depends_on_filters_out_external_upstream(self): + """An Airflow upstream outside the workflow group must not appear in ``depends_on``.""" + with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag: + external_op = EmptyOperator(task_id="external_op") + with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID): + dbx_task = self._build_notebook("dbx_task") + external_op >> dbx_task + + tasks_by_key = self._tasks_by_key(self._launch_task(dag).create_workflow_json()) + dbx_key = self._expected_default_key("dbx_task") + + assert tasks_by_key[dbx_key]["depends_on"] == [] From f90a8fff341f6a8288b04eb95bda1c5637981d0b Mon Sep 17 00:00:00 2001 From: deepinsight coder Date: Wed, 13 May 2026 05:10:11 +0000 Subject: [PATCH 2/3] Remove manually added databricks changelog entry Provider changelogs are regenerated from git log by the release manager and should not be edited by hand. --- providers/databricks/docs/changelog.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/databricks/docs/changelog.rst b/providers/databricks/docs/changelog.rst index 0b75782f47339..523ea7a1f54c3 100644 --- a/providers/databricks/docs/changelog.rst +++ b/providers/databricks/docs/changelog.rst @@ -59,7 +59,6 @@ Bug Fixes ~~~~~~~~~ * ``Add 'task_config' to 'template_fields' for 'DatabricksTaskOperator' (#65858)`` -* ``Lock in 'DatabricksTaskBaseOperator.depends_on' to reference the parent task's 'task_key' instead of its own (#47614)`` .. Below changes are excluded from the changelog. Move them to appropriate section above if needed. Do not delete the lines(!): From 684bfee90984cb742a71cdd200ef12d95f4aa389 Mon Sep 17 00:00:00 2001 From: deepinsight coder Date: Tue, 19 May 2026 05:54:58 +0000 Subject: [PATCH 3/3] Assert depends_on reaches the Databricks Jobs API payload Existing TestWorkflowDependsOn coverage verified the in-process create_workflow_json() output. The new TestWorkflowDependsOnWirePayload class drives _CreateDatabricksWorkflowOperator._create_or_reset_job end to end and asserts on the spec that DatabricksHook.create_job / DatabricksHook.reset_job actually receive, i.e. the payload that lands in /api/2.1/jobs/create and /api/2.1/jobs/reset. Both branches (no existing job -> create_job, existing job -> reset_job) are exercised; both assert tasks[child].depends_on == [{task_key: md5(parent)}] and tasks[parent].depends_on == []. --- .../operators/test_databricks_workflow.py | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py b/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py index 0f52b2797c9ad..1a00ef8dadef4 100644 --- a/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py +++ b/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py @@ -492,3 +492,83 @@ def test_depends_on_filters_out_external_upstream(self): dbx_key = self._expected_default_key("dbx_task") assert tasks_by_key[dbx_key]["depends_on"] == [] + + +class TestWorkflowDependsOnWirePayload: + """Wire-boundary coverage: the spec sent to the Databricks Jobs API carries ``depends_on``. + + :class:`TestWorkflowDependsOn` asserts the in-process ``create_workflow_json`` payload. + These tests assert the *wire* payload — what ``_create_or_reset_job`` actually hands to + ``DatabricksHook.create_job`` (new job) or ``DatabricksHook.reset_job`` (existing job), + which is what the Databricks REST API receives. + """ + + DAG_ID = "test_depends_on_wire_dag" + GROUP_ID = "wf_group" + CONN_ID = "databricks_conn" + + @staticmethod + def _build_notebook(task_id: str, **kwargs) -> DatabricksNotebookOperator: + return DatabricksNotebookOperator( + task_id=task_id, + notebook_path=f"/path/{task_id}", + source="WORKSPACE", + **kwargs, + ) + + def _expected_default_key(self, group_task_id: str) -> str: + full_task_id = f"{self.GROUP_ID}.{group_task_id}" + return hashlib.md5(f"{self.DAG_ID}__{full_task_id}".encode()).hexdigest() + + def _launch_task(self, dag: DAG) -> _CreateDatabricksWorkflowOperator: + launch = dag.task_dict[f"{self.GROUP_ID}.launch"] + assert isinstance(launch, _CreateDatabricksWorkflowOperator) + return launch + + @staticmethod + def _tasks_by_key(workflow_json: dict) -> dict: + return {t["task_key"]: t for t in workflow_json["tasks"]} + + def _build_two_task_dag(self) -> DAG: + with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) as dag: + with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, databricks_conn_id=self.CONN_ID): + task_a = self._build_notebook("task_a") + task_b = self._build_notebook("task_b") + task_a >> task_b + return dag + + def _assert_parent_depends_on(self, job_spec: dict) -> None: + tasks_by_key = self._tasks_by_key(job_spec) + a_key = self._expected_default_key("task_a") + b_key = self._expected_default_key("task_b") + + assert len(job_spec["tasks"]) == 2 + assert set(tasks_by_key) == {a_key, b_key} + assert tasks_by_key[a_key]["depends_on"] == [] + assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}] + + def test_create_job_payload_carries_parent_depends_on(self, mock_databricks_hook): + """No existing job → ``create_job`` receives a spec whose ``depends_on`` references the parent key.""" + launch_task = self._launch_task(self._build_two_task_dag()) + launch_task._hook.list_jobs.return_value = [] + launch_task._hook.create_job.return_value = 999 + + launch_task._create_or_reset_job(context=MagicMock()) + + launch_task._hook.create_job.assert_called_once() + launch_task._hook.reset_job.assert_not_called() + (job_spec,) = launch_task._hook.create_job.call_args.args + self._assert_parent_depends_on(job_spec) + + def test_reset_job_payload_carries_parent_depends_on(self, mock_databricks_hook): + """Existing job → ``reset_job`` receives a spec whose ``depends_on`` references the parent key.""" + launch_task = self._launch_task(self._build_two_task_dag()) + launch_task._hook.list_jobs.return_value = [{"job_id": 42}] + + launch_task._create_or_reset_job(context=MagicMock()) + + launch_task._hook.reset_job.assert_called_once() + launch_task._hook.create_job.assert_not_called() + job_id, job_spec = launch_task._hook.reset_job.call_args.args + assert job_id == 42 + self._assert_parent_depends_on(job_spec)