Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve importing the module in Airflow operators package #33800

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions airflow/operators/bash.py
Expand Up @@ -21,14 +21,16 @@
import shutil
import warnings
from functools import cached_property
from typing import Container, Sequence
from typing import TYPE_CHECKING, Container, Sequence

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.hooks.subprocess import SubprocessHook
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context
from airflow.utils.operator_helpers import context_to_airflow_vars

if TYPE_CHECKING:
from airflow.utils.context import Context


class BashOperator(BaseOperator):
r"""
Expand Down
6 changes: 4 additions & 2 deletions airflow/operators/branch.py
Expand Up @@ -18,11 +18,13 @@
"""Branching operators."""
from __future__ import annotations

from typing import Iterable
from typing import TYPE_CHECKING, Iterable

from airflow.models.baseoperator import BaseOperator
from airflow.models.skipmixin import SkipMixin
from airflow.utils.context import Context

if TYPE_CHECKING:
from airflow.utils.context import Context


class BaseBranchOperator(BaseOperator, SkipMixin):
Expand Down
6 changes: 4 additions & 2 deletions airflow/operators/datetime.py
Expand Up @@ -18,12 +18,14 @@

import datetime
import warnings
from typing import Iterable
from typing import TYPE_CHECKING, Iterable

from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.operators.branch import BaseBranchOperator
from airflow.utils import timezone
from airflow.utils.context import Context

if TYPE_CHECKING:
from airflow.utils.context import Context


class BranchDateTimeOperator(BaseBranchOperator):
Expand Down
6 changes: 4 additions & 2 deletions airflow/operators/email.py
Expand Up @@ -17,12 +17,14 @@
# under the License.
from __future__ import annotations

from typing import Any, Sequence
from typing import TYPE_CHECKING, Any, Sequence

from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context
from airflow.utils.email import send_email

if TYPE_CHECKING:
from airflow.utils.context import Context


class EmailOperator(BaseOperator):
"""
Expand Down
6 changes: 5 additions & 1 deletion airflow/operators/empty.py
Expand Up @@ -16,8 +16,12 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context

if TYPE_CHECKING:
from airflow.utils.context import Context


class EmptyOperator(BaseOperator):
Expand Down
6 changes: 4 additions & 2 deletions airflow/operators/generic_transfer.py
Expand Up @@ -17,11 +17,13 @@
# under the License.
from __future__ import annotations

from typing import Sequence
from typing import TYPE_CHECKING, Sequence

from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
from airflow.utils.context import Context

if TYPE_CHECKING:
from airflow.utils.context import Context


class GenericTransfer(BaseOperator):
Expand Down
2 changes: 1 addition & 1 deletion airflow/operators/latest_only.py
Expand Up @@ -23,10 +23,10 @@
import pendulum

from airflow.operators.branch import BaseBranchOperator
from airflow.utils.context import Context

if TYPE_CHECKING:
from airflow.models import DAG, DagRun
from airflow.utils.context import Context


class LatestOnlyOperator(BaseBranchOperator):
Expand Down
4 changes: 3 additions & 1 deletion airflow/operators/python.py
Expand Up @@ -46,14 +46,16 @@
from airflow.models.baseoperator import BaseOperator
from airflow.models.skipmixin import SkipMixin
from airflow.models.taskinstance import _CURRENT_CONTEXT
from airflow.utils.context import Context, context_copy_partial, context_merge
from airflow.utils.context import context_copy_partial, context_merge
from airflow.utils.operator_helpers import KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess
from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script

if TYPE_CHECKING:
from pendulum.datetime import DateTime

from airflow.utils.context import Context


def is_venv_installed() -> bool:
"""
Expand Down
6 changes: 5 additions & 1 deletion airflow/operators/smooth.py
Expand Up @@ -17,8 +17,12 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context

if TYPE_CHECKING:
from airflow.utils.context import Context


class SmoothOperator(BaseOperator):
Expand Down
14 changes: 10 additions & 4 deletions airflow/operators/subdag.py
Expand Up @@ -23,24 +23,30 @@
from __future__ import annotations

import warnings
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING

from sqlalchemy import select
from sqlalchemy.orm.session import Session

from airflow.api.common.experimental.get_task_instance import get_task_instance
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning, TaskInstanceNotFound
from airflow.models import DagRun
from airflow.models.dag import DAG, DagContext
from airflow.models.dag import DagContext
from airflow.models.pool import Pool
from airflow.models.taskinstance import TaskInstance
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType

if TYPE_CHECKING:
from datetime import datetime

from sqlalchemy.orm.session import Session

from airflow.models.dag import DAG
from airflow.utils.context import Context


class SkippedStatePropagationOptions(Enum):
"""Available options for skipped state propagation of subdag's tasks to parent dag tasks."""
Expand Down
2 changes: 1 addition & 1 deletion airflow/operators/trigger_dagrun.py
Expand Up @@ -34,7 +34,6 @@
from airflow.models.xcom import XCom
from airflow.triggers.external_task import DagStateTrigger
from airflow.utils import timezone
from airflow.utils.context import Context
from airflow.utils.helpers import build_airflow_url_with_query
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState
Expand All @@ -48,6 +47,7 @@
from sqlalchemy.orm.session import Session

from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context


class TriggerDagRunLink(BaseOperatorLink):
Expand Down
6 changes: 4 additions & 2 deletions airflow/operators/weekday.py
Expand Up @@ -18,14 +18,16 @@
from __future__ import annotations

import warnings
from typing import Iterable
from typing import TYPE_CHECKING, Iterable

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.operators.branch import BaseBranchOperator
from airflow.utils import timezone
from airflow.utils.context import Context
from airflow.utils.weekday import WeekDay

if TYPE_CHECKING:
from airflow.utils.context import Context


class BranchDayOfWeekOperator(BaseBranchOperator):
"""Branches into one of two lists of tasks depending on the current day.
Expand Down