Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/67725.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Type the ``asset_expression`` field in DAG REST API responses with a structured scheduling-expression model instead of an untyped object, so generated API clients describe its real shape rather than treating it as an opaque dictionary.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,94 @@

from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel

# Asset Scheduling Expression Data Models
#
# These mirror the JSON produced by ``BaseAsset.as_expression()`` (see
# ``airflow.serialization.definitions.assets``), which is stored verbatim in
# ``DagModel.asset_expression``. Declaring them gives the REST API -- and the
# TypeScript client generated from its OpenAPI spec -- a real type instead of an
# opaque ``dict``. The shape is a recursive boolean tree whose leaves are assets,
# asset aliases, or asset references.


class AssetExpressionAssetInfo(BaseModel):
"""
Body of an ``asset`` leaf node.

``id`` is injected by ``DagModelOperation.update_dag_asset_expression`` when the expression is
persisted; ``BaseAsset.as_expression()`` itself only emits ``uri``/``name``/``group``. It is left
optional so a row persisted before id-enrichment (or migrated from the pre-3.0 dataset format)
degrades gracefully instead of failing response validation.
"""

uri: str
name: str
group: str
id: int | None = None


class AssetExpressionAliasInfo(BaseModel):
"""Body of an ``alias`` leaf node."""

name: str
group: str


class AssetExpressionAsset(BaseModel):
"""An asset leaf: ``{"asset": {"uri": ..., "name": ..., "group": ...}}``."""

asset: AssetExpressionAssetInfo


class AssetExpressionAlias(BaseModel):
"""An asset alias leaf: ``{"alias": {"name": ..., "group": ...}}``."""

alias: AssetExpressionAliasInfo


class AssetExpressionRef(BaseModel):
"""An unresolved asset reference leaf: ``{"asset_ref": {"name": ...}}`` or ``{"asset_ref": {"uri": ...}}``."""

asset_ref: dict[str, str]


class AssetExpressionAny(BaseModel):
"""An "or" node: ``{"any": [...]}`` -- satisfied when any child is satisfied."""

any: list[AssetExpression]


class AssetExpressionAll(BaseModel):
"""An "and" node: ``{"all": [...]}`` -- satisfied when all children are satisfied."""

all: list[AssetExpression]


def _asset_expression_discriminator(value: Any) -> str | None:
"""Select an expression variant by the single key that is present."""
keys = ("asset", "alias", "asset_ref", "any", "all")
if isinstance(value, dict):
present = [key for key in keys if key in value]
else:
present = [key for key in keys if getattr(value, key, None) is not None]
return present[0] if len(present) == 1 else None


AssetExpression = Annotated[
Union[
Annotated[AssetExpressionAsset, Tag("asset")],
Annotated[AssetExpressionAlias, Tag("alias")],
Annotated[AssetExpressionRef, Tag("asset_ref")],
Annotated[AssetExpressionAny, Tag("any")],
Annotated[AssetExpressionAll, Tag("all")],
],
Discriminator(_asset_expression_discriminator),
]
"""A nested asset scheduling expression; see ``BaseAsset.as_expression()``."""

AssetExpressionAny.model_rebuild()
AssetExpressionAll.model_rebuild()

# Common Bulk Data Models
T = TypeVar("T")
K = TypeVar("K")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

from airflow._shared.module_loading import qualname
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel, make_partial_model
from airflow.api_fastapi.core_api.datamodels.common import AssetExpression
from airflow.api_fastapi.core_api.datamodels.dag_tags import DagTagResponse
from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
from airflow.configuration import conf
Expand Down Expand Up @@ -191,7 +192,7 @@ class DAGDetailsResponse(DAGResponse):

catchup: bool
dag_run_timeout: timedelta | None
asset_expression: dict | None
asset_expression: AssetExpression | None
doc_md: str | None
start_date: datetime | None
end_date: datetime | None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from datetime import datetime

from pydantic import Field

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.common import AssetExpression


class NextRunAssetEventResponse(BaseModel):
"""An asset, and the time of its latest event, that a DAG's next run is waiting on."""

id: int
uri: str
name: str
# Serialized as ``lastUpdate`` for the UI; ``None`` until the asset has a qualifying event.
last_update: datetime | None = Field(default=None, alias="lastUpdate")


class NextRunAssetsResponse(BaseModel):
"""Assets feeding a DAG's next run, with the scheduling expression that combines them."""

asset_expression: AssetExpression | None = None
events: list[NextRunAssetEventResponse]
pending_partition_count: int | None = None
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.common import AssetExpression
from airflow.api_fastapi.core_api.datamodels.dags import DAGResponse
from airflow.api_fastapi.core_api.datamodels.hitl import HITLDetail
from airflow.api_fastapi.core_api.datamodels.ui.dag_runs import DAGRunLightResponse
Expand All @@ -26,7 +27,7 @@
class DAGWithLatestDagRunsResponse(DAGResponse):
"""DAG with latest dag runs response serializer."""

asset_expression: dict | None
asset_expression: AssetExpression | None
latest_dag_runs: list[DAGRunLightResponse]
pending_actions: list[HITLDetail]
is_favorite: bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.common import AssetExpression


class PartitionedDagRunResponse(BaseModel):
Expand All @@ -37,7 +38,7 @@ class PartitionedDagRunCollectionResponse(BaseModel):

partitioned_dag_runs: list[PartitionedDagRunResponse]
total: int
asset_expressions: dict[str, dict | None] | None = None
asset_expressions: dict[str, AssetExpression | None] | None = None


class PartitionedDagRunAssetResponse(BaseModel):
Expand All @@ -61,4 +62,4 @@ class PartitionedDagRunDetailResponse(BaseModel):
assets: list[PartitionedDagRunAssetResponse]
total_required: int
total_received: int
asset_expression: dict | None = None
asset_expression: AssetExpression | None = None
Loading