diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/pools.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/pools.py index 077ddbe550552..a661308d4434c 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/pools.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/pools.py @@ -20,7 +20,7 @@ from collections.abc import Callable, Iterable from typing import Annotated -from pydantic import BeforeValidator, Field, PositiveInt +from pydantic import BeforeValidator, Field from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel @@ -34,15 +34,27 @@ def _call_function(function: Callable[[], int]) -> int: return function() +PoolSlots = Annotated[ + int, + Field(ge=-1, description="Number of slots. Use -1 for unlimited."), +] + + class BasePool(BaseModel): """Base serializer for Pool.""" pool: str = Field(serialization_alias="name") - slots: PositiveInt + slots: PoolSlots description: str | None = Field(default=None) include_deferred: bool +def _sanitize_open_slots(value) -> int: + if isinstance(value, float) and value == float("inf"): + return -1 + return value + + class PoolResponse(BasePool): """Pool serializer for responses.""" @@ -50,7 +62,7 @@ class PoolResponse(BasePool): running_slots: Annotated[int, BeforeValidator(_call_function)] queued_slots: Annotated[int, BeforeValidator(_call_function)] scheduled_slots: Annotated[int, BeforeValidator(_call_function)] - open_slots: Annotated[int, BeforeValidator(_call_function)] + open_slots: Annotated[int, BeforeValidator(lambda v: _sanitize_open_slots(_call_function(v)))] deferred_slots: Annotated[int, BeforeValidator(_call_function)] team_name: str | None @@ -66,7 +78,7 @@ class PoolPatchBody(StrictBaseModel): """Pool serializer for patch bodies.""" name: str | None = Field(default=None, alias="pool") - slots: PositiveInt | None = None + slots: PoolSlots | None = None description: str | None = None include_deferred: bool | None = None team_name: str | None = Field(max_length=50, default=None) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 847409b00769b..11c8007ada61a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -12041,8 +12041,9 @@ components: title: Name slots: type: integer - exclusiveMinimum: 0.0 + minimum: -1.0 title: Slots + description: Number of slots. Use -1 for unlimited. description: anyOf: - type: string @@ -12091,7 +12092,8 @@ components: slots: anyOf: - type: integer - exclusiveMinimum: 0.0 + minimum: -1.0 + description: Number of slots. Use -1 for unlimited. - type: 'null' title: Slots description: @@ -12121,8 +12123,9 @@ components: title: Name slots: type: integer - exclusiveMinimum: 0.0 + minimum: -1.0 title: Slots + description: Number of slots. Use -1 for unlimited. description: anyOf: - type: string diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 8b0bf124203b2..e0b53fb6d6f02 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4597,8 +4597,9 @@ export const $PoolBody = { }, slots: { type: 'integer', - exclusiveMinimum: 0, - title: 'Slots' + minimum: -1, + title: 'Slots', + description: 'Number of slots. Use -1 for unlimited.' }, description: { anyOf: [ @@ -4673,7 +4674,8 @@ export const $PoolPatchBody = { anyOf: [ { type: 'integer', - exclusiveMinimum: 0 + minimum: -1, + description: 'Number of slots. Use -1 for unlimited.' }, { type: 'null' @@ -4730,8 +4732,9 @@ export const $PoolResponse = { }, slots: { type: 'integer', - exclusiveMinimum: 0, - title: 'Slots' + minimum: -1, + title: 'Slots', + description: 'Number of slots. Use -1 for unlimited.' }, description: { anyOf: [ diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index b98c076f66a05..c409b984d10f8 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1193,6 +1193,9 @@ export type PluginResponse = { */ export type PoolBody = { name: string; + /** + * Number of slots. Use -1 for unlimited. + */ slots: number; description?: string | null; include_deferred?: boolean; @@ -1223,6 +1226,9 @@ export type PoolPatchBody = { */ export type PoolResponse = { name: string; + /** + * Number of slots. Use -1 for unlimited. + */ slots: number; description?: string | null; include_deferred: boolean; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py index bb34660f0e02f..e2ef872073530 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py @@ -292,15 +292,7 @@ class TestPatchPool(TestPoolsEndpoint): {"slots": -10}, 422, { - "detail": [ - { - "ctx": {"gt": 0}, - "input": -10, - "loc": ["body", "slots"], - "msg": "Input should be greater than 0", - "type": "greater_than", - }, - ], + "detail": "Slots must be greater than or equal to -1. Use -1 for unlimited.", }, ), # Partial body on default_pool @@ -380,10 +372,10 @@ def test_should_respond_200( body = response.json() if response.status_code == 422: - for error in body["detail"]: - # pydantic version can vary in tests (lower constraints), we do not assert the url. - if "url" in error: - del error["url"] + detail = response.json().get("detail") + assert detail is not None + assert "slots" in str(detail) + return assert body == expected_response if response.status_code == 200: @@ -498,6 +490,39 @@ def test_should_respond_200(self, test_client, session, body, expected_status_co assert session.scalar(select(func.count()).select_from(Pool)) == n_pools + 1 check_last_log(session, dag_id=None, event="post_pool", logical_date=None) + def test_post_pool_allows_unlimited_slots(self, test_client, session): + self.create_pools() + n_pools = session.scalar(select(func.count()).select_from(Pool)) + + response = test_client.post( + "/pools", + json={ + "name": "unlimited_pool", + "slots": -1, + "description": "Unlimited pool", + "include_deferred": False, + }, + ) + + assert response.status_code == 201 + body = response.json() + assert body["name"] == "unlimited_pool" + assert body["slots"] == -1 + assert body["open_slots"] == -1 + assert session.scalar(select(func.count()).select_from(Pool)) == n_pools + 1 + check_last_log(session, dag_id=None, event="post_pool", logical_date=None) + + def test_post_pool_rejects_infinity_string(self, test_client, session): + response = test_client.post( + "/pools", + json={ + "name": "bad_pool", + "slots": "infinity", + "include_deferred": False, + }, + ) + assert response.status_code == 422 + def test_should_respond_401(self, unauthenticated_test_client): response = unauthenticated_test_client.post("/pools", json={}) assert response.status_code == 401 diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index fbb0881b90936..921709c0b68a0 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -633,14 +633,14 @@ class PoolBody(BaseModel): extra="forbid", ) name: Annotated[str, Field(max_length=256, title="Name")] - slots: Annotated[int, Field(gt=0, title="Slots")] + slots: Annotated[int, Field(description="Number of slots. Use -1 for unlimited.", ge=-1, title="Slots")] description: Annotated[str | None, Field(title="Description")] = None include_deferred: Annotated[bool | None, Field(title="Include Deferred")] = False team_name: Annotated[TeamName | None, Field(title="Team Name")] = None class Slots(RootModel[int]): - root: Annotated[int, Field(gt=0, title="Slots")] + root: Annotated[int, Field(description="Number of slots. Use -1 for unlimited.", ge=-1, title="Slots")] class PoolPatchBody(BaseModel): @@ -664,7 +664,7 @@ class PoolResponse(BaseModel): """ name: Annotated[str, Field(title="Name")] - slots: Annotated[int, Field(gt=0, title="Slots")] + slots: Annotated[int, Field(description="Number of slots. Use -1 for unlimited.", ge=-1, title="Slots")] description: Annotated[str | None, Field(title="Description")] = None include_deferred: Annotated[bool, Field(title="Include Deferred")] occupied_slots: Annotated[int, Field(title="Occupied Slots")]