Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,17 @@ class DagResponse(BaseModel):
owners: str | None
tags: list[str]
next_dagrun: datetime | None


class DagTaskGroupsExistenceResponse(BaseModel):
    """Schema for batch Dag task group existence response."""

    # Task group ids from the request that were found in the Dag.
    existing: list[str]
    # Task group ids from the request that were not found in the Dag.
    missing: list[str]


class DagTasksExistenceResponse(BaseModel):
    """Schema for batch Dag task existence response."""

    # Task ids from the request that were found in the Dag.
    existing: list[str]
    # Task ids from the request that were not found in the Dag.
    missing: list[str]
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@

from __future__ import annotations

from fastapi import APIRouter, HTTPException, status
from fastapi import APIRouter, HTTPException, Query, status

from airflow.api_fastapi.common.dagbag import DagBagDep, get_latest_version_of_dag
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.execution_api.datamodels.dags import DagResponse
from airflow.api_fastapi.execution_api.datamodels.dags import (
DagResponse,
DagTaskGroupsExistenceResponse,
DagTasksExistenceResponse,
)
from airflow.models.dag import DagModel

router = APIRouter()
Expand Down Expand Up @@ -57,3 +62,69 @@ def get_dag(
tags=sorted(tag.name for tag in dag_model.tags),
next_dagrun=dag_model.next_dagrun,
)


@router.get(
    "/{dag_id}/task-groups/existence",
    responses={
        status.HTTP_404_NOT_FOUND: {"description": "DAG not found for the given dag_id"},
    },
)
def get_dag_task_groups_existence(
    dag_id: str,
    session: SessionDep,
    dag_bag: DagBagDep,
    task_group_ids: list[str] = Query(
        default_factory=list, description="Task group ids to check for existence"
    ),
) -> DagTaskGroupsExistenceResponse:
    """Get the list of existing and missing Dag task group ids from the given ids."""
    # Fail fast with a structured 404 when no DagModel row exists for dag_id.
    if not session.get(DagModel, dag_id):
        raise HTTPException(
            status.HTTP_404_NOT_FOUND,
            detail={
                "reason": "not_found",
                "message": f"The Dag with dag_id: `{dag_id}` was not found",
            },
        )

    dag = get_latest_version_of_dag(dag_bag, dag_id, session, include_reason=True)

    # Resolve the group mapping once rather than once per requested id.
    # NOTE(review): task_group_dict looks like a computed property that rebuilds
    # the mapping on each access -- confirm; hoisting is harmless either way.
    known_group_ids = dag.task_group_dict

    # Request order (and any duplicates) are preserved within each list.
    existing = [group_id for group_id in task_group_ids if group_id in known_group_ids]
    missing = [group_id for group_id in task_group_ids if group_id not in known_group_ids]

    return DagTaskGroupsExistenceResponse(existing=existing, missing=missing)


@router.get(
    "/{dag_id}/tasks/existence",
    responses={
        status.HTTP_404_NOT_FOUND: {"description": "DAG not found for the given dag_id"},
    },
)
def get_dag_tasks_existence(
    dag_id: str,
    session: SessionDep,
    dag_bag: DagBagDep,
    task_ids: list[str] = Query(default_factory=list, description="Task ids to check for existence"),
) -> DagTasksExistenceResponse:
    """Get the list of existing and missing Dag task ids from the given ids."""
    # A missing DagModel row is reported as a structured 404 before any Dag is loaded.
    dag_model = session.get(DagModel, dag_id)
    if not dag_model:
        not_found_detail = {
            "reason": "not_found",
            "message": f"The Dag with dag_id: `{dag_id}` was not found",
        }
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail=not_found_detail)

    latest_dag = get_latest_version_of_dag(dag_bag, dag_id, session, include_reason=True)

    # Single pass with one has_task() call per id; request order is kept inside each bucket.
    buckets: dict[bool, list[str]] = {True: [], False: []}
    for candidate in task_ids:
        buckets[bool(latest_dag.has_task(candidate))].append(candidate)

    return DagTasksExistenceResponse(existing=buckets[True], missing=buckets[False])
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,18 @@
AddStateEndpoints,
AddTeamNameField,
)
from airflow.api_fastapi.execution_api.versions.v2026_06_30 import AddVariableKeysEndpoint
from airflow.api_fastapi.execution_api.versions.v2026_06_30 import (
AddDagTaskDetailsExistenceEndpoints,
AddVariableKeysEndpoint,
)

