From d0eb72c876421ff4216987656458350cd3c72a9f Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Fri, 29 May 2026 11:35:07 +0300 Subject: [PATCH 1/2] Remove dead code from airflow-core and task-sdk unit tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deleted fixtures, helpers, and an unused test that were identified as never executed using pytest-cov coverage on the unit test suite (confirmed with grep cross-referencing): - `permitted_dag_model` fixture (test_import_error.py) — never used as a param - `one_task_with_single_mapped_ti` fixture (test_task_instances.py) — never used - `stream_capture` / `combined_capture` fixtures + their imports (cli/conftest.py) — never used - `mock_metadata_distribution` fixture + `import contextlib` (test_plugins_manager.py) — never used - `capture_show_output` helper + `from rich.console import Console` (test_info_command.py) — never called - `move_back` / `_set_state_and_try_num` helpers + unused imports (test_task_command.py) — never called - `create_mock_dag` generator + `import itertools` (assets/test_manager.py) — never called - `test_taskgroup_getitem_returns_child_by_label` test (task-sdk) — superseded --- .../routes/public/test_import_error.py | 15 ------ .../routes/public/test_task_instances.py | 14 ----- .../tests/unit/assets/test_manager.py | 10 +--- .../unit/cli/commands/test_info_command.py | 8 --- .../unit/cli/commands/test_task_command.py | 18 +------ airflow-core/tests/unit/cli/conftest.py | 20 ------- .../unit/plugins/test_plugins_manager.py | 16 ------ .../prek/known_provide_session_positional.txt | 2 +- .../task_sdk/definitions/test_taskgroup.py | 52 ------------------- 9 files changed, 4 insertions(+), 151 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py index 79121e1b86aa5..886b7266021d5 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py @@ -52,21 +52,6 @@ BUNDLE_NAME = "testing" -@pytest.fixture -@provide_session -def permitted_dag_model(testing_dag_bundle, session: Session = NEW_SESSION) -> DagModel: - dag_model = DagModel( - fileloc=FILENAME1, - relative_fileloc=FILENAME1, - dag_id="dag_id1", - is_paused=False, - bundle_name=BUNDLE_NAME, - ) - session.add(dag_model) - session.commit() - return dag_model - - @pytest.fixture @provide_session def permitted_dag_model_all(testing_dag_bundle, session: Session = NEW_SESSION) -> set[str]: diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 1e874f925b30d..10b91820f9213 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -757,20 +757,6 @@ def one_task_with_mapped_tis(self, dag_maker, session): }, ) - @pytest.fixture - def one_task_with_single_mapped_ti(self, dag_maker, session): - self.create_dag_runs_with_mapped_tasks( - dag_maker, - session, - dags={ - "mapped_tis": { - "success": 1, - "failed": 0, - "running": 0, - }, - }, - ) - @pytest.fixture def one_task_with_many_mapped_tis(self, dag_maker, session): self.create_dag_runs_with_mapped_tasks( diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py index 0585fac803a8f..8f9d290a2b0b2 100644 --- a/airflow-core/tests/unit/assets/test_manager.py +++ b/airflow-core/tests/unit/assets/test_manager.py @@ -18,7 +18,6 @@ from __future__ import annotations import concurrent.futures -import itertools import logging from collections import Counter from typing import TYPE_CHECKING @@ -39,7 +38,7 @@ DagScheduleAssetAliasReference, DagScheduleAssetReference, ) -from airflow.models.dag import DAG, DagModel +from airflow.models.dag import DagModel from airflow.sdk.definitions.asset import Asset from tests_common.test_utils.config import conf_vars @@ -66,13 +65,6 @@ def mock_task_instance(): return None -def create_mock_dag(): - for dag_id in itertools.count(1): - mock_dag = mock.Mock(spec=DAG) - mock_dag.dag_id = dag_id - yield mock_dag - - class TestAssetManager: def test_register_asset_change_asset_doesnt_exist(self, mock_task_instance): mock_task_instance = mock.Mock() diff --git a/airflow-core/tests/unit/cli/commands/test_info_command.py b/airflow-core/tests/unit/cli/commands/test_info_command.py index 78b5e2b948f97..20a05dc328f4e 100644 --- a/airflow-core/tests/unit/cli/commands/test_info_command.py +++ b/airflow-core/tests/unit/cli/commands/test_info_command.py @@ -23,7 +23,6 @@ import httpx import pytest -from rich.console import Console from airflow.cli import cli_parser from airflow.cli.commands import info_command @@ -34,13 +33,6 @@ from tests_common.test_utils.config import conf_vars -def capture_show_output(instance): - console = Console() - with console.capture() as capture: - instance.info(console) - return capture.get() - - class TestPiiAnonymizer: def setup_method(self) -> None: self.instance = info_command.PiiAnonymizer() diff --git a/airflow-core/tests/unit/cli/commands/test_task_command.py b/airflow-core/tests/unit/cli/commands/test_task_command.py index f39a8409bc243..b9747b9412e67 100644 --- a/airflow-core/tests/unit/cli/commands/test_task_command.py +++ b/airflow-core/tests/unit/cli/commands/test_task_command.py @@ -22,9 +22,8 @@ import json import logging import os -import shutil from argparse import ArgumentParser -from contextlib import contextmanager, redirect_stdout +from contextlib import redirect_stdout from pathlib import Path from typing import TYPE_CHECKING from unittest import mock @@ -45,7 +44,7 @@ from airflow.providers.standard.operators.bash import BashOperator from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG from airflow.utils.session import create_session -from airflow.utils.state import State, TaskInstanceState +from airflow.utils.state import State from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests_common.test_utils.config import conf_vars @@ -70,13 +69,6 @@ def reset(dag_id): session.execute(delete(SerializedDagModel).where(SerializedDagModel.dag_id == dag_id)) -@contextmanager -def move_back(old_path, new_path): - shutil.move(old_path, new_path) - yield - shutil.move(new_path, old_path) - - class TestCliTasks: run_id = "TEST_RUN_ID" dag_id = "example_python_operator" @@ -542,12 +534,6 @@ def test_task_render_for_multi_line_properties(self, dag_maker): assert "# property: bash_command" in output.split("\n") -def _set_state_and_try_num(ti, session): - ti.state = TaskInstanceState.QUEUED - ti.try_number += 1 - session.commit() - - class TestLogsfromTaskRunCommand: def setup_method(self) -> None: self.dag_id = "test_logging_dag" diff --git a/airflow-core/tests/unit/cli/conftest.py b/airflow-core/tests/unit/cli/conftest.py index 2967e48cd6c1b..cc3052c27cebb 100644 --- a/airflow-core/tests/unit/cli/conftest.py +++ b/airflow-core/tests/unit/cli/conftest.py @@ -28,10 +28,8 @@ from tests_common.test_utils.config import conf_vars from tests_common.test_utils.stream_capture_manager import ( - CombinedCaptureManager, StderrCaptureManager, StdoutCaptureManager, - StreamCaptureManager, ) # Create custom executors here because conftest is imported first @@ -82,21 +80,3 @@ def stderr_capture(request): """Fixture that captures stderr only.""" request.getfixturevalue("caplog") return StderrCaptureManager() - - -@pytest.fixture -def stream_capture(request): - """Fixture that returns a configurable stream capture manager.""" - - def _capture(stdout=True, stderr=False): - request.getfixturevalue("caplog") - return StreamCaptureManager(capture_stdout=stdout, capture_stderr=stderr) - - return _capture - - -@pytest.fixture -def combined_capture(request): - """Fixture that captures both stdout and stderr.""" - request.getfixturevalue("caplog") - return CombinedCaptureManager() diff --git a/airflow-core/tests/unit/plugins/test_plugins_manager.py b/airflow-core/tests/unit/plugins/test_plugins_manager.py index 1163ac6338617..9e0501be6e00e 100644 --- a/airflow-core/tests/unit/plugins/test_plugins_manager.py +++ b/airflow-core/tests/unit/plugins/test_plugins_manager.py @@ -17,7 +17,6 @@ # under the License. from __future__ import annotations -import contextlib import importlib import inspect import logging @@ -56,21 +55,6 @@ def _clean_listeners(): get_listener_manager().clear() -@pytest.fixture -def mock_metadata_distribution(mocker): - @contextlib.contextmanager - def wrapper(*args, **kwargs): - if sys.version_info < (3, 12): - patch_fq = "importlib_metadata.distributions" - else: - patch_fq = "importlib.metadata.distributions" - - with mock.patch(patch_fq, *args, **kwargs) as m: - yield m - - return wrapper - - class TestPluginsManager: @pytest.fixture(autouse=True) def clean_plugins(self): diff --git a/scripts/ci/prek/known_provide_session_positional.txt b/scripts/ci/prek/known_provide_session_positional.txt index d0c84e2f6b48f..f8f0a20522bb4 100644 --- a/scripts/ci/prek/known_provide_session_positional.txt +++ b/scripts/ci/prek/known_provide_session_positional.txt @@ -54,7 +54,7 @@ airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py::1 airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_tags.py::1 airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py::1 airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_event_logs.py::1 -airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py::8 +airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py::7 airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_job.py::1 airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_monitor.py::2 airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py::2 diff --git a/task-sdk/tests/task_sdk/definitions/test_taskgroup.py b/task-sdk/tests/task_sdk/definitions/test_taskgroup.py index cf6d309f305eb..2967ea6846a9f 100644 --- a/task-sdk/tests/task_sdk/definitions/test_taskgroup.py +++ b/task-sdk/tests/task_sdk/definitions/test_taskgroup.py @@ -246,32 +246,6 @@ def test_build_task_group_with_prefix(): assert group4.get_child_by_label("task4") == task4 -@pytest.mark.parametrize( - "prefix", - [ - pytest.param(True, id="prefix_on"), - pytest.param(False, id="prefix_off"), - ], -) -def test_taskgroup_getitem_returns_child_by_label(prefix: bool): - """Tests that TaskGroup[label] returns the correct child task or subgroup.""" - logical_date = pendulum.datetime(2020, 1, 1) - with DAG("test_getitem", start_date=logical_date): - with TaskGroup("group1", prefix_group_id=prefix) as group1: - task1 = EmptyOperator(task_id="task1") - with TaskGroup("subgroup", prefix_group_id=prefix) as subgroup: - task2 = EmptyOperator(task_id="task2") - - assert group1["task1"] == task1 - assert group1["subgroup"] == subgroup - assert group1["subgroup"]["task2"] == task2 - - from airflow.sdk.exceptions import NodeNotFound - - with pytest.raises(NodeNotFound): - group1["nonexistent"] - - def test_build_task_group_with_prefix_functionality(): """ Tests TaskGroup prefix_group_id functionality - additional test for comprehensive coverage. @@ -956,32 +930,6 @@ def section_a(value): assert dag.task_dict["section_1.task_3"].downstream_task_ids == {"task_end"} -class TestTaskGroupGetItem: - def test_getitem_missing_raises_node_not_found(self): - import pendulum - - from airflow.sdk.exceptions import NodeNotFound - - start = pendulum.datetime(2016, 1, 1) - with DAG("test_dag", schedule=None, start_date=start): - with TaskGroup(group_id="section") as tg: - pass - - with pytest.raises(NodeNotFound): - tg["nonexistent"] - - def test_getitem_missing_is_key_error(self): - import pendulum - - start = pendulum.datetime(2016, 1, 1) - with DAG("test_dag", schedule=None, start_date=start): - with TaskGroup(group_id="section") as tg: - pass - - with pytest.raises(KeyError): - tg["nonexistent"] - - # --- topological_sort: cross-shape correctness --- # # Mirrors the shapes covered by the benchmark gist referenced from PR #67288 From acfd59872a66dcc1acc70cafbf7bbf51ebd3c300 Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Fri, 29 May 2026 12:03:08 +0300 Subject: [PATCH 2/2] =?UTF-8?q?Restore=20test=5Ftaskgroup.py=20=E2=80=94?= =?UTF-8?q?=20not=20dead=20code,=20belongs=20to=20separate=20PR?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../task_sdk/definitions/test_taskgroup.py | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/task-sdk/tests/task_sdk/definitions/test_taskgroup.py b/task-sdk/tests/task_sdk/definitions/test_taskgroup.py index 2967ea6846a9f..cf6d309f305eb 100644 --- a/task-sdk/tests/task_sdk/definitions/test_taskgroup.py +++ b/task-sdk/tests/task_sdk/definitions/test_taskgroup.py @@ -246,6 +246,32 @@ def test_build_task_group_with_prefix(): assert group4.get_child_by_label("task4") == task4 +@pytest.mark.parametrize( + "prefix", + [ + pytest.param(True, id="prefix_on"), + pytest.param(False, id="prefix_off"), + ], +) +def test_taskgroup_getitem_returns_child_by_label(prefix: bool): + """Tests that TaskGroup[label] returns the correct child task or subgroup.""" + logical_date = pendulum.datetime(2020, 1, 1) + with DAG("test_getitem", start_date=logical_date): + with TaskGroup("group1", prefix_group_id=prefix) as group1: + task1 = EmptyOperator(task_id="task1") + with TaskGroup("subgroup", prefix_group_id=prefix) as subgroup: + task2 = EmptyOperator(task_id="task2") + + assert group1["task1"] == task1 + assert group1["subgroup"] == subgroup + assert group1["subgroup"]["task2"] == task2 + + from airflow.sdk.exceptions import NodeNotFound + + with pytest.raises(NodeNotFound): + group1["nonexistent"] + + def test_build_task_group_with_prefix_functionality(): """ Tests TaskGroup prefix_group_id functionality - additional test for comprehensive coverage. @@ -930,6 +956,32 @@ def section_a(value): assert dag.task_dict["section_1.task_3"].downstream_task_ids == {"task_end"} +class TestTaskGroupGetItem: + def test_getitem_missing_raises_node_not_found(self): + import pendulum + + from airflow.sdk.exceptions import NodeNotFound + + start = pendulum.datetime(2016, 1, 1) + with DAG("test_dag", schedule=None, start_date=start): + with TaskGroup(group_id="section") as tg: + pass + + with pytest.raises(NodeNotFound): + tg["nonexistent"] + + def test_getitem_missing_is_key_error(self): + import pendulum + + start = pendulum.datetime(2016, 1, 1) + with DAG("test_dag", schedule=None, start_date=start): + with TaskGroup(group_id="section") as tg: + pass + + with pytest.raises(KeyError): + tg["nonexistent"] + + # --- topological_sort: cross-shape correctness --- # # Mirrors the shapes covered by the benchmark gist referenced from PR #67288