Lock in Databricks workflow depends_on parent-key behavior (#47614) #66681
Vamsi-klu wants to merge 3 commits into
@eladkal @jscheffl @shahar1, could one of you take a look when you have a moment? This PR is regression coverage / type-hint cleanup for the depends_on bug whose runtime fix you reviewed in #48492. The actual behavior is unchanged; the goal is to lock it in with end-to-end tests so the same bug can't silently come back, and to close out #47614.
cc @moomindani for review
moomindani
left a comment
The diff itself looks good to me — the type annotation is the right one, the relevant_upstreams = [task_id] → [] initializer is genuinely dead code being cleaned up (the bare "launch" could never match the child tasks' prefixed "<group>.launch" upstreams, so the original list was always semantically empty), the existing MagicMock-based test fix actually exercises the depends_on branch now, and TestWorkflowDependsOn covers default/custom/oversize key, diamond, fan-out, root-task filtering, and external-upstream filtering. Changelog revert in 025e8a4 is clean.
A couple of things before I'd be comfortable approving:
- CI is red. The remaining failure (`Compat 3.0.6:P3.10`) is `gh run download` artifact infra, not your code, but we shouldn't merge on red. Could you re-trigger it (or push an empty commit) so we get a clean build?
- Real-workspace validation. The runtime fix has been live since #48492 in April 2025, so the user-visible behavior change here is small, but this PR does touch production code (`relevant_upstreams = [task_id]` → `[]`). Have you (or anyone) actually run a `DatabricksWorkflowTaskGroup` with a parent/child task pair against a Databricks workspace and confirmed the rendered Jobs API JSON has `depends_on: [{task_key: <parent_key>}]`? The unit tests build real `DAG` objects but assert on the in-process `create_workflow_json()` payload, not what arrives at Databricks. An `airflow dags test` run plus a screenshot of the resulting Databricks job graph would close the loop (similar pattern to what I used in #66613).
Once CI is green and there's some evidence the wire payload is right, LGTM.
025e8a4 to c231337
@moomindani thanks for the review. Two things addressed: (A) CI the failing (B) Wire-payload validation added a new
Both tests build a real DAG + I don't have a Databricks workspace handy for the live
moomindani
left a comment
Real-workspace validation
Drove the PR branch end-to-end through airflow dags test against a Databricks workspace with a minimal DatabricksWorkflowTaskGroup (task_a >> task_b, both DatabricksNotebookOperator), then queried the created Databricks job back via 2.2/jobs/list. The Jobs API payload is correctly parent-keyed:
{
"tasks": [
{
"task_key": "d13f300f3f3b28c910b3710198314589"
// md5("pr66681_realenv__wf.task_a"); no depends_on
},
{
"task_key": "dcf56d6d083da88df804ce506620a7a2", // md5("pr66681_realenv__wf.task_b")
"depends_on": [
{"task_key": "d13f300f3f3b28c910b3710198314589"} // parent task_a's key, not its own
]
}
]
}
Closes the wire-payload loop on top of the unit tests.
Reproduction
export AIRFLOW_CONN_DATABRICKS_DEFAULT='{"conn_type":"databricks","host":"https://<workspace>","password":"<token>"}'
export PR66681_NOTEBOOK_PATH=/Users/<you>/airflow-pr66681-noop # any existing notebook in the workspace
airflow dags test pr66681_realenv
Then verify the resulting Databricks job via:
curl -s -H "Authorization: Bearer $DATABRICKS_TOKEN" \
  "https://<workspace>/api/2.2/jobs/list?name=pr66681_realenv.wf&limit=1&expand_tasks=true" | jq '.jobs[0].settings.tasks'
dags/dag_pr66681_realenv.py
"""Real-environment validation DAG for PR #66681 (GH-47614)."""
from __future__ import annotations

import hashlib
import os
from datetime import datetime

from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup
from airflow.sdk import DAG, task

NOTEBOOK_PATH = os.environ["PR66681_NOTEBOOK_PATH"]
DAG_ID = "pr66681_realenv"
GROUP_ID = "wf"
CONN_ID = "databricks_default"


def _hook() -> DatabricksHook:
    return DatabricksHook(databricks_conn_id=CONN_ID)


def _expected_task_key(task_id_in_group: str) -> str:
    # Task key is the md5 of "<dag_id>__<group_id>.<task_id>".
    full_task_id = f"{GROUP_ID}.{task_id_in_group}"
    return hashlib.md5(f"{DAG_ID}__{full_task_id}".encode()).hexdigest()


with DAG(dag_id=DAG_ID, start_date=datetime(2026, 1, 1), schedule=None, catchup=False) as dag:
    with DatabricksWorkflowTaskGroup(
        group_id=GROUP_ID,
        databricks_conn_id=CONN_ID,
        job_clusters=[
            {
                "job_cluster_key": "shared",
                "new_cluster": {
                    "spark_version": "15.4.x-scala2.12",
                    "node_type_id": "i3.xlarge",
                    "num_workers": 0,
                    "spark_conf": {"spark.master": "local[*]"},
                    "custom_tags": {"ResourceClass": "SingleNode"},
                },
            }
        ],
    ) as wf:
        task_a = DatabricksNotebookOperator(
            task_id="task_a", notebook_path=NOTEBOOK_PATH, source="WORKSPACE", job_cluster_key="shared"
        )
        task_b = DatabricksNotebookOperator(
            task_id="task_b", notebook_path=NOTEBOOK_PATH, source="WORKSPACE", job_cluster_key="shared"
        )
        task_a >> task_b

    @task(task_id="verify_depends_on", trigger_rule="all_done")
    def verify_depends_on() -> None:
        # Read the created job back from Databricks and check the stored task graph.
        jobs = _hook()._do_api_call(
            ("GET", "2.2/jobs/list"),
            {"name": f"{DAG_ID}.{GROUP_ID}", "limit": 1, "expand_tasks": True},
        )
        job = jobs["jobs"][0]
        tasks_by_key = {t["task_key"]: t for t in job["settings"]["tasks"]}
        a_key = _expected_task_key("task_a")
        b_key = _expected_task_key("task_b")
        assert tasks_by_key[a_key].get("depends_on", []) == []
        assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}]
        # Clean up the validation job.
        _hook()._do_api_call(("POST", "2.2/jobs/delete"), {"job_id": job["job_id"]})

    wf >> verify_depends_on()
Approving. Note I'm not a committer, so this is only a non-binding LGTM; a committer still needs to sign off before merge.
  def _convert_to_databricks_workflow_task(
      self,
-     relevant_upstreams: list[BaseOperator],
+     relevant_upstreams: list[str],
Can you clarify why this change is needed?
Good catch. This is correcting the annotation to match the runtime value rather than changing behavior.
`relevant_upstreams` is populated from `task.task_id` in `DatabricksWorkflowTaskGroup.__exit__`, so it is a list of task-id strings. `_convert_to_databricks_workflow_task()` then compares those strings with `self.upstream_task_ids`:
    for task_id in self.upstream_task_ids
    if task_id in relevant_upstreams
The old `list[BaseOperator]` annotation was misleading: passing operators there would make that membership check fail because it compares `str` task IDs to operator objects. The actual `BaseOperator` instances are still carried separately in `task_dict`, which is used when resolving the parent task’s Databricks task key.
So this change is mainly a type-hint cleanup that also makes the tests reflect the real call shape.
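To make the split between the two arguments concrete, here is a minimal sketch rather than the provider's code. `FakeTask`, `task_key`, and `build_depends_on` are hypothetical names, and the md5 key scheme is borrowed from `_expected_task_key` in the validation DAG earlier in this thread. String task IDs drive the filter, and `task_dict` is only consulted to resolve the parent's key.

```python
import hashlib
from dataclasses import dataclass, field


@dataclass
class FakeTask:
    """Hypothetical stand-in for an operator; not an Airflow class."""

    dag_id: str
    task_id: str
    upstream_task_ids: set = field(default_factory=set)


def task_key(dag_id: str, task_id: str) -> str:
    # Assumed key scheme: md5 of "<dag_id>__<task_id>".
    return hashlib.md5(f"{dag_id}__{task_id}".encode()).hexdigest()


def build_depends_on(task: FakeTask, relevant_upstreams: list, task_dict: dict) -> list:
    # Filter on string task IDs, then look up each parent in task_dict
    # so the emitted key is the parent's key, not the child's own.
    return [
        {"task_key": task_key(task_dict[up].dag_id, task_dict[up].task_id)}
        for up in sorted(task.upstream_task_ids)
        if up in relevant_upstreams
    ]


parent = FakeTask("example_dag", "wf.task_a")
child = FakeTask("example_dag", "wf.task_b", {"wf.task_a", "outside_group"})
task_dict = {"wf.task_a": parent, "wf.task_b": child}

with_strings = build_depends_on(child, ["wf.task_a", "wf.task_b"], task_dict)
with_objects = build_depends_on(child, [parent, child], task_dict)
```

In this sketch `with_strings` holds a single entry keyed to the parent, while `with_objects` is empty because a string never compares equal to an operator-like object.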
The runtime fix for issue apache#47614 shipped in PR apache#48492; this PR adds end-to-end regression coverage so the bug cannot silently regress, plus small type-hint and constructor-clarity follow-ups in the same area.
Tests build a real DAG + DatabricksWorkflowTaskGroup with DatabricksNotebookOperator tasks and assert depends_on payloads for the default-key, custom-key, >100-char-key, diamond, fan-out, root-task, and external-upstream cases.
Also fixes the existing test_convert_to_databricks_workflow_task to pass strings (not mocks) so the depends_on branch is actually exercised, and adds a one-line check that _generate_databricks_task_key raises when called with a parent task_id but no task_dict.
closes: apache#47614
Provider changelogs are regenerated from git log by the release manager and should not be edited by hand.
Existing TestWorkflowDependsOn coverage verified the in-process
create_workflow_json() output. The new TestWorkflowDependsOnWirePayload
class drives _CreateDatabricksWorkflowOperator._create_or_reset_job end
to end and asserts on the spec that DatabricksHook.create_job /
DatabricksHook.reset_job actually receive, i.e. the payload that lands
in /api/2.1/jobs/create and /api/2.1/jobs/reset.
Both branches (no existing job -> create_job, existing job -> reset_job)
are exercised; both assert tasks[child].depends_on == [{task_key:
md5(parent)}] and tasks[parent].depends_on == [].
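The shape of such a wire-payload test can be sketched as follows. This is an illustration under assumptions, not the test added in this PR: `create_or_reset_job` is a hypothetical helper standing in for the operator's create-or-reset logic, the hook is a plain `MagicMock`, and the task keys are placeholder strings rather than md5 digests.

```python
from unittest.mock import MagicMock


def create_or_reset_job(hook, job_spec: dict, existing_job_id=None):
    """Hypothetical helper mirroring the two branches described above."""
    if existing_job_id is None:
        # No job yet: the spec goes to create_job.
        return hook.create_job(job_spec)
    # Job already exists: the same spec goes to reset_job.
    hook.reset_job(existing_job_id, job_spec)
    return existing_job_id


spec = {
    "name": "example_dag.wf",
    "tasks": [
        {"task_key": "parent_key", "depends_on": []},
        {"task_key": "child_key", "depends_on": [{"task_key": "parent_key"}]},
    ],
}

# Branch 1: no existing job, so create_job receives the spec.
create_hook = MagicMock()
create_hook.create_job.return_value = 123
created_id = create_or_reset_job(create_hook, spec)
sent_on_create = create_hook.create_job.call_args.args[0]

# Branch 2: existing job, so reset_job receives the spec.
reset_hook = MagicMock()
reset_id = create_or_reset_job(reset_hook, spec, existing_job_id=456)
sent_on_reset = reset_hook.reset_job.call_args.args[1]
```

Asserting on `call_args` checks the argument the hook actually received, which is one step closer to the wire than inspecting the dictionary before it is handed over.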
c231337 to 684bfee
The runtime fix for issue #47614 shipped in #48492 (commit 9dcce2f).
Closes: #47614
The GitHub issue stayed open because:
- The existing test (`test_convert_to_databricks_workflow_task`) passed `relevant_upstreams = [MagicMock(task_id="upstream_task")]`, a list of mock objects rather than strings, so the `task_id in relevant_upstreams` filter was silently always-False and the `depends_on` branch was never exercised. The fix could regress without anyone noticing.
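The always-False filter is easy to reproduce in isolation. The snippet below is a standalone sketch, not the original test; the variable names are illustrative.

```python
from unittest.mock import MagicMock

upstream_task_ids = {"upstream_task"}

# What the old test passed: mock objects that merely carry a task_id attribute.
mocked_upstreams = [MagicMock(task_id="upstream_task")]
# What the runtime passes: plain task-id strings.
string_upstreams = ["upstream_task"]

# Same membership filter in both cases; only the element type differs.
matched_with_mocks = [t for t in upstream_task_ids if t in mocked_upstreams]
matched_with_strings = [t for t in upstream_task_ids if t in string_upstreams]
```

With mocks the filter yields an empty list, because a string never equals a `MagicMock` instance, so any assertion guarded by that filter never runs. With strings the upstream is matched as intended.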
This PR is therefore a lock-in / regression / hardening change, not a new fix.
Changes
- `databricks.py`: corrected `relevant_upstreams` annotation from `list[BaseOperator]` to `list[str]` on both `DatabricksTaskBaseOperator` and `DatabricksNotebookOperator` overrides.
- `databricks_workflow.py`: changed `self.relevant_upstreams = [task_id]` to `self.relevant_upstreams: list[str] = []`. Behavior is unchanged (the launch task's raw, unprefixed task_id was previously filtered out only by an accidental prefix mismatch); the new initializer makes the intent explicit.
- `test_databricks_workflow.py`: new `TestWorkflowDependsOn` class with 7 end-to-end tests that build a real `DAG` + `DatabricksWorkflowTaskGroup` with real `DatabricksNotebookOperator` tasks and assert on the `tasks[*]['depends_on']` payload returned by `create_workflow_json()`. Covers default keys, custom parent key, >100-char parent key, diamond, fan-out, root tasks (launch task is filtered out), and external upstream filtering.
- `test_databricks.py`: fixed the existing `test_convert_to_databricks_workflow_task` to pass strings instead of mocks and to assert the correct parent-keyed `depends_on`. Added `test_generate_databricks_task_key_requires_task_dict_when_task_id_passed`.
- `changelog.rst`: Bug Fixes entry under 7.14.0.

closes: #47614
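Why the initializer change in `databricks_workflow.py` is behavior-neutral can be illustrated with a small sketch. It assumes, as described in the review above, that child tasks see the launch task under its group-prefixed ID; `filter_upstreams` is a hypothetical helper, not provider code.

```python
group_id = "wf"
launch_task_id = "launch"  # raw, unprefixed ID used by the old initializer

# Inside the group, a child's upstream is the prefixed ID "<group>.launch".
child_upstream_task_ids = {f"{group_id}.{launch_task_id}"}


def filter_upstreams(upstream_task_ids, relevant_upstreams):
    # Same membership filter the conversion step applies.
    return [t for t in sorted(upstream_task_ids) if t in relevant_upstreams]


old_result = filter_upstreams(child_upstream_task_ids, [launch_task_id])
new_result = filter_upstreams(child_upstream_task_ids, [])
```

Both results are empty: the bare `"launch"` entry never equals `"wf.launch"`, so seeding the list with it had no effect on which upstreams survive the filter.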
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4.7) following the guidelines