bundle = VersionBundle(
HeadVersion(),
Version("2026-06-30", AddVariableKeysEndpoint),
Version(
"2026-06-30",
AddVariableKeysEndpoint,
AddDagTaskDetailsExistenceEndpoints,
),
Version(
"2026-06-16",
AddRetryPolicyFields,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,14 @@ class AddVariableKeysEndpoint(VersionChange):
description = __doc__

instructions_to_migrate_to_previous_version = (endpoint("/variables/keys", ["GET"]).didnt_exist,)


class AddDagTaskDetailsExistenceEndpoints(VersionChange):
    """Add Dag task and task group existence endpoints."""

    # The class docstring above doubles as the description text.
    description = __doc__

    # Both GET endpoints are declared as not existing in the previous API version.
    instructions_to_migrate_to_previous_version = (
        endpoint("/dags/{dag_id}/tasks/existence", ["GET"]).didnt_exist,
        endpoint("/dags/{dag_id}/task-groups/existence", ["GET"]).didnt_exist,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from airflow.models import DagModel
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import TaskGroup

from tests_common.test_utils.db import clear_db_runs

Expand Down Expand Up @@ -126,3 +127,86 @@ def test_get_dag_defaults(self, client, session, dag_maker):
"tags": [],
"next_dagrun": ANY,
}

def test_get_dag_task_groups_existence_partitions(self, client, session, dag_maker):
    """Known task group ids are reported as existing, unknown ones as missing."""
    dag_id = "test_dag_task_groups_existence"
    requested_group_ids = ["grp", "ghost_group"]

    # Serialize a Dag holding one task group ("grp") with a single task inside it.
    with dag_maker(dag_id=dag_id, session=session, serialized=True), TaskGroup(group_id="grp"):
        EmptyOperator(task_id="a")
    session.commit()

    url = f"/execution/dags/{dag_id}/task-groups/existence"
    response = client.get(url, params={"task_group_ids": requested_group_ids})

    assert response.status_code == 200
    payload = response.json()
    assert payload == {"existing": ["grp"], "missing": ["ghost_group"]}

def test_get_dag_task_groups_existence_dag_not_found(self, client, session, dag_maker):
    """An unknown dag_id returns 404 with the structured ``not_found`` detail."""
    expected_body = {
        "detail": {
            "reason": "not_found",
            "message": "The Dag with dag_id: `no_such_dag` was not found",
        }
    }
    query = {"task_group_ids": ["grp"]}

    response = client.get("/execution/dags/no_such_dag/task-groups/existence", params=query)

    assert response.status_code == 404
    assert response.json() == expected_body

def test_get_dag_tasks_existence_partitions(self, client, session, dag_maker):
    """Known task ids are reported as existing, unknown ones as missing."""
    dag_id = "test_dag_tasks_existence"
    requested_task_ids = ["a", "b", "ghost"]

    # Serialize a Dag that contains exactly the tasks "a" and "b".
    with dag_maker(dag_id=dag_id, session=session, serialized=True):
        for task_id in ("a", "b"):
            EmptyOperator(task_id=task_id)

    session.commit()

    url = f"/execution/dags/{dag_id}/tasks/existence"
    response = client.get(url, params={"task_ids": requested_task_ids})

    assert response.status_code == 200
    payload = response.json()
    assert payload == {"existing": ["a", "b"], "missing": ["ghost"]}

def test_get_dag_tasks_existence_empty_list(self, client, session, dag_maker):
    """Omitting ``task_ids`` yields empty ``existing`` and ``missing`` lists."""
    dag_id = "test_dag_tasks_existence_empty"
    empty_partition = {"existing": [], "missing": []}

    with dag_maker(dag_id=dag_id, session=session, serialized=True):
        EmptyOperator(task_id="a")
    session.commit()

    # No query parameters at all: the endpoint falls back to an empty id list.
    url = f"/execution/dags/{dag_id}/tasks/existence"
    response = client.get(url)

    assert response.status_code == 200
    assert response.json() == empty_partition

def test_get_dag_tasks_existence_dag_not_found(self, client, session, dag_maker):
    """An unknown dag_id returns 404 with the structured ``not_found`` detail."""
    expected_body = {
        "detail": {
            "reason": "not_found",
            "message": "The Dag with dag_id: `no_such_dag` was not found",
        }
    }
    query = {"task_ids": ["a"]}

    response = client.get("/execution/dags/no_such_dag/tasks/existence", params=query)

    assert response.status_code == 404
    assert response.json() == expected_body
4 changes: 4 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1954,6 +1954,8 @@ def get_type_names(union_type):
"GetDagRun",
"GetDagRunState",
"GetDag",
"GetDagTaskGroupsExistence",
"GetDagTasksExistence",
"GetDRCount",
"GetTaskBreadcrumbs",
"GetTaskRescheduleStartDate",
Expand Down Expand Up @@ -1995,6 +1997,8 @@ def get_type_names(union_type):
"DagResult",
"DagRunResult",
"DagRunStateResult",
"DagTaskGroupsExistenceResult",
"DagTasksExistenceResult",
"DRCount",
"SentFDs",
"StartupDetails",
Expand Down
4 changes: 4 additions & 0 deletions airflow-core/tests/unit/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1908,6 +1908,8 @@ def get_type_names(union_type):
"GetAssetEventByAsset",
"GetAssetEventByAssetAlias",
"GetDagRun",
"GetDagTaskGroupsExistence",
"GetDagTasksExistence",
"GetPrevSuccessfulDagRun",
"GetPreviousDagRun",
"GetTaskBreadcrumbs",
Expand Down Expand Up @@ -1947,6 +1949,8 @@ def get_type_names(union_type):
"AssetsByAliasResult",
"AssetEventsResult",
"DagRunResult",
"DagTaskGroupsExistenceResult",
"DagTasksExistenceResult",
"SentFDs",
"StartupDetails",
"TaskBreadcrumbsResult",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from airflow.providers.standard.version_compat import (
AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS,
AIRFLOW_V_3_3_PLUS,
BaseOperator,
)
from airflow.utils.file import correct_maybe_zipped
Expand Down Expand Up @@ -441,6 +442,12 @@ def execute(self, context: Context) -> None:

dttm_filter = self._get_dttm_filter(context)
if AIRFLOW_V_3_0_PLUS:
# It relies on endpoints (GetDagTasksExistence and GetDagTaskGroupsExistence)
# that don't exist on previous versions.
if AIRFLOW_V_3_3_PLUS:
if self.check_existence and not self._has_checked_existence:
self._check_for_existence_af3(context)

self.defer(
timeout=datetime.timedelta(seconds=timeout_value) if timeout_value else None,
trigger=WorkflowTrigger(
Expand Down Expand Up @@ -544,6 +551,56 @@ def _check_for_existence(self, session: Session) -> None:

self._has_checked_existence = True

def _check_for_existence_af3(self, context: Context) -> None:
    """
    Verify via the task instance that the external Dag and its tasks or task group exist.

    Exactly one existence request is issued per call. When task ids or a task
    group id are configured, that lookup also reveals a missing Dag, so no
    separate Dag probe is sent; the empty-list probe is used only when neither
    is configured.

    :param context: Execution context; ``context["ti"]`` must provide
        ``get_dag_tasks_existence`` and ``get_dag_task_groups_existence``.
    :raises ExternalDagNotFoundError: The lookup failed with ``ErrorType.DAG_NOT_FOUND``.
    :raises ExternalTaskNotFoundError: At least one of ``external_task_ids`` is missing.
    :raises ExternalTaskGroupNotFoundError: ``external_task_group_id`` is missing.
    :raises AirflowRuntimeError: Any other runtime error is re-raised unchanged.
    """
    from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType

    ti = context["ti"]
    missing_task_ids: list[str] = []
    task_group_missing = False

    try:
        if self.external_task_ids:
            missing_task_ids = ti.get_dag_tasks_existence(
                dag_id=self.external_dag_id,
                task_ids=list(self.external_task_ids),
            ).missing
        elif self.external_task_group_id:
            task_group_missing = bool(
                ti.get_dag_task_groups_existence(
                    dag_id=self.external_dag_id,
                    task_group_ids=[self.external_task_group_id],
                ).missing
            )
        else:
            # Nothing specific to look up: an empty id list is sent purely to
            # learn whether the Dag itself exists.
            ti.get_dag_tasks_existence(dag_id=self.external_dag_id, task_ids=[])
    except AirflowRuntimeError as exc:
        if exc.error.error == ErrorType.DAG_NOT_FOUND:
            # Chain the original error so the underlying failure stays visible.
            raise ExternalDagNotFoundError(
                f"The external DAG {self.external_dag_id} does not exist."
            ) from exc
        raise

    if missing_task_ids:
        raise ExternalTaskNotFoundError(
            f"The external task(s) {sorted(missing_task_ids)} in Dag "
            f"{self.external_dag_id} do not exist."
        )
    if task_group_missing:
        raise ExternalTaskGroupNotFoundError(
            f"The external task group '{self.external_task_group_id}' in "
            f"Dag '{self.external_dag_id}' does not exist."
        )

    # Only reached when every check passed, so later calls can skip the lookup.
    self._has_checked_existence = True

def get_count(self, dttm_filter: Sequence[datetime.datetime], session: Session, states: list[str]) -> int:
"""
Get the count of records against dttm filter and states.
Expand Down
Loading
Loading