Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions airflow/api_connexion/endpoints/dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from flask import current_app, g, request
from marshmallow import ValidationError

from airflow import DAG
from airflow.api_connexion import security
from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.parameters import check_limit, format_parameters
Expand All @@ -28,7 +27,7 @@
dags_collection_schema,
)
from airflow.exceptions import SerializedDagNotFound
from airflow.models.dag import DagModel
from airflow.models.dag import DAG, DagModel
from airflow.security import permissions
from airflow.utils.session import provide_session

Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/extra_link_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

from flask import current_app

from airflow import DAG
from airflow.api_connexion import security
from airflow.api_connexion.exceptions import NotFound
from airflow.exceptions import TaskNotFound
from airflow.models.dag import DAG
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun as DR
from airflow.security import permissions
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/task_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

from flask import current_app

from airflow import DAG
from airflow.api_connexion import security
from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.schemas.task_schema import TaskCollection, task_collection_schema, task_schema
from airflow.exceptions import TaskNotFound
from airflow.models.dag import DAG
from airflow.security import permissions


Expand Down
3 changes: 1 addition & 2 deletions airflow/api_connexion/schemas/dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
from marshmallow import Schema, fields
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field

from airflow import DAG
from airflow.api_connexion.schemas.common_schema import ScheduleIntervalSchema, TimeDeltaSchema, TimezoneField
from airflow.configuration import conf
from airflow.models.dag import DagModel, DagTag
from airflow.models.dag import DAG, DagModel, DagTag


class DagTagSchema(SQLAlchemySchema):
Expand Down
2 changes: 1 addition & 1 deletion airflow/cli/commands/info_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@

import requests
import tenacity
from typing_extensions import Protocol

from airflow import configuration
from airflow.cli.simple_table import AirflowConsole
from airflow.providers_manager import ProvidersManager
from airflow.typing_compat import Protocol
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to import Protocol directly for sphinx-autoapi to understand this correctly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done this way for compatibility across multiple python versions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I install typing_extensions on every Python version, because sphinx-autoapi has problems with it.

WARNING: Cannot resolve import of airflow.typing_compat.Protocol in airflow.stats

Hopefully we can fix this differently, but for now, I wanted to take a step forward to see if we had other critical issues. Now the documentation for the apache-airflow package builds up fine, so I think it's a pretty cool start.

from airflow.utils.cli import suppress_logs_and_warning
from airflow.version import version as airflow_version

Expand Down
3 changes: 2 additions & 1 deletion airflow/hooks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
import warnings
from typing import Any, Dict, List

from typing_extensions import Protocol

from airflow.models.connection import Connection
from airflow.typing_compat import Protocol
from airflow.utils.log.logging_mixin import LoggingMixin

log = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion airflow/hooks/dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
from urllib.parse import quote_plus

from sqlalchemy import create_engine
from typing_extensions import Protocol

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.typing_compat import Protocol


class ConnectorProtocol(Protocol):
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
from typing import Optional

from cryptography.fernet import Fernet, MultiFernet
from typing_extensions import Protocol

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.typing_compat import Protocol

log = logging.getLogger(__name__)

Expand Down
2 changes: 1 addition & 1 deletion airflow/models/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@

from sqlalchemy import Column, Integer, String, Text, func
from sqlalchemy.orm.session import Session
from typing_extensions import TypedDict

from airflow.exceptions import AirflowException
from airflow.models.base import Base
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
from airflow.typing_compat import TypedDict
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import nowait, with_row_locks
from airflow.utils.state import State
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/operators/speech_to_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
from typing import Optional, Sequence, Union

from google.api_core.retry import Retry
from google.cloud.speech_v1.types import RecognitionConfig
from google.cloud.speech_v1.types import RecognitionAudio, RecognitionConfig
from google.protobuf.json_format import MessageToDict

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.speech_to_text import CloudSpeechToTextHook, RecognitionAudio
from airflow.providers.google.cloud.hooks.speech_to_text import CloudSpeechToTextHook
from airflow.utils.decorators import apply_defaults


Expand Down
24 changes: 10 additions & 14 deletions airflow/providers/google/cloud/utils/credentials_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""
This module contains a mechanism for providing temporary
Google Cloud authentication.
This module contains a mechanism for providing temporary Google Cloud authentication.
"""
import json
import logging
Expand Down Expand Up @@ -79,7 +78,7 @@ def build_gcp_conn(
def provide_gcp_credentials(key_file_path: Optional[str] = None, key_file_dict: Optional[Dict] = None):
"""
Context manager that provides a Google Cloud credentials for application supporting `Application
Default Credentials (ADC) strategy <https://cloud.google.com/docs/authentication/production>`__.
Default Credentials (ADC) strategy <https://cloud.google.com/docs/authentication/production>`_.

