Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3034,7 +3034,9 @@ def _exceeds_max_active_runs(
session=session,
)
active_non_backfill_runs = runs_dict.get(dag_model.dag_id, 0)
exceeds = active_non_backfill_runs >= dag_model.max_active_runs
exceeds = (
dag_model.max_active_runs is not None and active_non_backfill_runs >= dag_model.max_active_runs
)
return exceeds, active_non_backfill_runs


Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Trigger(Base):
encrypted_kwargs: Mapped[str] = mapped_column("kwargs", Text, nullable=False)
created_date: Mapped[datetime.datetime] = mapped_column(UtcDateTime, nullable=False)
triggerer_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
queue: Mapped[str] = mapped_column(String(256), nullable=True)
queue: Mapped[str | None] = mapped_column(String(256), nullable=True)

triggerer_job = relationship(
"Job",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
try:
from sqlalchemy.engine.url import URL
except ImportError:
URL = None
URL = None # type: ignore[assignment,misc]

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
except ImportError:
URL = create_engine = None
URL = create_engine = None # type: ignore[assignment,misc]

from airflow.exceptions import AirflowOptionalProviderFeatureException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
from sqlalchemy.engine import make_url
from sqlalchemy.exc import ArgumentError, NoSuchModuleError
except ImportError:
create_engine = None
inspect = None
make_url = None
ArgumentError = Exception
NoSuchModuleError = Exception
create_engine = None # type: ignore[assignment]
inspect = None # type: ignore[assignment]
make_url = None # type: ignore[assignment]
ArgumentError = Exception # type: ignore[misc,assignment]
NoSuchModuleError = Exception # type: ignore[misc,assignment]


from airflow.exceptions import AirflowOptionalProviderFeatureException, AirflowProviderDeprecationWarning
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
try:
from sqlalchemy.engine import URL
except ImportError:
URL = None
URL = None # type: ignore[assignment,misc]

from airflow.exceptions import AirflowOptionalProviderFeatureException, AirflowProviderDeprecationWarning
from airflow.providers.common.sql.hooks.handlers import return_single_query_results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def sample_callable(**kwargs):
task_instance = create_task_instance(
t,
run_id=run_id,
dag_version_id=dagrun.created_dag_version_id,
dag_version_id=uuid.UUID(dagrun.created_dag_version_id),
)
else:
task_instance = TaskInstance(t, run_id=run_id) # type: ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def test_execute(self, dag_maker: DagMaker, session: Session) -> None:

expected_params_in_trigger_kwargs: dict[str, dict[str, Any]]
# trigger_kwargs are encoded via BaseSerialization in versions < 3.2
expected_ti_id = ti.id
expected_ti_id: str | UUID = ti.id
if AIRFLOW_V_3_2_PLUS:
expected_params_in_trigger_kwargs = expected_params
# trigger_kwargs are encoded via serde from task sdk in versions >= 3.2
Expand Down
10 changes: 9 additions & 1 deletion task-sdk/src/airflow/sdk/serde/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from fnmatch import fnmatch
from importlib import import_module
from re import Pattern
from typing import TYPE_CHECKING, Any, TypeVar, cast
from typing import TYPE_CHECKING, Any, TypeVar, cast, overload

import attr

Expand Down Expand Up @@ -85,6 +85,14 @@ def decode(d: dict[str, Any]) -> tuple[str, int, Any]:
return classname, version, data


@overload
def serialize(o: dict, depth: int = 0) -> dict: ...
@overload
def serialize(o: None, depth: int = 0) -> None: ...
@overload
def serialize(o: object, depth: int = 0) -> U | None: ...


def serialize(o: object, depth: int = 0) -> U | None:
"""
Serialize an object into a representation consisting only built-in types.
Expand Down