Skip to content

Commit

Permalink
POC: How we can deprecate things in Airflow
Browse files Browse the repository at this point in the history
  • Loading branch information
kacpermuda committed Jan 18, 2024
1 parent 9563dc5 commit 65f3f55
Show file tree
Hide file tree
Showing 62 changed files with 930 additions and 918 deletions.
63 changes: 63 additions & 0 deletions airflow/deprecation.py
@@ -0,0 +1,63 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Note: Any AirflowException raised is expected to cause the TaskInstance
# to be marked in an ERROR state
"""Airflow deprecation utilities."""
from __future__ import annotations

from deprecated import deprecated as _deprecated


class RemovedInAirflow3Warning(DeprecationWarning):
"""Issued for usage of deprecated features that will be removed in Airflow3."""

deprecated_since: str | None = None
"Indicates the airflow version that started raising this deprecation warning"


class AirflowProviderDeprecationWarning(DeprecationWarning):
"""Issued for usage of deprecated features of Airflow provider."""

deprecated_provider_since: str | None = None
"Indicates the provider version that started raising this deprecation warning"


# TODO This can be rewritten to include some custom arguments and / or custom message formatting.
deprecated = _deprecated


# Inspired by
# https://github.com/tensorflow/tensorflow/blob/dec8e0b11f4f87693b67e125e67dfbc68d26c205/tensorflow/python/util/deprecation.py#L336
def deprecated_args(*args, **kwargs):
"""Decorator for marking specific function arguments as deprecated.
This decorator logs a deprecation warning whenever the decorated function is
called with the deprecated argument.
"""
pass


# Inspired by
# https://github.com/tensorflow/tensorflow/blob/dec8e0b11f4f87693b67e125e67dfbc68d26c205/tensorflow/python/util/deprecation.py#L516
def deprecated_arg_values(func):
"""Decorator for marking specific function argument values as deprecated.
This decorator logs a deprecation warning whenever the decorated function is
called with the deprecated argument values.
"""
pass
28 changes: 15 additions & 13 deletions airflow/providers/amazon/aws/hooks/base_aws.py
Expand Up @@ -29,7 +29,6 @@
import json
import logging
import os
import warnings
from copy import deepcopy
from functools import cached_property, wraps
from pathlib import Path
Expand All @@ -44,6 +43,7 @@
from botocore.config import Config
from botocore.waiter import Waiter, WaiterModel
from dateutil.tz import tzlocal
from deprecated import deprecated
from slugify import slugify

from airflow.configuration import conf
Expand Down Expand Up @@ -1020,6 +1020,13 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
pass


@deprecated(
reason=(
"airflow.providers.amazon.aws.hook.base_aws.BaseAsyncSessionFactory "
"has been deprecated and will be removed in future"
),
category=AirflowProviderDeprecationWarning,
)
class BaseAsyncSessionFactory(BaseSessionFactory):
"""
Base AWS Session Factory class to handle aiobotocore session creation.
Expand All @@ -1029,12 +1036,6 @@ class BaseAsyncSessionFactory(BaseSessionFactory):
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"airflow.providers.amazon.aws.hook.base_aws.BaseAsyncSessionFactory has been deprecated and "
"will be removed in future",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)

async def get_role_credentials(self) -> dict:
Expand Down Expand Up @@ -1113,6 +1114,13 @@ def create_session(self, deferrable: bool = False) -> AioSession:
return self._get_session_with_assume_role()


@deprecated(
reason=(
"airflow.providers.amazon.aws.hook.base_aws.AwsBaseAsyncHook "
"has been deprecated and will be removed in future"
),
category=AirflowProviderDeprecationWarning,
)
class AwsBaseAsyncHook(AwsBaseHook):
"""Interacts with AWS using aiobotocore asynchronously.
Expand All @@ -1129,12 +1137,6 @@ class AwsBaseAsyncHook(AwsBaseHook):
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"airflow.providers.amazon.aws.hook.base_aws.AwsBaseAsyncHook has been deprecated and "
"will be removed in future",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)

