Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from collections.abc import Callable, Iterable
from typing import Annotated

from pydantic import BeforeValidator, Field, PositiveInt
from pydantic import BeforeValidator, Field

from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel

Expand All @@ -34,23 +34,35 @@ def _call_function(function: Callable[[], int]) -> int:
return function()


PoolSlots = Annotated[
int,
Field(ge=-1, description="Number of slots. Use -1 for unlimited."),
]


class BasePool(BaseModel):
"""Base serializer for Pool."""

pool: str = Field(serialization_alias="name")
slots: PositiveInt
slots: PoolSlots
description: str | None = Field(default=None)
include_deferred: bool


def _sanitize_open_slots(value) -> int:
if isinstance(value, float) and value == float("inf"):
return -1
return value


class PoolResponse(BasePool):
"""Pool serializer for responses."""

occupied_slots: Annotated[int, BeforeValidator(_call_function)]
running_slots: Annotated[int, BeforeValidator(_call_function)]
queued_slots: Annotated[int, BeforeValidator(_call_function)]
scheduled_slots: Annotated[int, BeforeValidator(_call_function)]
open_slots: Annotated[int, BeforeValidator(_call_function)]
open_slots: Annotated[int, BeforeValidator(lambda v: _sanitize_open_slots(_call_function(v)))]
deferred_slots: Annotated[int, BeforeValidator(_call_function)]
team_name: str | None

Expand All @@ -66,7 +78,7 @@ class PoolPatchBody(StrictBaseModel):
"""Pool serializer for patch bodies."""

name: str | None = Field(default=None, alias="pool")
slots: PositiveInt | None = None
slots: PoolSlots | None = None
description: str | None = None
include_deferred: bool | None = None
team_name: str | None = Field(max_length=50, default=None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12041,8 +12041,9 @@ components:
title: Name
slots:
type: integer
exclusiveMinimum: 0.0
minimum: -1.0
title: Slots
description: Number of slots. Use -1 for unlimited.
description:
anyOf:
- type: string
Expand Down Expand Up @@ -12091,7 +12092,8 @@ components:
slots:
anyOf:
- type: integer
exclusiveMinimum: 0.0
minimum: -1.0
description: Number of slots. Use -1 for unlimited.
- type: 'null'
title: Slots
description:
Expand Down Expand Up @@ -12121,8 +12123,9 @@ components:
title: Name
slots:
type: integer
exclusiveMinimum: 0.0
minimum: -1.0
title: Slots
description: Number of slots. Use -1 for unlimited.
description:
anyOf:
- type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4597,8 +4597,9 @@ export const $PoolBody = {
},
slots: {
type: 'integer',
exclusiveMinimum: 0,
title: 'Slots'
minimum: -1,
title: 'Slots',
description: 'Number of slots. Use -1 for unlimited.'
},
description: {
anyOf: [
Expand Down Expand Up @@ -4673,7 +4674,8 @@ export const $PoolPatchBody = {
anyOf: [
{
type: 'integer',
exclusiveMinimum: 0
minimum: -1,
description: 'Number of slots. Use -1 for unlimited.'
},
{
type: 'null'
Expand Down Expand Up @@ -4730,8 +4732,9 @@ export const $PoolResponse = {
},
slots: {
type: 'integer',
exclusiveMinimum: 0,
title: 'Slots'
minimum: -1,
title: 'Slots',
description: 'Number of slots. Use -1 for unlimited.'
},
description: {
anyOf: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,9 @@ export type PluginResponse = {
*/
export type PoolBody = {
name: string;
/**
* Number of slots. Use -1 for unlimited.
*/
slots: number;
description?: string | null;
include_deferred?: boolean;
Expand Down Expand Up @@ -1223,6 +1226,9 @@ export type PoolPatchBody = {
*/
export type PoolResponse = {
name: string;
/**
* Number of slots. Use -1 for unlimited.
*/
slots: number;
description?: string | null;
include_deferred: boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,15 +292,7 @@ class TestPatchPool(TestPoolsEndpoint):
{"slots": -10},
422,
{
"detail": [
{
"ctx": {"gt": 0},
"input": -10,
"loc": ["body", "slots"],
"msg": "Input should be greater than 0",
"type": "greater_than",
},
],
"detail": "Slots must be greater than or equal to -1. Use -1 for unlimited.",
},
),
# Partial body on default_pool
Expand Down Expand Up @@ -380,10 +372,10 @@ def test_should_respond_200(
body = response.json()

if response.status_code == 422:
for error in body["detail"]:
# pydantic version can vary in tests (lower constraints), we do not assert the url.
if "url" in error:
del error["url"]
detail = response.json().get("detail")
assert detail is not None
assert "slots" in str(detail)
return

assert body == expected_response
if response.status_code == 200:
Expand Down Expand Up @@ -498,6 +490,39 @@ def test_should_respond_200(self, test_client, session, body, expected_status_co
assert session.scalar(select(func.count()).select_from(Pool)) == n_pools + 1
check_last_log(session, dag_id=None, event="post_pool", logical_date=None)

def test_post_pool_allows_unlimited_slots(self, test_client, session):
self.create_pools()
n_pools = session.scalar(select(func.count()).select_from(Pool))

response = test_client.post(
"/pools",
json={
"name": "unlimited_pool",
"slots": -1,
"description": "Unlimited pool",
"include_deferred": False,
},
)

assert response.status_code == 201
body = response.json()
assert body["name"] == "unlimited_pool"
assert body["slots"] == -1
assert body["open_slots"] == -1
assert session.scalar(select(func.count()).select_from(Pool)) == n_pools + 1
check_last_log(session, dag_id=None, event="post_pool", logical_date=None)

def test_post_pool_rejects_infinity_string(self, test_client, session):
response = test_client.post(
"/pools",
json={
"name": "bad_pool",
"slots": "infinity",
"include_deferred": False,
},
)
assert response.status_code == 422

def test_should_respond_401(self, unauthenticated_test_client):
response = unauthenticated_test_client.post("/pools", json={})
assert response.status_code == 401
Expand Down
6 changes: 3 additions & 3 deletions airflow-ctl/src/airflowctl/api/datamodels/generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,14 +633,14 @@ class PoolBody(BaseModel):
extra="forbid",
)
name: Annotated[str, Field(max_length=256, title="Name")]
slots: Annotated[int, Field(gt=0, title="Slots")]
slots: Annotated[int, Field(description="Number of slots. Use -1 for unlimited.", ge=-1, title="Slots")]
description: Annotated[str | None, Field(title="Description")] = None
include_deferred: Annotated[bool | None, Field(title="Include Deferred")] = False
team_name: Annotated[TeamName | None, Field(title="Team Name")] = None


class Slots(RootModel[int]):
root: Annotated[int, Field(gt=0, title="Slots")]
root: Annotated[int, Field(description="Number of slots. Use -1 for unlimited.", ge=-1, title="Slots")]


class PoolPatchBody(BaseModel):
Expand All @@ -664,7 +664,7 @@ class PoolResponse(BaseModel):
"""

name: Annotated[str, Field(title="Name")]
slots: Annotated[int, Field(gt=0, title="Slots")]
slots: Annotated[int, Field(description="Number of slots. Use -1 for unlimited.", ge=-1, title="Slots")]
description: Annotated[str | None, Field(title="Description")] = None
include_deferred: Annotated[bool, Field(title="Include Deferred")]
occupied_slots: Annotated[int, Field(title="Occupied Slots")]
Expand Down
Loading