diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 269ba769cbd6..3b26da2c6822 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -33,12 +33,10 @@ from collections import defaultdict from datetime import datetime, timedelta from importlib import import_module -from multiprocessing.connection import Connection as MultiprocessingConnection from pathlib import Path -from typing import Any, Callable, Iterator, NamedTuple, cast +from typing import TYPE_CHECKING, Any, Callable, Iterator, NamedTuple, cast from setproctitle import setproctitle -from sqlalchemy.orm import Session from tabulate import tabulate import airflow.models @@ -67,6 +65,11 @@ from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, with_row_locks +if TYPE_CHECKING: + from multiprocessing.connection import Connection as MultiprocessingConnection + + from sqlalchemy.orm import Session + class DagParsingStat(NamedTuple): """Information on processing progress.""" diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index ab17bad4dabd..1cb9c74a2743 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -18,25 +18,21 @@ import importlib import logging -import multiprocessing import os import signal import threading import time import zipfile from contextlib import redirect_stderr, redirect_stdout, suppress -from datetime import datetime, timedelta -from multiprocessing.connection import Connection as MultiprocessingConnection +from datetime import timedelta from typing import TYPE_CHECKING, Iterable, Iterator from setproctitle import setproctitle from sqlalchemy import delete, exc, func, or_, select -from sqlalchemy.orm.session import Session from airflow import settings from airflow.api_internal.internal_api_call import internal_api_call from airflow.callbacks.callback_requests import ( - CallbackRequest, DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest, @@ -44,7 +40,7 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException, TaskNotFound from airflow.models import SlaMiss, errors -from airflow.models.dag import DAG, DagModel +from airflow.models.dag import DagModel from airflow.models.dagbag import DagBag from airflow.models.dagrun import DagRun as DR from airflow.models.dagwarning import DagWarning, DagWarningType @@ -59,6 +55,14 @@ from airflow.utils.state import TaskInstanceState if TYPE_CHECKING: + import multiprocessing + from datetime import datetime + from multiprocessing.connection import Connection as MultiprocessingConnection + + from sqlalchemy.orm.session import Session + + from airflow.callbacks.callback_requests import CallbackRequest + from airflow.models.dag import DAG from airflow.models.operator import Operator diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py index c7e062803ab9..8714ba06589c 100644 --- a/airflow/datasets/manager.py +++ b/airflow/datasets/manager.py @@ -20,15 +20,16 @@ from typing import TYPE_CHECKING from sqlalchemy import exc, select -from sqlalchemy.orm.session import Session from airflow.configuration import conf -from airflow.datasets import Dataset from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel from airflow.stats import Stats from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: + from sqlalchemy.orm.session import Session + + from airflow.datasets import Dataset from airflow.models.taskinstance import TaskInstance diff --git a/airflow/example_dags/example_dag_decorator.py b/airflow/example_dags/example_dag_decorator.py index e8ee8a72997a..447b4471b97e 100644 --- a/airflow/example_dags/example_dag_decorator.py +++ b/airflow/example_dags/example_dag_decorator.py @@ -17,7 +17,7 @@ # under the License. from __future__ import annotations -from typing import Any +from typing import TYPE_CHECKING, Any import httpx import pendulum @@ -25,7 +25,9 @@ from airflow.decorators import dag, task from airflow.models.baseoperator import BaseOperator from airflow.operators.email import EmailOperator -from airflow.utils.context import Context + +if TYPE_CHECKING: + from airflow.utils.context import Context class GetRequestOperator(BaseOperator): diff --git a/airflow/example_dags/example_params_trigger_ui.py b/airflow/example_dags/example_params_trigger_ui.py index 564861f52bec..1ca0a1980340 100644 --- a/airflow/example_dags/example_params_trigger_ui.py +++ b/airflow/example_dags/example_params_trigger_ui.py @@ -23,14 +23,17 @@ import datetime from pathlib import Path +from typing import TYPE_CHECKING from airflow import DAG from airflow.decorators import task -from airflow.models.dagrun import DagRun from airflow.models.param import Param -from airflow.models.taskinstance import TaskInstance from airflow.utils.trigger_rule import TriggerRule +if TYPE_CHECKING: + from airflow.models.dagrun import DagRun + from airflow.models.taskinstance import TaskInstance + with DAG( dag_id=Path(__file__).stem, description=__doc__.partition(".")[0], diff --git a/airflow/example_dags/example_params_ui_tutorial.py b/airflow/example_dags/example_params_ui_tutorial.py index 9af4e0ba4af9..a9da876bf2b7 100644 --- a/airflow/example_dags/example_params_ui_tutorial.py +++ b/airflow/example_dags/example_params_ui_tutorial.py @@ -25,13 +25,16 @@ import datetime import json from pathlib import Path +from typing import TYPE_CHECKING from airflow import DAG from airflow.decorators import task from airflow.exceptions import AirflowSkipException -from airflow.models.dagrun import DagRun from airflow.models.param import Param -from airflow.models.taskinstance import TaskInstance + +if TYPE_CHECKING: + from airflow.models.dagrun import DagRun + from airflow.models.taskinstance import TaskInstance with DAG( dag_id=Path(__file__).stem, diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py index 7723d9f9c724..ced2f6ec134e 100644 --- a/airflow/example_dags/example_skip_dag.py +++ b/airflow/example_dags/example_skip_dag.py @@ -18,15 +18,19 @@ """Example DAG demonstrating the EmptyOperator and a custom EmptySkipOperator which skips by default.""" from __future__ import annotations +from typing import TYPE_CHECKING + import pendulum from airflow import DAG from airflow.exceptions import AirflowSkipException from airflow.models.baseoperator import BaseOperator from airflow.operators.empty import EmptyOperator -from airflow.utils.context import Context from airflow.utils.trigger_rule import TriggerRule +if TYPE_CHECKING: + from airflow.utils.context import Context + # Create some placeholder operators class EmptySkipOperator(BaseOperator): diff --git a/airflow/example_dags/plugins/workday.py b/airflow/example_dags/plugins/workday.py index 20363a69e7a4..79473e06ddc6 100644 --- a/airflow/example_dags/plugins/workday.py +++ b/airflow/example_dags/plugins/workday.py @@ -20,12 +20,16 @@ import logging from datetime import timedelta +from typing import TYPE_CHECKING # [START howto_timetable] from pendulum import UTC, Date, DateTime, Time from airflow.plugins_manager import AirflowPlugin -from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable +from airflow.timetables.base import DagRunInfo, DataInterval, Timetable + +if TYPE_CHECKING: + from airflow.timetables.base import TimeRestriction log = logging.getLogger(__name__) try: