Skip to content

Commit

Permalink
Merge branch 'main' into openai_hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
nathadfield committed Apr 26, 2024
2 parents 4c073f0 + 0c96b06 commit d9e38ac
Show file tree
Hide file tree
Showing 230 changed files with 1,067 additions and 578 deletions.
7 changes: 7 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,13 @@ repos:
pass_filenames: true
files: ^airflow/.*\.py$
exclude: ^.*/.*_vendor/
- id: check-code-deprecations
name: Check deprecations categories in decorators
entry: ./scripts/ci/pre_commit/check_deprecations.py
language: python
pass_filenames: true
files: ^airflow/.*\.py$
exclude: ^.*/.*_vendor/
- id: lint-chart-schema
name: Lint chart/values.schema.json file
entry: ./scripts/ci/pre_commit/chart_schema.py
Expand Down
4 changes: 2 additions & 2 deletions PROVIDERS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ Airflow version to the next MINOR release, when 12 months passed since the first
MINOR version of Airflow.

For example this means that by default we upgrade the minimum version of Airflow supported by providers
to 2.7.0 in the first Provider's release after 27th of April 2024. 27th of April 2023 is the date when the
first ``PATCHLEVEL`` of 2.6 (2.6.0) has been released.
to 2.8.0 in the first Provider's release after 18th of August 2024. 18th of August 2023 is the date when the
first ``PATCHLEVEL`` of 2.7 (2.7.0) has been released.

When we increase the minimum Airflow version, this is not a reason to bump ``MAJOR`` version of the providers
(unless there are other breaking changes in the provider). The reason for that is that people who use
Expand Down
6 changes: 4 additions & 2 deletions airflow/api/common/experimental/get_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
from deprecated import deprecated

from airflow.api.common.experimental import check_and_get_dag
from airflow.exceptions import AirflowException, DagCodeNotFound
from airflow.exceptions import AirflowException, DagCodeNotFound, RemovedInAirflow3Warning
from airflow.models.dagcode import DagCode


@deprecated(reason="Use DagCode().get_code_by_fileloc() instead", version="2.2.4")
@deprecated(
reason="Use DagCode().get_code_by_fileloc() instead", version="2.2.4", category=RemovedInAirflow3Warning
)
def get_code(dag_id: str) -> str:
"""Return python code of a given dag_id.
Expand Down
3 changes: 2 additions & 1 deletion airflow/api/common/experimental/get_dag_run_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
from deprecated import deprecated

from airflow.api.common.experimental import check_and_get_dag, check_and_get_dagrun
from airflow.exceptions import RemovedInAirflow3Warning

if TYPE_CHECKING:
from datetime import datetime


@deprecated(reason="Use DagRun().get_state() instead", version="2.2.4")
@deprecated(reason="Use DagRun().get_state() instead", version="2.2.4", category=RemovedInAirflow3Warning)
def get_dag_run_state(dag_id: str, execution_date: datetime) -> dict[str, str]:
"""Return the Dag Run state identified by the given dag_id and execution_date.
Expand Down
3 changes: 2 additions & 1 deletion airflow/api/common/experimental/get_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
from deprecated import deprecated

from airflow.api.common.experimental import check_and_get_dag
from airflow.exceptions import RemovedInAirflow3Warning

if TYPE_CHECKING:
from airflow.models import TaskInstance


@deprecated(reason="Use DAG().get_task", version="2.2.4")
@deprecated(reason="Use DAG().get_task", version="2.2.4", category=RemovedInAirflow3Warning)
def get_task(dag_id: str, task_id: str) -> TaskInstance:
"""Return the task object identified by the given dag_id and task_id."""
dag = check_and_get_dag(dag_id, task_id)
Expand Down
4 changes: 2 additions & 2 deletions airflow/api/common/experimental/get_task_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
from deprecated import deprecated