def get_async_session(self) -> AioSession:
Expand Down
17 changes: 9 additions & 8 deletions airflow/providers/amazon/aws/hooks/quicksight.py
Expand Up @@ -18,10 +18,10 @@
from __future__ import annotations

import time
import warnings
from functools import cached_property

from botocore.exceptions import ClientError
from deprecated import deprecated

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
Expand Down Expand Up @@ -172,14 +172,15 @@ def wait_for_state(
return status

@cached_property
def sts_hook(self):
warnings.warn(
f"`{type(self).__name__}.sts_hook` property is deprecated and will be removed in the future. "
@deprecated(
reason=(
"`QuickSightHook.sts_hook` property is deprecated and will be removed in the future. "
"This property used for obtain AWS Account ID, "
f"please consider to use `{type(self).__name__}.account_id` instead",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
"please consider to use `QuickSightHook.account_id` instead"
),
category=AirflowProviderDeprecationWarning,
)
def sts_hook(self):
from airflow.providers.amazon.aws.hooks.sts import StsHook

return StsHook(aws_conn_id=self.aws_conn_id)
15 changes: 8 additions & 7 deletions airflow/providers/amazon/aws/hooks/redshift_cluster.py
Expand Up @@ -17,10 +17,10 @@
from __future__ import annotations

import asyncio
import warnings
from typing import Any, Sequence

import botocore.exceptions
from deprecated import deprecated

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseAsyncHook, AwsBaseHook
Expand Down Expand Up @@ -195,16 +195,17 @@ def get_cluster_snapshot_status(self, snapshot_identifier: str):
return None


@deprecated(
reason=(
"airflow.providers.amazon.aws.hook.base_aws.RedshiftAsyncHook "
"has been deprecated and will be removed in future"
),
category=AirflowProviderDeprecationWarning,
)
class RedshiftAsyncHook(AwsBaseAsyncHook):
"""Interact with AWS Redshift using aiobotocore library."""

def __init__(self, *args, **kwargs):
warnings.warn(
"airflow.providers.amazon.aws.hook.base_aws.RedshiftAsyncHook has been deprecated and "
"will be removed in future",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
kwargs["client_type"] = "redshift"
super().__init__(*args, **kwargs)

Expand Down
8 changes: 2 additions & 6 deletions airflow/providers/amazon/aws/operators/eks.py
Expand Up @@ -25,6 +25,7 @@
from typing import TYPE_CHECKING, Any, List, Sequence, cast

from botocore.exceptions import ClientError, WaiterError
from deprecated import deprecated

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
Expand Down Expand Up @@ -263,13 +264,8 @@ def hook(self) -> EksHook:
return EksHook(aws_conn_id=self.aws_conn_id, region_name=self.region)

@property
@deprecated(reason="use `hook` property instead.", category=AirflowProviderDeprecationWarning)
def eks_hook(self):
warnings.warn(
"`eks_hook` property is deprecated and will be removed in the future. "
"Please use `hook` property instead.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
return self.hook

def execute(self, context: Context):
Expand Down
31 changes: 17 additions & 14 deletions airflow/providers/amazon/aws/sensors/quicksight.py
Expand Up @@ -17,10 +17,11 @@
# under the License.
from __future__ import annotations

import warnings
from functools import cached_property
from typing import TYPE_CHECKING, Sequence

from deprecated import deprecated

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook
from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
Expand Down Expand Up @@ -80,24 +81,26 @@ def poke(self, context: Context) -> bool:
return quicksight_ingestion_state == self.success_status

@cached_property
@deprecated(
reason=(
"`QuickSightSensor.quicksight_hook` property is deprecated, "
"please use `QuickSightSensor.hook` property instead."
),
category=AirflowProviderDeprecationWarning,
)
def quicksight_hook(self):
warnings.warn(
f"`{type(self).__name__}.quicksight_hook` property is deprecated, "
f"please use `{type(self).__name__}.hook` property instead.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
return self.hook

@cached_property
def sts_hook(self):
warnings.warn(
f"`{type(self).__name__}.sts_hook` property is deprecated and will be removed in the future. "
@deprecated(
reason=(
"`QuickSightSensor.sts_hook` property is deprecated and will be removed in the future. "
"This property used for obtain AWS Account ID, "
f"please consider to use `{type(self).__name__}.hook.account_id` instead",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
"please consider to use `QuickSightSensor.hook.account_id` instead"
),
category=AirflowProviderDeprecationWarning,
)
def sts_hook(self):
from airflow.providers.amazon.aws.hooks.sts import StsHook

return StsHook(aws_conn_id=self.aws_conn_id)
16 changes: 9 additions & 7 deletions airflow/providers/amazon/aws/triggers/rds.py
Expand Up @@ -16,10 +16,11 @@
# under the License.
from __future__ import annotations

import warnings
from functools import cached_property
from typing import TYPE_CHECKING, Any

from deprecated import deprecated

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.amazon.aws.hooks.rds import RdsHook
from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
Expand All @@ -31,6 +32,13 @@
from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook


@deprecated(
reason=(
"This trigger is deprecated, please use the other RDS triggers "
"such as RdsDbDeletedTrigger, RdsDbStoppedTrigger or RdsDbAvailableTrigger"
),
category=AirflowProviderDeprecationWarning,
)
class RdsDbInstanceTrigger(BaseTrigger):
"""
Deprecated Trigger for RDS operations. Do not use.
Expand All @@ -55,12 +63,6 @@ def __init__(
region_name: str | None,
response: dict[str, Any],
):
warnings.warn(
"This trigger is deprecated, please use the other RDS triggers "
"such as RdsDbDeletedTrigger, RdsDbStoppedTrigger or RdsDbAvailableTrigger",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
self.db_instance_identifier = db_instance_identifier
self.waiter_delay = waiter_delay
self.waiter_max_attempts = waiter_max_attempts
Expand Down
15 changes: 8 additions & 7 deletions airflow/providers/amazon/aws/utils/connection_wrapper.py
Expand Up @@ -25,6 +25,7 @@

from botocore import UNSIGNED
from botocore.config import Config
from deprecated import deprecated

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.amazon.aws.utils import trim_none_values
Expand Down Expand Up @@ -462,6 +463,13 @@ def _get_assume_role_configs(
return role_arn, assume_role_method, assume_role_kwargs


@deprecated(
reason=(
"Use local credentials file is never documented and well tested. "
"Obtain credentials by this way deprecated and will be removed in a future releases."
),
category=AirflowProviderDeprecationWarning,
)
def _parse_s3_config(
config_file_name: str, config_format: str | None = "boto", profile: str | None = None
) -> tuple[str | None, str | None]:
Expand All @@ -474,13 +482,6 @@ def _parse_s3_config(
Defaults to "boto"
:param profile: profile name in AWS type config file
"""
warnings.warn(
"Use local credentials file is never documented and well tested. "
"Obtain credentials by this way deprecated and will be removed in a future releases.",
AirflowProviderDeprecationWarning,
stacklevel=4,
)

import configparser

config = configparser.ConfigParser()
Expand Down
6 changes: 5 additions & 1 deletion airflow/providers/amazon/aws/utils/mixins.py
Expand Up @@ -31,6 +31,7 @@
from functools import cached_property
from typing import Any, Generic, NamedTuple, TypeVar

from deprecated import deprecated
from typing_extensions import final

from airflow.compat.functools import cache
Expand Down Expand Up @@ -160,9 +161,12 @@ def hook(self) -> AwsHookType:

@property
@final
@deprecated(
reason=REGION_MSG,
category=AirflowProviderDeprecationWarning,
)
def region(self) -> str | None:
"""Alias for ``region_name``, used for compatibility (deprecated)."""
warnings.warn(REGION_MSG, AirflowProviderDeprecationWarning, stacklevel=3)
return self.region_name


Expand Down

0 comments on commit 65f3f55

Please sign in to comment.