Improve importing the modules in Airflow dag_processing, datasets and example_dags packages #33808

Merged: 3 commits, Aug 28, 2023
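All eight files below apply the same pattern: imports that are referenced only in type annotations move under an `if TYPE_CHECKING:` guard, so static type checkers (mypy, pyright) still resolve them while the interpreter skips them when the module is loaded. A minimal sketch of the idea, illustrative only and not Airflow code; the function and the way the guarded types are used here are hypothetical:

from __future__ import annotations  # annotations stay as strings, so guarded names are never evaluated at runtime

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only type checkers execute this block; the interpreter never does,
    # so these modules are not imported when the file is merely loaded.
    from multiprocessing.connection import Connection
    from sqlalchemy.orm import Session


def drain_messages(conn: Connection, session: Session) -> list[str]:
    # The parameter annotations refer to guarded names, which is safe because
    # postponed evaluation (the __future__ import above) keeps them unevaluated.
    messages: list[str] = []
    while conn.poll():
        messages.append(str(conn.recv()))
    session.flush()
    return messages

The gain is purely at import time (fewer modules pulled in, lighter hard dependencies for type-only names); runtime behavior is unchanged as long as the guarded names never escape annotations.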
9 changes: 6 additions & 3 deletions airflow/dag_processing/manager.py
@@ -33,12 +33,10 @@
 from collections import defaultdict
 from datetime import datetime, timedelta
 from importlib import import_module
-from multiprocessing.connection import Connection as MultiprocessingConnection
 from pathlib import Path
-from typing import Any, Callable, Iterator, NamedTuple, cast
+from typing import TYPE_CHECKING, Any, Callable, Iterator, NamedTuple, cast

 from setproctitle import setproctitle
-from sqlalchemy.orm import Session
 from tabulate import tabulate

 import airflow.models
@@ -67,6 +65,11 @@
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, with_row_locks

+if TYPE_CHECKING:
+    from multiprocessing.connection import Connection as MultiprocessingConnection
+
+    from sqlalchemy.orm import Session
+

 class DagParsingStat(NamedTuple):
     """Information on processing progress."""
16 changes: 10 additions & 6 deletions airflow/dag_processing/processor.py
@@ -18,33 +18,29 @@

 import importlib
 import logging
-import multiprocessing
 import os
 import signal
 import threading
 import time
 import zipfile
 from contextlib import redirect_stderr, redirect_stdout, suppress
-from datetime import datetime, timedelta
-from multiprocessing.connection import Connection as MultiprocessingConnection
+from datetime import timedelta
 from typing import TYPE_CHECKING, Iterable, Iterator

 from setproctitle import setproctitle
 from sqlalchemy import delete, exc, func, or_, select
-from sqlalchemy.orm.session import Session

 from airflow import settings
 from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.callbacks.callback_requests import (
-    CallbackRequest,
     DagCallbackRequest,
     SlaCallbackRequest,
     TaskCallbackRequest,
 )
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, TaskNotFound
 from airflow.models import SlaMiss, errors
-from airflow.models.dag import DAG, DagModel
+from airflow.models.dag import DagModel
 from airflow.models.dagbag import DagBag
 from airflow.models.dagrun import DagRun as DR
 from airflow.models.dagwarning import DagWarning, DagWarningType
@@ -59,6 +55,14 @@
 from airflow.utils.state import TaskInstanceState

 if TYPE_CHECKING:
+    import multiprocessing
+    from datetime import datetime
+    from multiprocessing.connection import Connection as MultiprocessingConnection
+
+    from sqlalchemy.orm.session import Session
+
+    from airflow.callbacks.callback_requests import CallbackRequest
+    from airflow.models.dag import DAG
     from airflow.models.operator import Operator
5 changes: 3 additions & 2 deletions airflow/datasets/manager.py
@@ -20,15 +20,16 @@
 from typing import TYPE_CHECKING

 from sqlalchemy import exc, select
-from sqlalchemy.orm.session import Session

 from airflow.configuration import conf
-from airflow.datasets import Dataset
 from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
 from airflow.stats import Stats
 from airflow.utils.log.logging_mixin import LoggingMixin

 if TYPE_CHECKING:
+    from sqlalchemy.orm.session import Session
+
+    from airflow.datasets import Dataset
     from airflow.models.taskinstance import TaskInstance
6 changes: 4 additions & 2 deletions airflow/example_dags/example_dag_decorator.py
@@ -17,15 +17,17 @@
 # under the License.
 from __future__ import annotations

-from typing import Any
+from typing import TYPE_CHECKING, Any

 import httpx
 import pendulum

 from airflow.decorators import dag, task
 from airflow.models.baseoperator import BaseOperator
 from airflow.operators.email import EmailOperator
-from airflow.utils.context import Context
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context


 class GetRequestOperator(BaseOperator):
7 changes: 5 additions & 2 deletions airflow/example_dags/example_params_trigger_ui.py
@@ -23,14 +23,17 @@

 import datetime
 from pathlib import Path
+from typing import TYPE_CHECKING

 from airflow import DAG
 from airflow.decorators import task
-from airflow.models.dagrun import DagRun
 from airflow.models.param import Param
-from airflow.models.taskinstance import TaskInstance
 from airflow.utils.trigger_rule import TriggerRule

+if TYPE_CHECKING:
+    from airflow.models.dagrun import DagRun
+    from airflow.models.taskinstance import TaskInstance
+
 with DAG(
     dag_id=Path(__file__).stem,
     description=__doc__.partition(".")[0],
7 changes: 5 additions & 2 deletions airflow/example_dags/example_params_ui_tutorial.py
@@ -25,13 +25,16 @@
 import datetime
 import json
 from pathlib import Path
+from typing import TYPE_CHECKING

 from airflow import DAG
 from airflow.decorators import task
 from airflow.exceptions import AirflowSkipException
-from airflow.models.dagrun import DagRun
 from airflow.models.param import Param
-from airflow.models.taskinstance import TaskInstance

+if TYPE_CHECKING:
+    from airflow.models.dagrun import DagRun
+    from airflow.models.taskinstance import TaskInstance
+
 with DAG(
     dag_id=Path(__file__).stem,
6 changes: 5 additions & 1 deletion airflow/example_dags/example_skip_dag.py
@@ -18,15 +18,19 @@
 """Example DAG demonstrating the EmptyOperator and a custom EmptySkipOperator which skips by default."""
 from __future__ import annotations

+from typing import TYPE_CHECKING
+
 import pendulum

 from airflow import DAG
 from airflow.exceptions import AirflowSkipException
 from airflow.models.baseoperator import BaseOperator
 from airflow.operators.empty import EmptyOperator
-from airflow.utils.context import Context
 from airflow.utils.trigger_rule import TriggerRule

+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+

 # Create some placeholder operators
 class EmptySkipOperator(BaseOperator):
6 changes: 5 additions & 1 deletion airflow/example_dags/plugins/workday.py
@@ -20,12 +20,16 @@

 import logging
 from datetime import timedelta
+from typing import TYPE_CHECKING

 # [START howto_timetable]
 from pendulum import UTC, Date, DateTime, Time

 from airflow.plugins_manager import AirflowPlugin
-from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
+from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
+
+if TYPE_CHECKING:
+    from airflow.timetables.base import TimeRestriction

 log = logging.getLogger(__name__)
 try:
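A general caveat with this pattern (standard Python behavior, not something introduced by this PR): a name imported only under TYPE_CHECKING must not be used outside annotations, or it will be missing at runtime. A hypothetical sketch of the failure mode, with made-up names:

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from datetime import datetime  # visible to the type checker only


def parse_when(raw: str) -> "datetime":
    # The string annotation above is fine, but the call below is not:
    # `datetime` is never bound at runtime, so this raises NameError when called.
    return datetime.fromisoformat(raw)

That is why each diff above only guards names that appear purely in annotations and keeps runtime-used ones, for example timedelta in processor.py, Param in the params example DAGs, and DagRunInfo in workday.py, as ordinary imports.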