from airflow.api.common.experimental import check_and_get_dag, check_and_get_dagrun
from airflow.exceptions import TaskInstanceNotFound
from airflow.exceptions import RemovedInAirflow3Warning, TaskInstanceNotFound
from airflow.models import TaskInstance

if TYPE_CHECKING:
from datetime import datetime


@deprecated(version="2.2.4", reason="Use DagRun.get_task_instance instead")
@deprecated(version="2.2.4", reason="Use DagRun.get_task_instance instead", category=RemovedInAirflow3Warning)
def get_task_instance(dag_id: str, task_id: str, execution_date: datetime) -> TaskInstance:
"""Return the task instance identified by the given dag_id, task_id and execution_date."""
dag = check_and_get_dag(dag_id, task_id)
Expand Down
10 changes: 5 additions & 5 deletions airflow/api/common/experimental/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
from deprecated import deprecated
from sqlalchemy import select

from airflow.exceptions import AirflowBadRequest, PoolNotFound
from airflow.exceptions import AirflowBadRequest, PoolNotFound, RemovedInAirflow3Warning
from airflow.models import Pool
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
from sqlalchemy.orm import Session


@deprecated(reason="Use Pool.get_pool() instead", version="2.2.4")
@deprecated(reason="Use Pool.get_pool() instead", version="2.2.4", category=RemovedInAirflow3Warning)
@provide_session
def get_pool(name, session: Session = NEW_SESSION):
"""Get pool by a given name."""
Expand All @@ -46,14 +46,14 @@ def get_pool(name, session: Session = NEW_SESSION):
return pool


@deprecated(reason="Use Pool.get_pools() instead", version="2.2.4")
@deprecated(reason="Use Pool.get_pools() instead", version="2.2.4", category=RemovedInAirflow3Warning)
@provide_session
def get_pools(session: Session = NEW_SESSION):
"""Get all pools."""
return session.scalars(select(Pool)).all()


@deprecated(reason="Use Pool.create_pool() instead", version="2.2.4")
@deprecated(reason="Use Pool.create_pool() instead", version="2.2.4", category=RemovedInAirflow3Warning)
@provide_session
def create_pool(name, slots, description, session: Session = NEW_SESSION):
"""Create a pool with given parameters."""
Expand Down Expand Up @@ -84,7 +84,7 @@ def create_pool(name, slots, description, session: Session = NEW_SESSION):
return pool


@deprecated(reason="Use Pool.delete_pool() instead", version="2.2.4")
@deprecated(reason="Use Pool.delete_pool() instead", version="2.2.4", category=RemovedInAirflow3Warning)
@provide_session
def delete_pool(name, session: Session = NEW_SESSION):
"""Delete pool by a given name."""
Expand Down
5 changes: 4 additions & 1 deletion airflow/auth/managers/fab/fab_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

from deprecated import deprecated

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager as FabAuthManagerProvider


@deprecated(
reason="Use airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager instead", version="2.9.0"
reason="Use airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager instead",
version="2.9.0",
category=RemovedInAirflow3Warning,
)
class FabAuthManager(FabAuthManagerProvider):
"""
Expand Down
4 changes: 3 additions & 1 deletion airflow/auth/managers/fab/security_manager/override.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from deprecated import deprecated

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.providers.fab.auth_manager.security_manager.override import (
FabAirflowSecurityManagerOverride as FabProviderAirflowSecurityManagerOverride,
)
Expand All @@ -27,7 +28,8 @@
@deprecated(
reason="If you want to override the security manager, you should inherit from "
"`airflow.providers.fab.auth_manager.security_manager.override.FabAirflowSecurityManagerOverride` "
"instead"
"instead",
category=RemovedInAirflow3Warning,
)
class FabAirflowSecurityManagerOverride(FabProviderAirflowSecurityManagerOverride):
"""
Expand Down
14 changes: 11 additions & 3 deletions airflow/cli/commands/db_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING

from packaging.version import parse as parse_version
from packaging.version import InvalidVersion, parse as parse_version
from tenacity import Retrying, stop_after_attempt, wait_fixed