It can be used to provide credentials for external programs (e.g. gcloud) that expect authorization
file in ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable.
Expand Down Expand Up @@ -144,11 +143,11 @@ def provide_gcp_conn_and_credentials(
"""
Context manager that provides both:

- Google Cloud credentials for application supporting `Application Default Credentials (ADC)
strategy <https://cloud.google.com/docs/authentication/production>`__.
- temporary value of :envvar:`AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT` connection
- Google Cloud credentials for application supporting `Application Default Credentials (ADC)
strategy <https://cloud.google.com/docs/authentication/production>`__.
- temporary value of :envvar:`AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT` connection

:param key_file_path: Path to file with Google Cloud Service Account .json file.
:param key_file_path: Path to file with Google Cloud Service Account ``.json`` file.
:type key_file_path: str
:param scopes: OAuth scopes for the connection
:type scopes: Sequence
Expand All @@ -175,7 +174,7 @@ class _CredentialProvider(LoggingMixin):
"""
Prepare the Credentials object for Google API and the associated project_id

Only either `key_path` or `keyfile_dict` should be provided, or an exception will
Only either ``key_path`` or ``keyfile_dict`` should be provided, or an exception will
occur. If neither of them are provided, return default credentials for the current environment

:param key_path: Path to Google Cloud Service Account key file (JSON).
Expand Down Expand Up @@ -311,7 +310,7 @@ def get_credentials_and_project_id(*args, **kwargs) -> Tuple[google.auth.credent

def _get_scopes(scopes: Optional[str] = None) -> Sequence[str]:
"""
Parse a comma-separated string containing OAuth2 scopes if `scopes` is provided.
Parse a comma-separated string containing OAuth2 scopes if ``scopes`` is provided.
Otherwise, default scope will be returned.

:param scopes: A comma-separated string containing OAuth2 scopes
Expand All @@ -330,10 +329,8 @@ def _get_target_principal_and_delegates(
to directly impersonate using short-term credentials, if any) and optional list of delegates
required to get the access_token of target_principal.

:param impersonation_chain: the service account to impersonate or a chained list leading to this
account
:param impersonation_chain: the service account to impersonate or a chained list leading to this account
:type impersonation_chain: Optional[Union[str, Sequence[str]]]

:return: Returns the tuple of target_principal and delegates
:rtype: Tuple[Optional[str], Optional[Sequence[str]]]
"""
Expand All @@ -352,13 +349,12 @@ def _get_project_id_from_service_account_email(service_account_email: str) -> st

:param service_account_email: email of the service account.
:type service_account_email: str

:return: Returns the project_id of the provided service account.
:rtype: str
"""
try:
return service_account_email.split('@')[1].split('.')[0]
except IndexError:
raise AirflowException(
f"Could not extract project_id from service account's email: " f"{service_account_email}."
f"Could not extract project_id from service account's email: {service_account_email}."
)
4 changes: 3 additions & 1 deletion airflow/providers/jira/operators/jira.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

from typing import Any, Callable, Dict, Optional

from jira import JIRA
from jira.exceptions import JIRAError

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.jira.hooks.jira import JIRAError, JiraHook
from airflow.utils.decorators import apply_defaults


Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/jira/sensors/jira.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
# under the License.
from typing import Any, Callable, Dict, Optional

from jira.exceptions import JIRAError
from jira.resources import Issue, Resource

from airflow.providers.jira.operators.jira import JIRAError, JiraOperator
from airflow.providers.jira.operators.jira import JiraOperator
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the sort of error this gave with the import as it was?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WARNING: Cannot resolve import of airflow.providers.jira.operators.jira.JIRAError in airflow.providers.jira.sensors.jira

from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/qubole/hooks/qubole.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ def build_command_args() -> Tuple[Dict[str, list], list]:
return command_args, list(hyphen_args)


COMMAND_ARGS: List[str]
HYPHEN_ARGS: List[str]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to explicitly create these variables so that sphinx-autoapi can see and import these variables in other module.

COMMAND_ARGS, HYPHEN_ARGS = build_command_args()


Expand Down
7 changes: 0 additions & 7 deletions airflow/sensors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import datetime
import hashlib
import os
import time
from datetime import timedelta
from typing import Any, Callable, Dict, Iterable
Expand Down Expand Up @@ -322,9 +321,3 @@ def mode_setter(_, value):
return cls_type

return decorate(cls)


if 'BUILDING_AIRFLOW_DOCS' in os.environ:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's duplicated code. See:

if 'BUILDING_AIRFLOW_DOCS' in os.environ:

# flake8: noqa: F811
# Monkey patch hook to get good function headers while building docs
apply_defaults = lambda x: x
2 changes: 1 addition & 1 deletion airflow/serialization/json_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
from typing import Iterable

import jsonschema
from typing_extensions import Protocol

from airflow.exceptions import AirflowException
from airflow.settings import json
from airflow.typing_compat import Protocol


class Validator(Protocol):
Expand Down
8 changes: 4 additions & 4 deletions airflow/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
from functools import wraps
from typing import TYPE_CHECKING, Callable, Optional, TypeVar, cast

from typing_extensions import Protocol

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, InvalidStatsNameException
from airflow.typing_compat import Protocol

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -347,9 +348,9 @@ def timer(self, stat=None, *args, tags=None, **kwargs):
return Timer()


class _Stats(type):
class _Stats:
factory = None
instance: Optional[StatsLogger] = None
instance = None # : Optional[StatsLogger] = None

def __getattr__(cls, name):
if not cls.instance:
Expand All @@ -361,7 +362,6 @@ def __getattr__(cls, name):
return getattr(cls.instance, name)

def __init__(cls, *args, **kwargs):
super().__init__(cls)
if cls.__class__.factory is None:
is_datadog_enabled_defined = conf.has_option('metrics', 'statsd_datadog_enabled')
if is_datadog_enabled_defined and conf.getboolean('metrics', 'statsd_datadog_enabled'):
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/log/logging_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def get_external_log_url(self, task_instance, try_number) -> str:
class StreamLogWriter:
"""Allows to redirect stdout and stderr to logger"""

encoding: None = None
encoding = None

def __init__(self, logger, level):
"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class State:
"""

# scheduler
NONE = None # type: None
NONE = None
REMOVED = "removed"
SCHEDULED = "scheduled"

Expand Down
5 changes: 5 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,11 @@ def _load_config():
# TOC tree entry yourself.
autoapi_add_toctree_entry = False

# Options for display of the generated documentation.
autoapi_options = [
'members',
]

# -- Options for ext.exampleinclude --------------------------------------------
exampleinclude_sourceroot = os.path.abspath('..')

Expand Down
Loading