From 4ad51ae3b9bd900b93c3bda5fd0f6ad4e813eee2 Mon Sep 17 00:00:00 2001
From: Hussein Awala
Date: Sun, 27 Aug 2023 21:48:41 +0200
Subject: [PATCH 1/3] Improve importing the modules in Airflow dag_processing,
 datasets and example_dags packages

---
 airflow/dag_processing/manager.py              | 13 ++++++++-----
 airflow/dag_processing/processor.py            | 18 ++++++++++++------
 airflow/datasets/manager.py                    |  5 +++--
 airflow/example_dags/example_dag_decorator.py  |  6 ++++--
 .../example_dags/example_params_trigger_ui.py  |  7 +++++--
 .../example_dags/example_params_ui_tutorial.py |  7 +++++--
 airflow/example_dags/example_skip_dag.py       |  6 +++++-
 airflow/example_dags/plugins/workday.py        |  6 +++++-
 8 files changed, 47 insertions(+), 21 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 269ba769cbd67..14f613d8ac75c 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -33,12 +33,10 @@
 from collections import defaultdict
 from datetime import datetime, timedelta
 from importlib import import_module
-from multiprocessing.connection import Connection as MultiprocessingConnection
 from pathlib import Path
-from typing import Any, Callable, Iterator, NamedTuple, cast
+from typing import TYPE_CHECKING, Any, Callable, Iterator, NamedTuple, cast

 from setproctitle import setproctitle
-from sqlalchemy.orm import Session
 from tabulate import tabulate

 import airflow.models
@@ -67,6 +65,11 @@
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, with_row_locks

+if TYPE_CHECKING:
+    from multiprocessing.connection import Connection as MultiprocessingConnection
+
+    from sqlalchemy.orm import Session
+

 class DagParsingStat(NamedTuple):
     """Information on processing progress."""
@@ -865,7 +868,7 @@ def _log_file_processing_stats(self, known_file_paths):
             rows.append((file_path, processor_pid, runtime, num_dags, num_errors, last_runtime, last_run))

         # Sort by longest last runtime. (Can't sort None values in python3)
-        rows.sort(key=lambda x: x[3] or 0.0)
+        rows = sorted(rows, key=lambda x: x[3] or 0.0)

         formatted_rows = []
         for file_path, pid, runtime, num_dags, num_errors, last_runtime, last_run in rows:
@@ -1167,7 +1170,7 @@ def prepare_file_path_queue(self):
         if is_mtime_mode:
             file_paths = sorted(files_with_mtime, key=files_with_mtime.get, reverse=True)
         elif list_mode == "alphabetical":
-            file_paths.sort()
+            file_paths = sorted(file_paths)
         elif list_mode == "random_seeded_by_host":
             # Shuffle the list seeded by hostname so multiple schedulers can work on different
             # set of files. Since we set the seed, the sort order will remain same per host
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index ab17bad4dabd5..fec29347534ef 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -18,25 +18,21 @@

 import importlib
 import logging
-import multiprocessing
 import os
 import signal
 import threading
 import time
 import zipfile
 from contextlib import redirect_stderr, redirect_stdout, suppress
-from datetime import datetime, timedelta
-from multiprocessing.connection import Connection as MultiprocessingConnection
+from datetime import timedelta
 from typing import TYPE_CHECKING, Iterable, Iterator

 from setproctitle import setproctitle
 from sqlalchemy import delete, exc, func, or_, select
-from sqlalchemy.orm.session import Session

 from airflow import settings
 from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.callbacks.callback_requests import (
-    CallbackRequest,
     DagCallbackRequest,
     SlaCallbackRequest,
     TaskCallbackRequest,
@@ -44,7 +40,7 @@
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, TaskNotFound
 from airflow.models import SlaMiss, errors
-from airflow.models.dag import DAG, DagModel
+from airflow.models.dag import DagModel
 from airflow.models.dagbag import DagBag
 from airflow.models.dagrun import DagRun as DR
 from airflow.models.dagwarning import DagWarning, DagWarningType
@@ -59,6 +55,16 @@
 from airflow.utils.state import TaskInstanceState

 if TYPE_CHECKING:
+    import multiprocessing
+    from datetime import datetime
+    from multiprocessing.connection import Connection as MultiprocessingConnection
+
+    from sqlalchemy.orm.session import Session
+
+    from airflow.callbacks.callback_requests import (
+        CallbackRequest,
+    )
+    from airflow.models.dag import DAG
     from airflow.models.operator import Operator


diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index c7e062803ab97..8714ba06589c4 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -20,15 +20,16 @@
 from typing import TYPE_CHECKING

 from sqlalchemy import exc, select
-from sqlalchemy.orm.session import Session

 from airflow.configuration import conf
-from airflow.datasets import Dataset
 from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
 from airflow.stats import Stats
 from airflow.utils.log.logging_mixin import LoggingMixin

 if TYPE_CHECKING:
+    from sqlalchemy.orm.session import Session
+
+    from airflow.datasets import Dataset
     from airflow.models.taskinstance import TaskInstance


diff --git a/airflow/example_dags/example_dag_decorator.py b/airflow/example_dags/example_dag_decorator.py
index e8ee8a72997a2..447b4471b97e4 100644
--- a/airflow/example_dags/example_dag_decorator.py
+++ b/airflow/example_dags/example_dag_decorator.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations

-from typing import Any
+from typing import TYPE_CHECKING, Any

 import httpx
 import pendulum
@@ -25,7 +25,9 @@
 from airflow.decorators import dag, task
 from airflow.models.baseoperator import BaseOperator
 from airflow.operators.email import EmailOperator
-from airflow.utils.context import Context
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context


 class GetRequestOperator(BaseOperator):
diff --git a/airflow/example_dags/example_params_trigger_ui.py b/airflow/example_dags/example_params_trigger_ui.py
index 564861f52bec0..1ca0a1980340e 100644
--- a/airflow/example_dags/example_params_trigger_ui.py
+++ b/airflow/example_dags/example_params_trigger_ui.py
@@ -23,14 +23,17 @@

 import datetime
 from pathlib import Path
+from typing import TYPE_CHECKING

 from airflow import DAG
 from airflow.decorators import task
-from airflow.models.dagrun import DagRun
 from airflow.models.param import Param
-from airflow.models.taskinstance import TaskInstance
 from airflow.utils.trigger_rule import TriggerRule

+if TYPE_CHECKING:
+    from airflow.models.dagrun import DagRun
+    from airflow.models.taskinstance import TaskInstance
+
 with DAG(
     dag_id=Path(__file__).stem,
     description=__doc__.partition(".")[0],
diff --git a/airflow/example_dags/example_params_ui_tutorial.py b/airflow/example_dags/example_params_ui_tutorial.py
index 9af4e0ba4af92..a9da876bf2b7c 100644
--- a/airflow/example_dags/example_params_ui_tutorial.py
+++ b/airflow/example_dags/example_params_ui_tutorial.py
@@ -25,13 +25,16 @@
 import datetime
 import json
 from pathlib import Path
+from typing import TYPE_CHECKING

 from airflow import DAG
 from airflow.decorators import task
 from airflow.exceptions import AirflowSkipException
-from airflow.models.dagrun import DagRun
 from airflow.models.param import Param
-from airflow.models.taskinstance import TaskInstance
+
+if TYPE_CHECKING:
+    from airflow.models.dagrun import DagRun
+    from airflow.models.taskinstance import TaskInstance

 with DAG(
     dag_id=Path(__file__).stem,
diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py
index 7723d9f9c724c..ced2f6ec134ef 100644
--- a/airflow/example_dags/example_skip_dag.py
+++ b/airflow/example_dags/example_skip_dag.py
@@ -18,15 +18,19 @@
 """Example DAG demonstrating the EmptyOperator and a custom EmptySkipOperator which skips by default."""
 from __future__ import annotations

+from typing import TYPE_CHECKING
+
 import pendulum

 from airflow import DAG
 from airflow.exceptions import AirflowSkipException
 from airflow.models.baseoperator import BaseOperator
 from airflow.operators.empty import EmptyOperator
-from airflow.utils.context import Context
 from airflow.utils.trigger_rule import TriggerRule

+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+

 # Create some placeholder operators
 class EmptySkipOperator(BaseOperator):
diff --git a/airflow/example_dags/plugins/workday.py b/airflow/example_dags/plugins/workday.py
index 20363a69e7a4b..79473e06ddc6f 100644
--- a/airflow/example_dags/plugins/workday.py
+++ b/airflow/example_dags/plugins/workday.py
@@ -20,12 +20,16 @@

 import logging
 from datetime import timedelta
+from typing import TYPE_CHECKING

 # [START howto_timetable]
 from pendulum import UTC, Date, DateTime, Time

 from airflow.plugins_manager import AirflowPlugin
-from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
+from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
+
+if TYPE_CHECKING:
+    from airflow.timetables.base import TimeRestriction

 log = logging.getLogger(__name__)
 try:

From 17c47077e231e91bb6fcd19f64ad0a353c0b853d Mon Sep 17 00:00:00 2001
From: Tzu-ping Chung
Date: Mon, 28 Aug 2023 15:02:56 +0800
Subject: [PATCH 2/3] Code format

---
 airflow/dag_processing/processor.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index fec29347534ef..1cb9c74a27431 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -61,9 +61,7 @@

     from sqlalchemy.orm.session import Session

-    from airflow.callbacks.callback_requests import (
-        CallbackRequest,
-    )
+    from airflow.callbacks.callback_requests import CallbackRequest
     from airflow.models.dag import DAG
     from airflow.models.operator import Operator


From 821b41f9ec50d8cff3d1f6d5ee20b4734e4cc8cc Mon Sep 17 00:00:00 2001
From: Hussein Awala
Date: Mon, 28 Aug 2023 10:17:19 +0200
Subject: [PATCH 3/3] revert some unrelated changes

---
 airflow/dag_processing/manager.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 14f613d8ac75c..3b26da2c6822e 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -868,7 +868,7 @@ def _log_file_processing_stats(self, known_file_paths):
             rows.append((file_path, processor_pid, runtime, num_dags, num_errors, last_runtime, last_run))

         # Sort by longest last runtime. (Can't sort None values in python3)
-        rows = sorted(rows, key=lambda x: x[3] or 0.0)
+        rows.sort(key=lambda x: x[3] or 0.0)

         formatted_rows = []
         for file_path, pid, runtime, num_dags, num_errors, last_runtime, last_run in rows:
@@ -1170,7 +1170,7 @@ def prepare_file_path_queue(self):
         if is_mtime_mode:
             file_paths = sorted(files_with_mtime, key=files_with_mtime.get, reverse=True)
         elif list_mode == "alphabetical":
-            file_paths = sorted(file_paths)
+            file_paths.sort()
         elif list_mode == "random_seeded_by_host":
             # Shuffle the list seeded by hostname so multiple schedulers can work on different
             # set of files. Since we set the seed, the sort order will remain same per host
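Reviewer note: the pattern applied throughout this series is Python's typing.TYPE_CHECKING guard, which defers imports that are needed only for type annotations so they are never executed at runtime. The sketch below is illustrative only; the function name and its body are hypothetical and not taken from Airflow. It shows why the guarded imports remain safe when combined with "from __future__ import annotations".

from __future__ import annotations  # annotations are stored as strings, so guarded names need not exist at runtime

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by static type checkers such as mypy; skipped at runtime,
    # which trims import time and helps break circular-import chains.
    from sqlalchemy.orm import Session


def queued_dag_run_count(session: Session) -> int:
    """Session appears only in the annotation, so importing this module never loads sqlalchemy.orm."""
    # Hypothetical placeholder body; a real implementation would query the database via `session`.
    return 0

The trade-off is that a name imported under the guard exists only for the type checker, so any genuine runtime use (for example an isinstance() check or instantiation) still requires an unguarded import.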