from airflow import settings
Expand Down Expand Up @@ -111,16 +111,24 @@ def migratedb(args):
if args.from_revision:
from_revision = args.from_revision
elif args.from_version:
if parse_version(args.from_version) < parse_version("2.0.0"):
try:
parsed_version = parse_version(args.from_version)
except InvalidVersion:
raise SystemExit(f"Invalid version {args.from_version!r} supplied as `--from-version`.")
if parsed_version < parse_version("2.0.0"):
raise SystemExit("--from-version must be greater or equal to than 2.0.0")
from_revision = get_version_revision(args.from_version)
if not from_revision:
raise SystemExit(f"Unknown version {args.from_version!r} supplied as `--from-version`.")

if args.to_version:
try:
parse_version(args.to_version)
except InvalidVersion:
raise SystemExit(f"Invalid version {args.to_version!r} supplied as `--to-version`.")
to_revision = get_version_revision(args.to_version)
if not to_revision:
raise SystemExit(f"Upgrading to version {args.to_version} is not supported.")
raise SystemExit(f"Unknown version {args.to_version!r} supplied as `--to-version`.")
elif args.to_revision:
to_revision = args.to_revision

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@
# specific language governing permissions and limitations
# under the License.

"""update trigger kwargs type
"""update trigger kwargs type and encrypt
Revision ID: 1949afb29106
Revises: ee1467d4aa35
Create Date: 2024-03-17 22:09:09.406395
"""
import json
from textwrap import dedent

from alembic import context, op
import sqlalchemy as sa
from sqlalchemy.orm import lazyload

from airflow.serialization.serialized_objects import BaseSerialization
from airflow.models.trigger import Trigger
from alembic import op

from airflow.utils.sqlalchemy import ExtendedJSON

# revision identifiers, used by Alembic.
Expand All @@ -38,13 +42,43 @@
airflow_version = "2.9.0"


def get_session() -> sa.orm.Session:
conn = op.get_bind()
sessionmaker = sa.orm.sessionmaker()
return sessionmaker(bind=conn)

def upgrade():
"""Update trigger kwargs type to string"""
"""Update trigger kwargs type to string and encrypt"""
with op.batch_alter_table("trigger") as batch_op:
batch_op.alter_column("kwargs", type_=sa.Text(), )

if not context.is_offline_mode():
session = get_session()
try:
for trigger in session.query(Trigger).options(lazyload(Trigger.task_instance)):
trigger.kwargs = trigger.kwargs
session.commit()
finally:
session.close()


def downgrade():
"""Unapply update trigger kwargs type to string"""
"""Unapply update trigger kwargs type to string and encrypt"""
if context.is_offline_mode():
print(dedent("""
------------
-- WARNING: Unable to decrypt trigger kwargs automatically in offline mode!
-- If any trigger rows exist when you do an offline downgrade, the migration will fail.
------------
"""))
else:
session = get_session()
try:
for trigger in session.query(Trigger).options(lazyload(Trigger.task_instance)):
trigger.encrypted_kwargs = json.dumps(BaseSerialization.serialize(trigger.kwargs))
session.commit()
finally:
session.close()

with op.batch_alter_table("trigger") as batch_op:
batch_op.alter_column("kwargs", type_=ExtendedJSON(), postgresql_using="kwargs::json")
batch_op.alter_column("kwargs", type_=ExtendedJSON(), postgresql_using='kwargs::json')
10 changes: 9 additions & 1 deletion airflow/models/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,15 @@ def _decrypt_kwargs(encrypted_kwargs: str) -> dict[str, Any]:
from airflow.models.crypto import get_fernet
from airflow.serialization.serialized_objects import BaseSerialization

decrypted_kwargs = json.loads(get_fernet().decrypt(encrypted_kwargs.encode("utf-8")).decode("utf-8"))
# We weren't able to encrypt the kwargs in all migration paths,
# so we need to handle the case where they are not encrypted.
# Triggers aren't long lasting, so we can skip encrypting them now.
if encrypted_kwargs.startswith("{"):
decrypted_kwargs = json.loads(encrypted_kwargs)
else:
decrypted_kwargs = json.loads(
get_fernet().decrypt(encrypted_kwargs.encode("utf-8")).decode("utf-8")
)

return BaseSerialization.deserialize(decrypted_kwargs)

Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/airbyte/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
from airflow.version import version as airflow_version

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.6.0"
"2.7.0"
):
raise RuntimeError(
f"The package `apache-airflow-providers-airbyte:{__version__}` needs Apache Airflow 2.6.0+"
f"The package `apache-airflow-providers-airbyte:{__version__}` needs Apache Airflow 2.7.0+"
)
2 changes: 1 addition & 1 deletion airflow/providers/airbyte/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ versions:
- 1.0.0

dependencies:
- apache-airflow>=2.6.0
- apache-airflow>=2.7.0
- apache-airflow-providers-http

integrations:
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/alibaba/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
from airflow.version import version as airflow_version

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.6.0"
"2.7.0"
):
raise RuntimeError(
f"The package `apache-airflow-providers-alibaba:{__version__}` needs Apache Airflow 2.6.0+"
f"The package `apache-airflow-providers-alibaba:{__version__}` needs Apache Airflow 2.7.0+"
)
2 changes: 1 addition & 1 deletion airflow/providers/alibaba/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ versions:
- 1.0.0

dependencies:
- apache-airflow>=2.6.0
- apache-airflow>=2.7.0
- oss2>=2.14.0
- alibabacloud_adb20211201>=1.0.0
- alibabacloud_tea_openapi>=0.3.7
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/amazon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
from airflow.version import version as airflow_version

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.6.0"
"2.7.0"
):
raise RuntimeError(
f"The package `apache-airflow-providers-amazon:{__version__}` needs Apache Airflow 2.6.0+"
f"The package `apache-airflow-providers-amazon:{__version__}` needs Apache Airflow 2.7.0+"
)
2 changes: 1 addition & 1 deletion airflow/providers/amazon/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ versions:
- 1.0.0

dependencies:
- apache-airflow>=2.6.0
- apache-airflow>=2.7.0
- apache-airflow-providers-common-sql>=1.3.1
- apache-airflow-providers-http
# We should update minimum version of boto3 and here regularly to avoid `pip` backtracking with the number
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/apache/beam/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
from airflow.version import version as airflow_version

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.6.0"
"2.7.0"
):
raise RuntimeError(
f"The package `apache-airflow-providers-apache-beam:{__version__}` needs Apache Airflow 2.6.0+"
f"The package `apache-airflow-providers-apache-beam:{__version__}` needs Apache Airflow 2.7.0+"
)
2 changes: 1 addition & 1 deletion airflow/providers/apache/beam/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ versions:
- 1.0.0

dependencies:
- apache-airflow>=2.6.0
- apache-airflow>=2.7.0
# Apache Beam > 2.53.0 and pyarrow > 14.0.1 fix https://nvd.nist.gov/vuln/detail/CVE-2023-47248.
- apache-beam>=2.53.0
- pyarrow>=14.0.1
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/apache/cassandra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
from airflow.version import version as airflow_version

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.6.0"
"2.7.0"
):
raise RuntimeError(
f"The package `apache-airflow-providers-apache-cassandra:{__version__}` needs Apache Airflow 2.6.0+"
f"The package `apache-airflow-providers-apache-cassandra:{__version__}` needs Apache Airflow 2.7.0+"
)
2 changes: 1 addition & 1 deletion airflow/providers/apache/cassandra/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ versions:
- 1.0.0

dependencies:
- apache-airflow>=2.6.0
- apache-airflow>=2.7.0
- cassandra-driver>=3.29.1

integrations:
Expand Down
Loading

0 comments on commit d9e38ac

Please sign in to comment.