Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/api_fastapi/common/cursors.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def encode_cursor(row: Any, sort_param: SortParam) -> str:
if not resolved:
raise ValueError("SortParam has no resolved columns.")

parts = [getattr(row, attr_name, None) for attr_name, _col, _desc in resolved]
parts = [sort_param.row_value(row, attr_name) for attr_name, _col, _desc in resolved]
payload = msgspec.msgpack.encode(parts)
return base64.urlsafe_b64encode(payload).decode("ascii").rstrip("=")

Expand Down
31 changes: 30 additions & 1 deletion airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ def _resolve(self) -> list[tuple[str, ColumnElement, bool]]:
resolved: list[tuple[str, ColumnElement, bool]] = []
for order_by_value in order_by_values:
lstriped_orderby = order_by_value.lstrip("-")
# Store the user-facing name in the resolved tuple. ``row_value`` resolves
# it back to the actual row accessor via ``to_replace`` when reading values
# for cursor encoding.
attr_name = lstriped_orderby
column: Column | None = None
if self.to_replace:
Expand All @@ -330,7 +333,8 @@ def _resolve(self) -> list[tuple[str, ColumnElement, bool]]:

primary_key_column = self.get_primary_key_column()
pk_name = self.get_primary_key_string()
if not any(name == pk_name for name, _, _ in resolved):
resolved_column_keys = {getattr(col, "key", None) for _, col, _ in resolved}
if pk_name not in resolved_column_keys:
pk_desc = bool(order_by_values and order_by_values[0].startswith("-"))
resolved.append((pk_name, primary_key_column, pk_desc))

Expand All @@ -352,6 +356,31 @@ def get_resolved_columns(self) -> list[tuple[str, ColumnElement, bool]]:
"""Return resolved sort columns as (attr_name, column_element, is_descending) tuples."""
return self._resolve()

def row_value(self, row: Any, name: str) -> Any:
"""
Extract the sort-key value for ``name`` from a result row.

Resolves the accessor through ``to_replace`` for string aliases
(e.g. ``{"dag_run_id": "run_id"}``); otherwise reads ``name`` directly.
"""
if self.to_replace:
replacement = self.to_replace.get(name)
if isinstance(replacement, str):
return getattr(row, replacement, None)
if replacement is not None:
# TODO: Column-form ``to_replace`` (e.g. ``{"last_run_state": DagRun.state}``)
# isn't supported for cursor pagination — no endpoint that uses cursor
# pagination needs it today. When one does, decide how the row exposes the
# value (projected label on the SELECT, eagerly loaded relationship, etc.)
# and wire it up here. Raising loudly so a future caller doesn't silently
# get ``None`` cursor tokens.
raise NotImplementedError(
f"Cursor pagination does not support column-form ``to_replace`` mapping for "
f"``{name}``. Use a string alias in ``to_replace`` or sort by a primary-model "
f"attribute."
)
return getattr(row, name, None)

