From b898eab4fc006db589ec0f86f2f2218669f0b4fe Mon Sep 17 00:00:00 2001 From: AI Assistant Date: Tue, 26 May 2026 12:37:31 +0530 Subject: [PATCH] Fix rendered_map_index sort using integer map_index for correct ordering Sorting mapped task instances by rendered_map_index was producing lexicographic order ("0","1","10","11","2") instead of numeric order (0,1,2,...,10,11), and caused a 500 error on cursor pagination when a CASE expression was used in to_replace. Map rendered_map_index to the integer map_index column via a string alias in SortParam.to_replace. This routes ORDER BY through the integer column, fixing both the sort order and cursor pagination. Also adds secondary_sort parameter to SortParam as generic infrastructure for appending tiebreaker columns to ORDER BY. --- .../airflow/api_fastapi/common/parameters.py | 18 +++++++++++++----- .../core_api/routes/public/task_instances.py | 2 ++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index 56b4c20884cbb..2c1b7a87b8f89 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -543,12 +543,17 @@ class SortParam(BaseParam[list[str]]): MAX_SORT_PARAMS = 10 def __init__( - self, allowed_attrs: list[str], model: Base, to_replace: dict[str, str | Column] | None = None + self, + allowed_attrs: list[str], + model: Base, + to_replace: dict[str, str | Column] | None = None, + secondary_sort: dict[str, Column] | None = None, ) -> None: super().__init__() self.allowed_attrs = allowed_attrs self.model = model self.to_replace = to_replace + self.secondary_sort: dict[str, Column] = secondary_sort or {} self._cached_resolution: list[tuple[str, ColumnElement, bool]] | None = None def set_value(self, value: list[str] | None) -> Self: @@ -611,10 +616,13 @@ def to_orm(self, select: Select, *, reversed: bool = False) -> Select: raise ValueError(f"Cannot set 'skip_none' to False on a {type(self)}") resolved = self._resolve() - if reversed: - columns = [col.asc() if is_desc else col.desc() for _, col, is_desc in resolved] - else: - columns = [col.desc() if is_desc else col.asc() for _, col, is_desc in resolved] + columns = [] + for attr_name, col, is_desc in resolved: + effective_desc = not is_desc if reversed else is_desc + columns.append(col.desc() if effective_desc else col.asc()) + secondary = self.secondary_sort.get(attr_name) + if secondary is not None: + columns.append(secondary.desc() if effective_desc else secondary.asc()) return select.order_by(None).order_by(*columns) def get_resolved_columns(self) -> list[tuple[str, ColumnElement, bool]]: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index e64435ce9f67c..d838dc08d4f11 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -216,6 +216,7 @@ def get_mapped_task_instances( "logical_date": DagRun.logical_date, "data_interval_start": DagRun.data_interval_start, "data_interval_end": DagRun.data_interval_end, + "rendered_map_index": "map_index", }, ).dynamic_depends(default="map_index") ), @@ -508,6 +509,7 @@ def get_task_instances( "run_after": DagRun.run_after, "data_interval_start": DagRun.data_interval_start, "data_interval_end": DagRun.data_interval_end, + "rendered_map_index": "map_index", }, ).dynamic_depends(default="map_index") ),