Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

from __future__ import annotations

from typing import cast

from sqlalchemy import func, select
from sqlalchemy.sql import ColumnElement

from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
Expand All @@ -27,7 +30,7 @@
DagRun.dag_id,
DagRun.state,
DagModel.dag_display_name,
func.count(DagRun.state),
cast("ColumnElement[int]", func.count(DagRun.state).label("count")),
)
.join(DagModel, DagRun.dag_id == DagModel.dag_id)
.group_by(DagRun.dag_id, DagRun.state, DagModel.dag_display_name)
Expand Down
12 changes: 8 additions & 4 deletions airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
Generic,
Literal,
TypeVar,
cast,
overload,
)

Expand Down Expand Up @@ -67,6 +68,7 @@
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.sql import ColumnElement, Select


T = TypeVar("T")


Expand All @@ -75,7 +77,7 @@ class BaseParam(OrmClause[T], ABC):

def __init__(self, value: T | None = None, skip_none: bool = True) -> None:
super().__init__(value)
self.attribute: ColumnElement | None = None
self.attribute: ColumnElement | InstrumentedAttribute | None = None
self.skip_none = skip_none

def set_value(self, value: T | None) -> Self:
Expand Down Expand Up @@ -387,7 +389,7 @@ def depends(cls, *args: Any, **kwargs: Any) -> Self:


def filter_param_factory(
attribute: ColumnElement,
attribute: ColumnElement | InstrumentedAttribute,
_type: type,
filter_option: FilterOptionEnum = FilterOptionEnum.EQUAL,
filter_name: str | None = None,
Expand All @@ -399,7 +401,7 @@ def filter_param_factory(
description: str | None = None,
) -> Callable[[T | None], FilterParam[T | None]]:
# if filter_name is not provided, use the attribute name as the default
filter_name = filter_name or attribute.name
filter_name = filter_name or getattr(attribute, "name", str(attribute))
# can only set either default_value or default_factory
query = (
Query(alias=filter_name, default_factory=default_factory, description=description)
Expand All @@ -410,7 +412,9 @@ def filter_param_factory(
def depends_filter(value: T | None = query) -> FilterParam[T | None]:
if transform_callable:
value = transform_callable(value)
return FilterParam(attribute, value, filter_option, skip_none)
# Cast to InstrumentedAttribute for type compatibility
attr = cast("InstrumentedAttribute", attribute)
return FilterParam(attr, value, filter_option, skip_none)

# add type hint to value at runtime
depends_filter.__annotations__["value"] = _type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def _serialize_xcoms(self) -> dict[str, Any]:
task_ids=self.result_task_ids,
dag_ids=self.dag_id,
)
xcom_query = self.session.scalars(xcom_query.order_by(XComModel.task_id, XComModel.map_index)).all()
xcom_results = self.session.scalars(xcom_query.order_by(XComModel.task_id, XComModel.map_index))

def _group_xcoms(g: Iterator[XComModel]) -> Any:
entries = list(g)
Expand All @@ -67,7 +67,7 @@ def _group_xcoms(g: Iterator[XComModel]) -> Any:

return {
task_id: _group_xcoms(g)
for task_id, g in itertools.groupby(xcom_query, key=operator.attrgetter("task_id"))
for task_id, g in itertools.groupby(xcom_results, key=operator.attrgetter("task_id"))
}

def _serialize_response(self, dag_run: DagRun) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def setup_dag_runs(self, session=None) -> None:
triggered_by=DagRunTriggeredByType.TEST,
)
if dag_run.start_date is not None:
dag_run.end_date = dag_run.start_date.add(hours=1)
dag_run.end_date = dag_run.start_date + pendulum.duration(hours=1)
session.add(dag_run)
session.commit()

Expand Down
Loading