def get_primary_key_column(self) -> Column:
"""Get the primary key column of the model of SortParam object."""
return inspect(self.model).primary_key[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,33 @@ class DAGRunResponse(BaseModel):


class DAGRunCollectionResponse(BaseModel):
"""DAG Run Collection serializer for responses."""
"""
DAG Run collection response supporting both offset and cursor pagination.

A single flat model is used instead of a discriminated union
(``Annotated[Offset | Cursor, Field(discriminator=...)]``) because
the OpenAPI ``oneOf`` + ``discriminator`` construct is not handled
correctly by ``@hey-api/openapi-ts`` / ``@7nohe/openapi-react-query-codegen``:
return types degrade to ``unknown`` in JSDoc and can produce
incorrect TypeScript types (see hey-api/openapi-ts#1613, #3270).
"""

dag_runs: Iterable[DAGRunResponse]
total_entries: int
total_entries: int | None = Field(
default=None,
description="Total number of matching items. Populated for offset pagination, "
"``null`` when using cursor pagination.",
)
next_cursor: str | None = Field(
default=None,
description="Token pointing to the next page. Populated for cursor pagination, "
"``null`` when using offset pagination or when there is no next page.",
)
previous_cursor: str | None = Field(
default=None,
description="Token pointing to the previous page. Populated for cursor pagination, "
"``null`` when using offset pagination or when on the first page.",
)


class TriggerDAGRunPostBody(StrictBaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2023,7 +2023,25 @@ paths:


This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs for
all DAGs.'
all DAGs.


Supports two pagination modes:


**Offset (default):** use `limit` and `offset` query parameters. Returns `total_entries`.


**Cursor:** pass `cursor` (empty string for the first page, then `next_cursor`
from the response).

When `cursor` is provided, `offset` is ignored and `total_entries` is not
returned.

``next_cursor`` is ``null`` when there are no more pages; ``previous_cursor``
is ``null``

on the first page.'
operationId: get_dag_runs
security:
- OAuth2PasswordBearer: []
Expand All @@ -2035,6 +2053,20 @@ paths:
schema:
type: string
title: Dag Id
- name: cursor
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: Cursor for keyset-based pagination. Pass an empty string for
the first page, then use ``next_cursor`` from the response. When ``cursor``
is provided, ``offset`` is ignored.
title: Cursor
description: Cursor for keyset-based pagination. Pass an empty string for
the first page, then use ``next_cursor`` from the response. When ``cursor``
is provided, ``offset`` is ignored.
- name: limit
in: query
required: false
Expand Down Expand Up @@ -10752,14 +10784,45 @@ components:
type: array
title: Dag Runs
total_entries:
type: integer
anyOf:
- type: integer
- type: 'null'
title: Total Entries
description: Total number of matching items. Populated for offset pagination,
``null`` when using cursor pagination.
next_cursor:
anyOf:
- type: string
- type: 'null'
title: Next Cursor
description: Token pointing to the next page. Populated for cursor pagination,
``null`` when using offset pagination or when there is no next page.
previous_cursor:
anyOf:
- type: string
- type: 'null'
title: Previous Cursor
description: Token pointing to the previous page. Populated for cursor pagination,
``null`` when using offset pagination or when on the first page.
type: object
required:
- dag_runs
- total_entries
title: DAGRunCollectionResponse
description: DAG Run Collection serializer for responses.
description: 'DAG Run collection response supporting both offset and cursor
pagination.


A single flat model is used instead of a discriminated union

(``Annotated[Offset | Cursor, Field(discriminator=...)]``) because

the OpenAPI ``oneOf`` + ``discriminator`` construct is not handled

correctly by ``@hey-api/openapi-ts`` / ``@7nohe/openapi-react-query-codegen``:

return types degrade to ``unknown`` in JSDoc and can produce

incorrect TypeScript types (see hey-api/openapi-ts#1613, #3270).'
DAGRunPatchBody:
properties:
state:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,14 @@
)
from airflow.api_fastapi.app import get_auth_manager
from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity, DagDetails
from airflow.api_fastapi.common.cursors import (
apply_cursor_filter,
encode_cursor,
make_backward_cursor,
parse_cursor,
)
from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, get_latest_version_of_dag
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.db.common import SessionDep, apply_filters_to_select, paginated_select
from airflow.api_fastapi.common.db.dag_runs import (
attach_dag_versions_to_runs,
eager_load_dag_run_for_list,
Expand Down Expand Up @@ -64,6 +70,7 @@
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.common.types import Mimetype
from airflow.api_fastapi.core_api.base import OrmClause
from airflow.api_fastapi.core_api.datamodels.assets import AssetEventCollectionResponse
from airflow.api_fastapi.core_api.datamodels.dag_run import (
DAGRunClearBody,
Expand Down Expand Up @@ -400,12 +407,28 @@ def get_dag_runs(
dag_id_pattern: Annotated[_SearchParam, Depends(search_param_factory(DagRun.dag_id, "dag_id_pattern"))],
partition_key_pattern: QueryDagRunPartitionKeySearch,
consuming_asset_pattern: QueryConsumingAssetPatternSearch,
cursor: str | None = Query(
None,
description="Cursor for keyset-based pagination. "
"Pass an empty string for the first page, then use ``next_cursor`` from the response. "
"When ``cursor`` is provided, ``offset`` is ignored.",
),
) -> DAGRunCollectionResponse:
"""
Get all DAG Runs.

This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs for all DAGs.

Supports two pagination modes:

**Offset (default):** use `limit` and `offset` query parameters. Returns `total_entries`.

**Cursor:** pass `cursor` (empty string for the first page, then `next_cursor` from the response).
When `cursor` is provided, `offset` is ignored and `total_entries` is not returned.
``next_cursor`` is ``null`` when there are no more pages; ``previous_cursor`` is ``null``
on the first page.
"""
use_cursor = cursor is not None
query = select(DagRun).options(*eager_load_dag_run_for_list())

if dag_id != "~":
Expand All @@ -416,27 +439,66 @@ def get_dag_runs(
if dag_version.value:
query = query.join(DagVersion, DagRun.created_dag_version_id == DagVersion.id)

filters: list[OrmClause] = [
run_after,
logical_date,
start_date_range,
end_date_range,
update_at_range,
duration_range,
conf_contains,
state,
run_type,
dag_version,
bundle_version,
readable_dag_runs_filter,
run_id_pattern,
triggering_user_name_pattern,
dag_id_pattern,
partition_key_pattern,
consuming_asset_pattern,
]

if use_cursor:
# Fetch one extra row so we can detect whether a next page exists.
page_limit = cast(
"int", limit.value
) # LimitFilter value is guaranteed to be set to the default value of QueryLimit
cursor_limit = LimitFilter().set_value(page_limit + 1)
dag_run_select = apply_filters_to_select(statement=query, filters=[*filters, order_by, cursor_limit])

is_backward = False
if cursor:
token, is_backward = parse_cursor(cursor)
if is_backward:
dag_run_select = order_by.to_orm(dag_run_select, reversed=True)
dag_run_select = apply_cursor_filter(dag_run_select, token, order_by, is_backward=is_backward)

fetched = list(session.scalars(dag_run_select).unique())
has_more = len(fetched) > page_limit
dag_runs = fetched[:page_limit]

if is_backward:
dag_runs.reverse()
has_prev = has_more
has_next = True
else:
has_prev = bool(cursor)
has_next = has_more

attach_dag_versions_to_runs(dag_runs, session=session)

return DAGRunCollectionResponse(
dag_runs=dag_runs,
next_cursor=(encode_cursor(dag_runs[-1], order_by) if has_next and dag_runs else None),
previous_cursor=(
make_backward_cursor(encode_cursor(dag_runs[0], order_by)) if has_prev and dag_runs else None
),
)

dag_run_select, total_entries = paginated_select(
statement=query,
filters=[
run_after,
logical_date,
start_date_range,
end_date_range,
update_at_range,
duration_range,
conf_contains,
state,
run_type,
dag_version,
bundle_version,
readable_dag_runs_filter,
run_id_pattern,
triggering_user_name_pattern,
dag_id_pattern,
partition_key_pattern,
consuming_asset_pattern,
],
filters=filters,
order_by=order_by,
offset=offset,
limit=limit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ def get_task_instances(
# Fetch one extra row so we can detect whether a next page exists.
page_limit = cast(
"int", limit.value
) # LimitFilter value is guaranteed to be set of the default value of QueryLimit
) # LimitFilter value is guaranteed to be set to the default value of QueryLimit
cursor_limit = LimitFilter().set_value(page_limit + 1)
task_instance_select = apply_filters_to_select(
statement=query, filters=[*filters, order_by, cursor_limit]
Expand Down
5 changes: 3 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,11 @@ export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = ({ dagId, dagRunId }:
export type DagRunServiceGetDagRunsDefaultResponse = Awaited<ReturnType<typeof DagRunService.getDagRuns>>;
export type DagRunServiceGetDagRunsQueryResult<TData = DagRunServiceGetDagRunsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagRunServiceGetDagRunsKey = "DagRunServiceGetDagRuns";
export const UseDagRunServiceGetDagRunsKeyFn = ({ bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
export const UseDagRunServiceGetDagRunsKeyFn = ({ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
bundleVersion?: string;
confContains?: string;
consumingAssetPattern?: string;
cursor?: string;
dagId: string;
dagIdPattern?: string;
dagVersion?: number[];
Expand Down Expand Up @@ -182,7 +183,7 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({ bundleVersion, confContains, c
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
}, queryKey?: Array<unknown>) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])];
}, queryKey?: Array<unknown>) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])];
export type DagRunServiceWaitDagRunUntilFinishedDefaultResponse = Awaited<ReturnType<typeof DagRunService.waitDagRunUntilFinished>>;
export type DagRunServiceWaitDagRunUntilFinishedQueryResult<TData = DagRunServiceWaitDagRunUntilFinishedDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagRunServiceWaitDagRunUntilFinishedKey = "DagRunServiceWaitDagRunUntilFinished";
Expand Down
Loading
Loading