diff --git a/.github/workflows/build-images.yml b/.github/workflows/build-images.yml index 96b4108ab7703..0de707561c774 100644 --- a/.github/workflows/build-images.yml +++ b/.github/workflows/build-images.yml @@ -99,7 +99,7 @@ jobs: }' --jq '.data.node.labels.nodes[]' | jq --slurp -c '[.[].name]' >> ${GITHUB_OUTPUT} if: github.event_name == 'pull_request_target' # Retrieve it to be able to determine which files has changed in the incoming commit of the PR - # we checkout the target commit and it's parent to be able to compare them + # we checkout the target commit and its parent to be able to compare them - name: Cleanup repo run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*" - uses: actions/checkout@v3 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b2c022c308cfe..a716ca2f78d33 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -886,7 +886,7 @@ jobs: DB_RESET: "false" PYTHON_MAJOR_MINOR_VERSION: "${{needs.build-info.outputs.default-python-version}}" JOB_ID: "helm-tests" - COVERAGE: "${{needs.build-info.outputs.run-coverage}}" + COVERAGE: "false" # We do not need to run coverage on Helm tests if: > needs.build-info.outputs.needs-helm-tests == 'true' && (github.repository == 'apache/airflow' || github.event_name != 'schedule') && @@ -1089,6 +1089,7 @@ jobs: MYSQL_VERSION: "${{matrix.mysql-version}}" BACKEND_VERSION: "${{matrix.mysql-version}}" JOB_ID: "mysql-${{matrix.mysql-version}}-${{matrix.python-version}}" + COVERAGE: "${{needs.build-info.outputs.run-coverage}}" if: needs.build-info.outputs.run-tests == 'true' steps: - name: Cleanup repo diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cb0545eddffa6..778b4d6db0026 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -412,7 +412,7 @@ repos: language: python entry: ./scripts/ci/pre_commit/pre_commit_version_heads_map.py pass_filenames: false - additional_dependencies: ['packaging'] + additional_dependencies: ['packaging','google-re2'] - id: update-version name: Update version to the latest version in the documentation entry: ./scripts/ci/pre_commit/pre_commit_update_versions.py diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index ef358cbdc107c..5379f31160be9 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -679,7 +679,7 @@ ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm mysql, neo4j, odbc, openfaas, openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, -tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, zendesk +tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk .. END EXTRAS HERE Provider packages diff --git a/CONTRIBUTORS_QUICK_START.rst b/CONTRIBUTORS_QUICK_START.rst index 75168b4efccf8..7ccfac30d09fb 100644 --- a/CONTRIBUTORS_QUICK_START.rst +++ b/CONTRIBUTORS_QUICK_START.rst @@ -104,7 +104,7 @@ Colima ------ If you use Colima as your container runtimes engine, please follow the next steps: -1. `Install buildx manually `_ and follow it's instructions +1. `Install buildx manually `_ and follow its instructions 2. Link the Colima socket to the default socket path. Note that this may break other Docker servers. 
@@ -252,7 +252,7 @@ Typical development tasks ######################### For many of the development tasks you will need ``Breeze`` to be configured. ``Breeze`` is a development -environment which uses docker and docker-compose and it's main purpose is to provide a consistent +environment which uses docker and docker-compose and its main purpose is to provide a consistent and repeatable environment for all the contributors and CI. When using ``Breeze`` you avoid the "works for me" syndrome - because not only others can reproduce easily what you do, but also the CI of Airflow uses the same environment to run all tests - so you should be able to easily reproduce the same failures you diff --git a/Dockerfile b/Dockerfile index 5023e3ef9a3c0..43f085ce6afc2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1336,7 +1336,7 @@ RUN if [[ -f /docker-context-files/requirements.txt ]]; then \ ############################################################################################## # This is the actual Airflow image - much smaller than the build one. We copy -# installed Airflow and all it's dependencies from the build image to make it smaller. +# installed Airflow and all its dependencies from the build image to make it smaller. ############################################################################################## FROM ${PYTHON_BASE_IMAGE} as main diff --git a/Dockerfile.ci b/Dockerfile.ci index a4503f2d7cd59..0761021304197 100644 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -1020,10 +1020,11 @@ else fi if [[ ${ENABLE_TEST_COVERAGE:="false"} == "true" ]]; then + _suffix="$(head /dev/urandom | tr -dc A-Za-z0-9 | head -c 8)" EXTRA_PYTEST_ARGS+=( "--cov=airflow" "--cov-config=pyproject.toml" - "--cov-report=xml:/files/coverage-${TEST_TYPE/\[*\]/}-${BACKEND}.xml" + "--cov-report=xml:/files/coverage-${TEST_TYPE/\[*\]/}-${BACKEND}-${_suffix}.xml" ) fi @@ -1395,7 +1396,7 @@ RUN echo "Airflow version: ${AIRFLOW_VERSION}" # force them on the main Airflow package. Currently we need no extra limits as PIP 23.1+ has much better # dependency resolution and we do not need to limit the versions of the dependencies # aiobotocore is limited temporarily until it stops backtracking pip -ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="aiobotocore<2.6.0" +ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="" ARG UPGRADE_TO_NEWER_DEPENDENCIES="false" ARG VERSION_SUFFIX_FOR_PYPI="" diff --git a/INSTALL b/INSTALL index b30f0dbd5f10f..4e017cc9a8065 100644 --- a/INSTALL +++ b/INSTALL @@ -106,7 +106,7 @@ ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm mysql, neo4j, odbc, openfaas, openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, -tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, zendesk +tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk # END EXTRAS HERE # For installing Airflow in development environments - see CONTRIBUTING.rst diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst index fbd2ba786ef24..66794268e2ac9 100644 --- a/RELEASE_NOTES.rst +++ b/RELEASE_NOTES.rst @@ -5439,7 +5439,7 @@ It has been removed. ``airflow.settings.CONTEXT_MANAGER_DAG`` ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -CONTEXT_MANAGER_DAG was removed from settings. It's role has been taken by ``DagContext`` in +CONTEXT_MANAGER_DAG was removed from settings. 
Its role has been taken by ``DagContext`` in 'airflow.models.dag'. One of the reasons was that settings should be rather static than store dynamic context from the DAG, but the main one is that moving the context out of settings allowed to untangle cyclic imports between DAG, BaseOperator, SerializedDAG, SerializedBaseOperator which was diff --git a/TESTING.rst b/TESTING.rst index c9e1525c2f98c..d5657124684e7 100644 --- a/TESTING.rst +++ b/TESTING.rst @@ -451,9 +451,9 @@ Running Tests with provider packages Airflow 2.0 introduced the concept of splitting the monolithic Airflow package into separate providers packages. The main "apache-airflow" package contains the bare Airflow implementation, -and additionally we have 70+ providers that we can install additionally to get integrations with -external services. Those providers live in the same monorepo as Airflow, but we build separate -packages for them and the main "apache-airflow" package does not contain the providers. +and there are more than 70 additional providers that can be installed to integrate with +external services. Such providers live in the same monorepo as Airflow but are built as +separate packages, distinct from the main "apache-airflow" package. Most of the development in Breeze happens by iterating on sources and when you run your tests during development, you usually do not want to build packages and install them separately. diff --git a/airflow/auth/managers/base_auth_manager.py b/airflow/auth/managers/base_auth_manager.py index bcc5e13892e2b..a512804b4ca94 100644 --- a/airflow/auth/managers/base_auth_manager.py +++ b/airflow/auth/managers/base_auth_manager.py @@ -20,11 +20,12 @@ from abc import abstractmethod from typing import TYPE_CHECKING -from airflow.auth.managers.models.base_user import BaseUser from airflow.exceptions import AirflowException from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: + from airflow.auth.managers.models.base_user import BaseUser + from airflow.cli.cli_config import CLICommand from airflow.www.security import AirflowSecurityManager @@ -38,6 +39,14 @@ class BaseAuthManager(LoggingMixin): def __init__(self): self._security_manager: AirflowSecurityManager | None = None + @staticmethod + def get_cli_commands() -> list[CLICommand]: + """Vends CLI commands to be included in Airflow CLI. + + Override this method to expose commands via Airflow CLI to manage this auth manager. + """ + return [] + @abstractmethod def get_user_name(self) -> str: """Return the username associated to the user in session.""" diff --git a/airflow/auth/managers/fab/cli_commands/__init__.py b/airflow/auth/managers/fab/cli_commands/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/airflow/auth/managers/fab/cli_commands/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/auth/managers/fab/cli_commands/definition.py b/airflow/auth/managers/fab/cli_commands/definition.py new file mode 100644 index 0000000000000..478f6d8d309e0 --- /dev/null +++ b/airflow/auth/managers/fab/cli_commands/definition.py @@ -0,0 +1,220 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import textwrap + +from airflow.cli.cli_config import ( + ARG_OUTPUT, + ARG_VERBOSE, + ActionCommand, + Arg, + lazy_load_command, +) + +############ +# # ARGS # # +############ + +# users +ARG_USERNAME = Arg(("-u", "--username"), help="Username of the user", required=True, type=str) +ARG_USERNAME_OPTIONAL = Arg(("-u", "--username"), help="Username of the user", type=str) +ARG_FIRSTNAME = Arg(("-f", "--firstname"), help="First name of the user", required=True, type=str) +ARG_LASTNAME = Arg(("-l", "--lastname"), help="Last name of the user", required=True, type=str) +ARG_ROLE = Arg( + ("-r", "--role"), + help="Role of the user. Existing roles include Admin, User, Op, Viewer, and Public", + required=True, + type=str, +) +ARG_EMAIL = Arg(("-e", "--email"), help="Email of the user", required=True, type=str) +ARG_EMAIL_OPTIONAL = Arg(("-e", "--email"), help="Email of the user", type=str) +ARG_PASSWORD = Arg( + ("-p", "--password"), + help="Password of the user, required to create a user without --use-random-password", + type=str, +) +ARG_USE_RANDOM_PASSWORD = Arg( + ("--use-random-password",), + help="Do not prompt for password. Use random string instead." + " Required to create a user without --password ", + default=False, + action="store_true", +) +ARG_USER_IMPORT = Arg( + ("import",), + metavar="FILEPATH", + help="Import users from JSON file. 
Example format::\n" + + textwrap.indent( + textwrap.dedent( + """ + [ + { + "email": "foo@bar.org", + "firstname": "Jon", + "lastname": "Doe", + "roles": ["Public"], + "username": "jondoe" + } + ]""" + ), + " " * 4, + ), +) +ARG_USER_EXPORT = Arg(("export",), metavar="FILEPATH", help="Export all users to JSON file") + +# roles +ARG_CREATE_ROLE = Arg(("-c", "--create"), help="Create a new role", action="store_true") +ARG_LIST_ROLES = Arg(("-l", "--list"), help="List roles", action="store_true") +ARG_ROLES = Arg(("role",), help="The name of a role", nargs="*") +ARG_PERMISSIONS = Arg(("-p", "--permission"), help="Show role permissions", action="store_true") +ARG_ROLE_RESOURCE = Arg(("-r", "--resource"), help="The name of permissions", nargs="*", required=True) +ARG_ROLE_ACTION = Arg(("-a", "--action"), help="The action of permissions", nargs="*") +ARG_ROLE_ACTION_REQUIRED = Arg(("-a", "--action"), help="The action of permissions", nargs="*", required=True) + +ARG_ROLE_IMPORT = Arg(("file",), help="Import roles from JSON file", nargs=None) +ARG_ROLE_EXPORT = Arg(("file",), help="Export all roles to JSON file", nargs=None) +ARG_ROLE_EXPORT_FMT = Arg( + ("-p", "--pretty"), + help="Format output JSON file by sorting role names and indenting by 4 spaces", + action="store_true", +) + +# sync-perm +ARG_INCLUDE_DAGS = Arg( + ("--include-dags",), help="If passed, DAG specific permissions will also be synced.", action="store_true" +) + +################ +# # COMMANDS # # +################ + +USERS_COMMANDS = ( + ActionCommand( + name="list", + help="List users", + func=lazy_load_command("airflow.auth.managers.fab.cli_commands.user_command.users_list"), + args=(ARG_OUTPUT, ARG_VERBOSE), + ), + ActionCommand( + name="create", + help="Create a user", + func=lazy_load_command("airflow.auth.managers.fab.cli_commands.user_command.users_create"), + args=( + ARG_ROLE, + ARG_USERNAME, + ARG_EMAIL, + ARG_FIRSTNAME, + ARG_LASTNAME, + ARG_PASSWORD, + ARG_USE_RANDOM_PASSWORD, + ARG_VERBOSE, + ), + epilog=( + "examples:\n" + 'To create an user with "Admin" role and username equals to "admin", run:\n' + "\n" + " $ airflow users create \\\n" + " --username admin \\\n" + " --firstname FIRST_NAME \\\n" + " --lastname LAST_NAME \\\n" + " --role Admin \\\n" + " --email admin@example.org" + ), + ), + ActionCommand( + name="delete", + help="Delete a user", + func=lazy_load_command("airflow.auth.managers.fab.cli_commands.user_command.users_delete"), + args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_VERBOSE), + ), + ActionCommand( + name="add-role", + help="Add role to a user", + func=lazy_load_command("airflow.auth.managers.fab.cli_commands.user_command.add_role"), + args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_ROLE, ARG_VERBOSE), + ), + ActionCommand( + name="remove-role", + help="Remove role from a user", + func=lazy_load_command("airflow.auth.managers.fab.cli_commands.user_command.remove_role"), + args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_ROLE, ARG_VERBOSE), + ), + ActionCommand( + name="import", + help="Import users", + func=lazy_load_command("airflow.auth.managers.fab.cli_commands.user_command.users_import"), + args=(ARG_USER_IMPORT, ARG_VERBOSE), + ), + ActionCommand( + name="export", + help="Export all users", + func=lazy_load_command("airflow.auth.managers.fab.cli_commands.user_command.users_export"), + args=(ARG_USER_EXPORT, ARG_VERBOSE), + ), +) +ROLES_COMMANDS = ( + ActionCommand( + name="list", + help="List roles", + 
func=lazy_load_command("airflow.auth.managers.fab.cli_commands.role_command.roles_list"), + args=(ARG_PERMISSIONS, ARG_OUTPUT, ARG_VERBOSE), + ), + ActionCommand( + name="create", + help="Create role", + func=lazy_load_command("airflow.auth.managers.fab.cli_commands.role_command.roles_create"), + args=(ARG_ROLES, ARG_VERBOSE), + ), + ActionCommand( + name="delete", + help="Delete role", + func=lazy_load_command("airflow.auth.managers.fab.cli_commands.role_command.roles_delete"), + args=(ARG_ROLES, ARG_VERBOSE), + ), + ActionCommand( + name="add-perms", + help="Add roles permissions", + func=lazy_load_command("airflow.auth.managers.fab.cli_commands.role_command.roles_add_perms"), + args=(ARG_ROLES, ARG_ROLE_RESOURCE, ARG_ROLE_ACTION_REQUIRED, ARG_VERBOSE), + ), + ActionCommand( + name="del-perms", + help="Delete roles permissions", + func=lazy_load_command("airflow.auth.managers.fab.cli_commands.role_command.roles_del_perms"), + args=(ARG_ROLES, ARG_ROLE_RESOURCE, ARG_ROLE_ACTION, ARG_VERBOSE), + ), + ActionCommand( + name="export", + help="Export roles (without permissions) from db to JSON file", + func=lazy_load_command("airflow.auth.managers.fab.cli_commands.role_command.roles_export"), + args=(ARG_ROLE_EXPORT, ARG_ROLE_EXPORT_FMT, ARG_VERBOSE), + ), + ActionCommand( + name="import", + help="Import roles (without permissions) from JSON file to db", + func=lazy_load_command("airflow.auth.managers.fab.cli_commands.role_command.roles_import"), + args=(ARG_ROLE_IMPORT, ARG_VERBOSE), + ), +) + +SYNC_PERM_COMMAND = ActionCommand( + name="sync-perm", + help="Update permissions for existing roles and optionally DAGs", + func=lazy_load_command("airflow.auth.managers.fab.cli_commands.sync_perm_command.sync_perm"), + args=(ARG_INCLUDE_DAGS, ARG_VERBOSE), +) diff --git a/airflow/cli/commands/role_command.py b/airflow/auth/managers/fab/cli_commands/role_command.py similarity index 94% rename from airflow/cli/commands/role_command.py rename to airflow/auth/managers/fab/cli_commands/role_command.py index a582b33195320..34ea8fb9d30e0 100644 --- a/airflow/cli/commands/role_command.py +++ b/airflow/auth/managers/fab/cli_commands/role_command.py @@ -23,6 +23,7 @@ import json import os +from airflow.auth.managers.fab.cli_commands.utils import get_application_builder from airflow.auth.managers.fab.models import Action, Permission, Resource, Role from airflow.cli.simple_table import AirflowConsole from airflow.utils import cli as cli_utils @@ -35,8 +36,6 @@ @providers_configuration_loaded def roles_list(args): """List all existing roles.""" - from airflow.utils.cli_app_builder import get_application_builder - with get_application_builder() as appbuilder: roles = appbuilder.sm.get_all_roles() @@ -63,8 +62,6 @@ def roles_list(args): @providers_configuration_loaded def roles_create(args): """Create new empty role in DB.""" - from airflow.utils.cli_app_builder import get_application_builder - with get_application_builder() as appbuilder: for role_name in args.role: appbuilder.sm.add_role(role_name) @@ -76,8 +73,6 @@ def roles_create(args): @providers_configuration_loaded def roles_delete(args): """Delete role in DB.""" - from airflow.utils.cli_app_builder import get_application_builder - with get_application_builder() as appbuilder: for role_name in args.role: role = appbuilder.sm.find_role(role_name) @@ -90,8 +85,6 @@ def roles_delete(args): def __roles_add_or_remove_permissions(args): - from airflow.utils.cli_app_builder import get_application_builder - with get_application_builder() as appbuilder: is_add: bool 
= args.subcommand.startswith("add") @@ -165,8 +158,6 @@ def roles_export(args): Note, this function does not export the permissions associated for each role. Strictly, it exports the role names into the passed role json file. """ - from airflow.utils.cli_app_builder import get_application_builder - with get_application_builder() as appbuilder: roles = appbuilder.sm.get_all_roles() exporting_roles = [role.name for role in roles if role.name not in EXISTING_ROLES] @@ -196,7 +187,6 @@ def roles_import(args): except ValueError as e: print(f"File '{json_file}' is not a valid JSON file. Error: {e}") exit(1) - from airflow.utils.cli_app_builder import get_application_builder with get_application_builder() as appbuilder: existing_roles = [role.name for role in appbuilder.sm.get_all_roles()] diff --git a/airflow/cli/commands/sync_perm_command.py b/airflow/auth/managers/fab/cli_commands/sync_perm_command.py similarity index 94% rename from airflow/cli/commands/sync_perm_command.py rename to airflow/auth/managers/fab/cli_commands/sync_perm_command.py index 4d4e280637f9c..14b6e58bbb08a 100644 --- a/airflow/cli/commands/sync_perm_command.py +++ b/airflow/auth/managers/fab/cli_commands/sync_perm_command.py @@ -26,7 +26,7 @@ @providers_configuration_loaded def sync_perm(args): """Update permissions for existing roles and DAGs.""" - from airflow.utils.cli_app_builder import get_application_builder + from airflow.auth.managers.fab.cli_commands.utils import get_application_builder with get_application_builder() as appbuilder: print("Updating actions and resources for all existing roles") diff --git a/airflow/cli/commands/user_command.py b/airflow/auth/managers/fab/cli_commands/user_command.py similarity index 95% rename from airflow/cli/commands/user_command.py rename to airflow/auth/managers/fab/cli_commands/user_command.py index bc982719c94df..84e6318e40537 100644 --- a/airflow/cli/commands/user_command.py +++ b/airflow/auth/managers/fab/cli_commands/user_command.py @@ -29,6 +29,7 @@ from marshmallow import Schema, fields, validate from marshmallow.exceptions import ValidationError +from airflow.auth.managers.fab.cli_commands.utils import get_application_builder from airflow.cli.simple_table import AirflowConsole from airflow.utils import cli as cli_utils from airflow.utils.cli import suppress_logs_and_warning @@ -50,8 +51,6 @@ class UserSchema(Schema): @providers_configuration_loaded def users_list(args): """List users at the command line.""" - from airflow.utils.cli_app_builder import get_application_builder - with get_application_builder() as appbuilder: users = appbuilder.sm.get_all_users() fields = ["id", "username", "email", "first_name", "last_name", "roles"] @@ -65,8 +64,6 @@ def users_list(args): @providers_configuration_loaded def users_create(args): """Create new user in the DB.""" - from airflow.utils.cli_app_builder import get_application_builder - with get_application_builder() as appbuilder: role = appbuilder.sm.find_role(args.role) if not role: @@ -101,8 +98,6 @@ def _find_user(args): if args.username and args.email: raise SystemExit("Conflicting args: must supply either --username or --email, but not both") - from airflow.utils.cli_app_builder import get_application_builder - with get_application_builder() as appbuilder: user = appbuilder.sm.find_user(username=args.username, email=args.email) if not user: @@ -119,8 +114,6 @@ def users_delete(args): # Clear the associated user roles first. 
user.roles.clear() - from airflow.utils.cli_app_builder import get_application_builder - with get_application_builder() as appbuilder: if appbuilder.sm.del_register_user(user): print(f'User "{user.username}" deleted') @@ -134,8 +127,6 @@ def users_manage_role(args, remove=False): """Delete or appends user roles.""" user = _find_user(args) - from airflow.utils.cli_app_builder import get_application_builder - with get_application_builder() as appbuilder: role = appbuilder.sm.find_role(args.role) if not role: @@ -161,8 +152,6 @@ def users_manage_role(args, remove=False): @providers_configuration_loaded def users_export(args): """Export all users to the json file.""" - from airflow.utils.cli_app_builder import get_application_builder - with get_application_builder() as appbuilder: users = appbuilder.sm.get_all_users() fields = ["id", "username", "email", "first_name", "last_name", "roles"] @@ -211,8 +200,6 @@ def users_import(args): def _import_users(users_list: list[dict[str, Any]]): - from airflow.utils.cli_app_builder import get_application_builder - with get_application_builder() as appbuilder: users_created = [] users_updated = [] diff --git a/airflow/utils/cli_app_builder.py b/airflow/auth/managers/fab/cli_commands/utils.py similarity index 100% rename from airflow/utils/cli_app_builder.py rename to airflow/auth/managers/fab/cli_commands/utils.py diff --git a/airflow/auth/managers/fab/fab_auth_manager.py b/airflow/auth/managers/fab/fab_auth_manager.py index 848f5ff188d6c..def0590b1f5c9 100644 --- a/airflow/auth/managers/fab/fab_auth_manager.py +++ b/airflow/auth/managers/fab/fab_auth_manager.py @@ -17,13 +17,22 @@ # under the License. from __future__ import annotations -from flask import url_for -from flask_login import current_user +from typing import TYPE_CHECKING from airflow import AirflowException from airflow.auth.managers.base_auth_manager import BaseAuthManager -from airflow.auth.managers.fab.models import User -from airflow.auth.managers.fab.security_manager.override import FabAirflowSecurityManagerOverride +from airflow.auth.managers.fab.cli_commands.definition import ( + ROLES_COMMANDS, + SYNC_PERM_COMMAND, + USERS_COMMANDS, +) +from airflow.cli.cli_config import ( + CLICommand, + GroupCommand, +) + +if TYPE_CHECKING: + from airflow.auth.managers.fab.models import User class FabAuthManager(BaseAuthManager): @@ -33,6 +42,23 @@ class FabAuthManager(BaseAuthManager): This auth manager is responsible for providing a backward compatible user management experience to users. """ + @staticmethod + def get_cli_commands() -> list[CLICommand]: + """Vends CLI commands to be included in Airflow CLI.""" + return [ + GroupCommand( + name="users", + help="Manage users", + subcommands=USERS_COMMANDS, + ), + GroupCommand( + name="roles", + help="Manage roles", + subcommands=ROLES_COMMANDS, + ), + SYNC_PERM_COMMAND, # not in a command group + ] + def get_user_name(self) -> str: """ Return the username associated to the user in session. 
@@ -47,6 +73,8 @@ def get_user_name(self) -> str: def get_user(self) -> User: """Return the user associated to the user in session.""" + from flask_login import current_user + return current_user def get_user_id(self) -> str: @@ -59,25 +87,33 @@ def is_logged_in(self) -> bool: def get_security_manager_override_class(self) -> type: """Return the security manager override.""" + from airflow.auth.managers.fab.security_manager.override import FabAirflowSecurityManagerOverride + return FabAirflowSecurityManagerOverride + def url_for(self, *args, **kwargs): + """Wrapper to allow mocking without having to import at the top of the file.""" + from flask import url_for + + return url_for(*args, **kwargs) + def get_url_login(self, **kwargs) -> str: """Return the login page url.""" if not self.security_manager.auth_view: raise AirflowException("`auth_view` not defined in the security manager.") if "next_url" in kwargs and kwargs["next_url"]: - return url_for(f"{self.security_manager.auth_view.endpoint}.login", next=kwargs["next_url"]) + return self.url_for(f"{self.security_manager.auth_view.endpoint}.login", next=kwargs["next_url"]) else: - return url_for(f"{self.security_manager.auth_view.endpoint}.login") + return self.url_for(f"{self.security_manager.auth_view.endpoint}.login") def get_url_logout(self): """Return the logout page url.""" if not self.security_manager.auth_view: raise AirflowException("`auth_view` not defined in the security manager.") - return url_for(f"{self.security_manager.auth_view.endpoint}.logout") + return self.url_for(f"{self.security_manager.auth_view.endpoint}.logout") def get_url_user_profile(self) -> str | None: """Return the url to a page displaying info about the current user.""" if not self.security_manager.user_view: return None - return url_for(f"{self.security_manager.user_view.endpoint}.userinfo") + return self.url_for(f"{self.security_manager.user_view.endpoint}.userinfo") diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 2ec55ff79498f..c06bd32d96cbb 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -229,6 +229,12 @@ def string_lower_type(val): ), default=None, ) +ARG_SKIP_SERVE_LOGS = Arg( + ("-s", "--skip-serve-logs"), + default=False, + help="Don't start the serve logs process along with the workers", + action="store_true", +) # list_dag_runs ARG_DAG_ID_REQ_FLAG = Arg( @@ -593,7 +599,7 @@ def string_lower_type(val): ARG_DEPENDS_ON_PAST = Arg( ("-d", "--depends-on-past"), help="Determine how Airflow should deal with past dependencies. 
The default action is `check`, Airflow " - "will check if the the past dependencies are met for the tasks having `depends_on_past=True` before run " + "will check if the past dependencies are met for the tasks having `depends_on_past=True` before run " "them, if `ignore` is provided, the past dependencies will be ignored, if `wait` is provided and " "`depends_on_past=True`, Airflow will wait the past dependencies until they are met before running or " "skipping the task", @@ -878,76 +884,6 @@ def string_lower_type(val): action="store_true", ) -# users -ARG_USERNAME = Arg(("-u", "--username"), help="Username of the user", required=True, type=str) -ARG_USERNAME_OPTIONAL = Arg(("-u", "--username"), help="Username of the user", type=str) -ARG_FIRSTNAME = Arg(("-f", "--firstname"), help="First name of the user", required=True, type=str) -ARG_LASTNAME = Arg(("-l", "--lastname"), help="Last name of the user", required=True, type=str) -ARG_ROLE = Arg( - ("-r", "--role"), - help="Role of the user. Existing roles include Admin, User, Op, Viewer, and Public", - required=True, - type=str, -) -ARG_EMAIL = Arg(("-e", "--email"), help="Email of the user", required=True, type=str) -ARG_EMAIL_OPTIONAL = Arg(("-e", "--email"), help="Email of the user", type=str) -ARG_PASSWORD = Arg( - ("-p", "--password"), - help="Password of the user, required to create a user without --use-random-password", - type=str, -) -ARG_USE_RANDOM_PASSWORD = Arg( - ("--use-random-password",), - help="Do not prompt for password. Use random string instead." - " Required to create a user without --password ", - default=False, - action="store_true", -) -ARG_USER_IMPORT = Arg( - ("import",), - metavar="FILEPATH", - help="Import users from JSON file. Example format::\n" - + textwrap.indent( - textwrap.dedent( - """ - [ - { - "email": "foo@bar.org", - "firstname": "Jon", - "lastname": "Doe", - "roles": ["Public"], - "username": "jondoe" - } - ]""" - ), - " " * 4, - ), -) -ARG_USER_EXPORT = Arg(("export",), metavar="FILEPATH", help="Export all users to JSON file") - -# roles -ARG_CREATE_ROLE = Arg(("-c", "--create"), help="Create a new role", action="store_true") -ARG_LIST_ROLES = Arg(("-l", "--list"), help="List roles", action="store_true") -ARG_ROLES = Arg(("role",), help="The name of a role", nargs="*") -ARG_PERMISSIONS = Arg(("-p", "--permission"), help="Show role permissions", action="store_true") -ARG_ROLE_RESOURCE = Arg(("-r", "--resource"), help="The name of permissions", nargs="*", required=True) -ARG_ROLE_ACTION = Arg(("-a", "--action"), help="The action of permissions", nargs="*") -ARG_ROLE_ACTION_REQUIRED = Arg(("-a", "--action"), help="The action of permissions", nargs="*", required=True) -ARG_AUTOSCALE = Arg(("-a", "--autoscale"), help="Minimum and Maximum number of worker to autoscale") -ARG_SKIP_SERVE_LOGS = Arg( - ("-s", "--skip-serve-logs"), - default=False, - help="Don't start the serve logs process along with the workers", - action="store_true", -) -ARG_ROLE_IMPORT = Arg(("file",), help="Import roles from JSON file", nargs=None) -ARG_ROLE_EXPORT = Arg(("file",), help="Export all roles to JSON file", nargs=None) -ARG_ROLE_EXPORT_FMT = Arg( - ("-p", "--pretty"), - help="Format output JSON file by sorting role names and indenting by 4 spaces", - action="store_true", -) - # info ARG_ANONYMIZE = Arg( ("--anonymize",), @@ -1024,11 +960,6 @@ def string_lower_type(val): help="If passed, this command will be successful even if multiple matching alive jobs are found.", ) -# sync-perm -ARG_INCLUDE_DAGS = Arg( - 
("--include-dags",), help="If passed, DAG specific permissions will also be synced.", action="store_true" -) - # triggerer ARG_CAPACITY = Arg( ("--capacity",), @@ -1839,115 +1770,6 @@ class GroupCommand(NamedTuple): ) -USERS_COMMANDS = ( - ActionCommand( - name="list", - help="List users", - func=lazy_load_command("airflow.cli.commands.user_command.users_list"), - args=(ARG_OUTPUT, ARG_VERBOSE), - ), - ActionCommand( - name="create", - help="Create a user", - func=lazy_load_command("airflow.cli.commands.user_command.users_create"), - args=( - ARG_ROLE, - ARG_USERNAME, - ARG_EMAIL, - ARG_FIRSTNAME, - ARG_LASTNAME, - ARG_PASSWORD, - ARG_USE_RANDOM_PASSWORD, - ARG_VERBOSE, - ), - epilog=( - "examples:\n" - 'To create an user with "Admin" role and username equals to "admin", run:\n' - "\n" - " $ airflow users create \\\n" - " --username admin \\\n" - " --firstname FIRST_NAME \\\n" - " --lastname LAST_NAME \\\n" - " --role Admin \\\n" - " --email admin@example.org" - ), - ), - ActionCommand( - name="delete", - help="Delete a user", - func=lazy_load_command("airflow.cli.commands.user_command.users_delete"), - args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_VERBOSE), - ), - ActionCommand( - name="add-role", - help="Add role to a user", - func=lazy_load_command("airflow.cli.commands.user_command.add_role"), - args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_ROLE, ARG_VERBOSE), - ), - ActionCommand( - name="remove-role", - help="Remove role from a user", - func=lazy_load_command("airflow.cli.commands.user_command.remove_role"), - args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_ROLE, ARG_VERBOSE), - ), - ActionCommand( - name="import", - help="Import users", - func=lazy_load_command("airflow.cli.commands.user_command.users_import"), - args=(ARG_USER_IMPORT, ARG_VERBOSE), - ), - ActionCommand( - name="export", - help="Export all users", - func=lazy_load_command("airflow.cli.commands.user_command.users_export"), - args=(ARG_USER_EXPORT, ARG_VERBOSE), - ), -) -ROLES_COMMANDS = ( - ActionCommand( - name="list", - help="List roles", - func=lazy_load_command("airflow.cli.commands.role_command.roles_list"), - args=(ARG_PERMISSIONS, ARG_OUTPUT, ARG_VERBOSE), - ), - ActionCommand( - name="create", - help="Create role", - func=lazy_load_command("airflow.cli.commands.role_command.roles_create"), - args=(ARG_ROLES, ARG_VERBOSE), - ), - ActionCommand( - name="delete", - help="Delete role", - func=lazy_load_command("airflow.cli.commands.role_command.roles_delete"), - args=(ARG_ROLES, ARG_VERBOSE), - ), - ActionCommand( - name="add-perms", - help="Add roles permissions", - func=lazy_load_command("airflow.cli.commands.role_command.roles_add_perms"), - args=(ARG_ROLES, ARG_ROLE_RESOURCE, ARG_ROLE_ACTION_REQUIRED, ARG_VERBOSE), - ), - ActionCommand( - name="del-perms", - help="Delete roles permissions", - func=lazy_load_command("airflow.cli.commands.role_command.roles_del_perms"), - args=(ARG_ROLES, ARG_ROLE_RESOURCE, ARG_ROLE_ACTION, ARG_VERBOSE), - ), - ActionCommand( - name="export", - help="Export roles (without permissions) from db to JSON file", - func=lazy_load_command("airflow.cli.commands.role_command.roles_export"), - args=(ARG_ROLE_EXPORT, ARG_ROLE_EXPORT_FMT, ARG_VERBOSE), - ), - ActionCommand( - name="import", - help="Import roles (without permissions) from JSON file to db", - func=lazy_load_command("airflow.cli.commands.role_command.roles_import"), - args=(ARG_ROLE_IMPORT, ARG_VERBOSE), - ), -) - CONFIG_COMMANDS = ( ActionCommand( name="get-value", @@ -2171,22 +1993,6 @@ class 
GroupCommand(NamedTuple): help="Display providers", subcommands=PROVIDERS_COMMANDS, ), - GroupCommand( - name="users", - help="Manage users", - subcommands=USERS_COMMANDS, - ), - GroupCommand( - name="roles", - help="Manage roles", - subcommands=ROLES_COMMANDS, - ), - ActionCommand( - name="sync-perm", - help="Update permissions for existing roles and optionally DAGs", - func=lazy_load_command("airflow.cli.commands.sync_perm_command.sync_perm"), - args=(ARG_INCLUDE_DAGS, ARG_VERBOSE), - ), ActionCommand( name="rotate-fernet-key", func=lazy_load_command("airflow.cli.commands.rotate_fernet_key_command.rotate_fernet_key"), diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index ee1e60f1b210a..07c7695f2dddb 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -46,6 +46,7 @@ from airflow.exceptions import AirflowException from airflow.executors.executor_loader import ExecutorLoader from airflow.utils.helpers import partition +from airflow.www.extensions.init_auth_manager import get_auth_manager_cls airflow_commands = core_commands.copy() # make a copy to prevent bad interactions in tests @@ -64,6 +65,13 @@ # Do not re-raise the exception since we want the CLI to still function for # other commands. +try: + auth_mgr = get_auth_manager_cls() + airflow_commands.extend(auth_mgr.get_cli_commands()) +except Exception: + log.exception("cannot load CLI commands from auth manager") + # do not re-raise for the same reason as above + ALL_COMMANDS_DICT: dict[str, CLICommand] = {sp.name: sp for sp in airflow_commands} diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index ea06619917f8f..4b54ac2f9f675 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -240,7 +240,7 @@ def dag_dependencies_show(args) -> None: @providers_configuration_loaded def dag_show(args) -> None: - """Display DAG or saves it's graphic representation to the file.""" + """Display DAG or saves its graphic representation to the file.""" dag = get_dag(args.subdir, args.dag_id) dot = render_dag(dag) filename = args.save diff --git a/airflow/cli/commands/db_command.py b/airflow/cli/commands/db_command.py index be7dff62d74a3..449ba9c26c224 100644 --- a/airflow/cli/commands/db_command.py +++ b/airflow/cli/commands/db_command.py @@ -29,7 +29,7 @@ from airflow import settings from airflow.exceptions import AirflowException from airflow.utils import cli as cli_utils, db -from airflow.utils.db import REVISION_HEADS_MAP +from airflow.utils.db import _REVISION_HEADS_MAP from airflow.utils.db_cleanup import config_dict, drop_archived_tables, export_archived_records, run_cleanup from airflow.utils.process_utils import execute_interactive from airflow.utils.providers_configuration_loader import providers_configuration_loaded @@ -65,6 +65,27 @@ def upgradedb(args): migratedb(args) +def get_version_revision(version: str, recursion_limit=10) -> str | None: + """ + Recursively search for the revision of the given version. + + This searches REVISION_HEADS_MAP for the revision of the given version, recursively + searching for the previous version if the given version is not found. 
+ """ + if version in _REVISION_HEADS_MAP: + return _REVISION_HEADS_MAP[version] + try: + major, minor, patch = map(int, version.split(".")) + except ValueError: + return None + new_version = f"{major}.{minor}.{patch - 1}" + recursion_limit -= 1 + if recursion_limit <= 0: + # Prevent infinite recursion as I can't imagine 10 successive versions without migration + return None + return get_version_revision(new_version, recursion_limit) + + @cli_utils.action_cli(check_db=False) @providers_configuration_loaded def migratedb(args): @@ -85,12 +106,12 @@ def migratedb(args): elif args.from_version: if parse_version(args.from_version) < parse_version("2.0.0"): raise SystemExit("--from-version must be greater or equal to than 2.0.0") - from_revision = REVISION_HEADS_MAP.get(args.from_version) + from_revision = get_version_revision(args.from_version) if not from_revision: raise SystemExit(f"Unknown version {args.from_version!r} supplied as `--from-version`.") if args.to_version: - to_revision = REVISION_HEADS_MAP.get(args.to_version) + to_revision = get_version_revision(args.to_version) if not to_revision: raise SystemExit(f"Upgrading to version {args.to_version} is not supported.") elif args.to_revision: @@ -129,11 +150,11 @@ def downgrade(args): if args.from_revision: from_revision = args.from_revision elif args.from_version: - from_revision = REVISION_HEADS_MAP.get(args.from_version) + from_revision = get_version_revision(args.from_version) if not from_revision: raise SystemExit(f"Unknown version {args.from_version!r} supplied as `--from-version`.") if args.to_version: - to_revision = REVISION_HEADS_MAP.get(args.to_version) + to_revision = get_version_revision(args.to_version) if not to_revision: raise SystemExit(f"Downgrading to version {args.to_version} is not supported.") elif args.to_revision: diff --git a/airflow/cli/commands/pool_command.py b/airflow/cli/commands/pool_command.py index b26f2032a85bc..13386d6fbe8b6 100644 --- a/airflow/cli/commands/pool_command.py +++ b/airflow/cli/commands/pool_command.py @@ -97,7 +97,7 @@ def pool_import(args): if not os.path.exists(args.file): raise SystemExit(f"Missing pools file {args.file}") pools, failed = pool_import_helper(args.file) - if len(failed) > 0: + if failed: raise SystemExit(f"Failed to update pool(s): {', '.join(failed)}") print(f"Uploaded {len(pools)} pool(s)") diff --git a/airflow/cli/commands/standalone_command.py b/airflow/cli/commands/standalone_command.py index ceae1c6dcaec6..0beacb71d159d 100644 --- a/airflow/cli/commands/standalone_command.py +++ b/airflow/cli/commands/standalone_command.py @@ -182,7 +182,7 @@ def initialize_database(self): # server. Thus, we make a random password and store it in AIRFLOW_HOME, # with the reasoning that if you can read that directory, you can see # the database credentials anyway. 
- from airflow.utils.cli_app_builder import get_application_builder + from airflow.auth.managers.fab.cli_commands.utils import get_application_builder with get_application_builder() as appbuilder: user_exists = appbuilder.sm.find_user("admin") diff --git a/airflow/cli/commands/webserver_command.py b/airflow/cli/commands/webserver_command.py index 00ac66372465c..1714e2bd55031 100644 --- a/airflow/cli/commands/webserver_command.py +++ b/airflow/cli/commands/webserver_command.py @@ -138,7 +138,7 @@ def _get_num_ready_workers_running(self) -> int: def ready_prefix_on_cmdline(proc): try: cmdline = proc.cmdline() - if len(cmdline) > 0: + if cmdline: return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0] except psutil.NoSuchProcess: pass diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 64858be94ed9e..e38bae653c540 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -826,7 +826,7 @@ def process_file( Stats.incr("dag_file_refresh_error", 1, 1, tags={"file_path": file_path}) return 0, 0 - if len(dagbag.dags) > 0: + if dagbag.dags: self.log.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), file_path) else: self.log.warning("No viable dags retrieved from %s", file_path) diff --git a/airflow/example_dags/example_setup_teardown_taskflow.py b/airflow/example_dags/example_setup_teardown_taskflow.py index 4dcdbf253f6ca..b1bdbccf7fadc 100644 --- a/airflow/example_dags/example_setup_teardown_taskflow.py +++ b/airflow/example_dags/example_setup_teardown_taskflow.py @@ -50,7 +50,7 @@ def my_third_task(): # The method `as_teardown` will mark task_3 as teardown, task_1 as setup, and # arrow task_1 >> task_3. - # Now if you clear task_2, then it's setup task, task_1, will be cleared in + # Now if you clear task_2, then its setup task, task_1, will be cleared in # addition to its teardown task, task_3 # it's also possible to use a decorator to mark a task as setup or diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py index c142504639328..d04af55dcedc3 100644 --- a/airflow/jobs/local_task_job_runner.py +++ b/airflow/jobs/local_task_job_runner.py @@ -43,7 +43,7 @@ an attempt by a program/library to write or read outside its allocated memory. In Python environment usually this signal refers to libraries which use low level C API. -Make sure that you use use right libraries/Docker Images +Make sure that you use right libraries/Docker Images for your architecture (Intel/ARM) and/or Operational System (Linux/macOS). Suggested way to debug diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index f128c7857351e..50f78a48a9680 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1752,7 +1752,7 @@ def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None: Find all dags that were not updated by Dag Processor recently and mark them as inactive. In case one of DagProcessors is stopped (in case there are multiple of them - for different dag folders), it's dags are never marked as inactive. + for different dag folders), its dags are never marked as inactive. Also remove dags from SerializedDag table. Executed on schedule only if [scheduler]standalone_dag_processor is True. 
""" diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index 81ac8c178f776..298b6cf38843d 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -508,7 +508,7 @@ async def cancel_triggers(self): """ Drain the to_cancel queue and ensure all triggers that are not in the DB are cancelled. - This allows the the cleanup job to delete them. + This allows the cleanup job to delete them. """ while self.to_cancel: trigger_id = self.to_cancel.popleft() diff --git a/airflow/listeners/listener.py b/airflow/listeners/listener.py index dd5481cba1b34..eb738c3e918bf 100644 --- a/airflow/listeners/listener.py +++ b/airflow/listeners/listener.py @@ -46,7 +46,7 @@ def __init__(self): @property def has_listeners(self) -> bool: - return len(self.pm.get_plugins()) > 0 + return bool(self.pm.get_plugins()) @property def hook(self) -> _HookRelay: diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index b3167fda06706..0c8c3d3149672 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -391,7 +391,7 @@ def apply_defaults(self: BaseOperator, *args: Any, **kwargs: Any) -> Any: from airflow.models.dag import DagContext from airflow.utils.task_group import TaskGroupContext - if len(args) > 0: + if args: raise AirflowException("Use keyword arguments when initializing operators") instantiated_from_mapped = kwargs.pop( diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 80b29f8e4ce55..cc2626278f2ee 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1583,7 +1583,7 @@ def get_task_instances_before( .limit(num) ).all() - if len(execution_dates) == 0: + if not execution_dates: return self.get_task_instances(start_date=base_date, end_date=base_date, session=session) min_date: datetime | None = execution_dates[-1]._mapping.get( @@ -3146,7 +3146,7 @@ def deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION): :param active_dag_ids: list of DAG IDs that are active :return: None """ - if len(active_dag_ids) == 0: + if not active_dag_ids: return for dag in session.scalars(select(DagModel).where(~DagModel.dag_id.in_(active_dag_ids))).all(): dag.is_active = False diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 99e517656a129..6b3f561ffc8d3 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -1320,7 +1320,7 @@ def schedule_tis( """ Set the given task instances in to the scheduled state. - Each element of ``schedulable_tis`` should have it's ``task`` attribute already set. + Each element of ``schedulable_tis`` should have its ``task`` attribute already set. Any EmptyOperator without callbacks or outlets is instead set straight to the success state. 
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 975e615d95414..0be2f005bf5fd 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2139,7 +2139,7 @@ def get_prev_ds() -> str | None: execution_date = get_prev_execution_date() if execution_date is None: return None - return execution_date.strftime(r"%Y-%m-%d") + return execution_date.strftime("%Y-%m-%d") def get_prev_ds_nodash() -> str | None: prev_ds = get_prev_ds() diff --git a/airflow/provider_info.schema.json b/airflow/provider_info.schema.json index 3a37810f7a04b..b97bd44fe4daf 100644 --- a/airflow/provider_info.schema.json +++ b/airflow/provider_info.schema.json @@ -25,6 +25,60 @@ "deprecatedVersion": "2.2.0" } }, + "transfers": { + "type": "array", + "items": { + "type": "object", + "properties": { + "how-to-guide": { + "description": "Path to how-to-guide for the transfer. The path must start with '/docs/'", + "type": "string" + }, + "source-integration-name": { + "type": "string", + "description": "Integration name. It must have a matching item in the 'integration' section of any provider." + }, + "target-integration-name": { + "type": "string", + "description": "Target integration name. It must have a matching item in the 'integration' section of any provider." + }, + "python-module": { + "type": "string", + "description": "List of python modules containing the transfers." + } + }, + "additionalProperties": false, + "required": [ + "source-integration-name", + "target-integration-name", + "python-module" + ] + } + }, + "triggers": { + "type": "array", + "items": { + "type": "object", + "properties": { + "integration-name": { + "type": "string", + "description": "Integration name. It must have a matching item in the 'integration' section of any provider." 
+ }, + "python-modules": { + "description": "List of Python modules containing the triggers.", + "type": "array", + "items": { + "type": "string" + } + } + }, + "additionalProperties": false, + "required": [ + "integration-name", + "python-modules" + ] + } + }, "connection-types": { "type": "array", "description": "Map of connection types mapped to hook class names.", diff --git a/airflow/providers/alibaba/cloud/hooks/analyticdb_spark.py b/airflow/providers/alibaba/cloud/hooks/analyticdb_spark.py index 9881ca38ae4cd..e06ee912281bb 100644 --- a/airflow/providers/alibaba/cloud/hooks/analyticdb_spark.py +++ b/airflow/providers/alibaba/cloud/hooks/analyticdb_spark.py @@ -321,7 +321,7 @@ def _validate_extra_conf(conf: dict[Any, Any]) -> bool: if conf: if not isinstance(conf, dict): raise ValueError("'conf' argument must be a dict") - if not all((v and isinstance(v, str)) or isinstance(v, int) for v in conf.values()): + if not all(isinstance(v, (str, int)) and v != "" for v in conf.values()): raise ValueError("'conf' values must be either strings or ints") return True diff --git a/airflow/providers/amazon/aws/hooks/appflow.py b/airflow/providers/amazon/aws/hooks/appflow.py index 71e7ddd1c7f46..c9741278c00a7 100644 --- a/airflow/providers/amazon/aws/hooks/appflow.py +++ b/airflow/providers/amazon/aws/hooks/appflow.py @@ -20,6 +20,7 @@ from typing import TYPE_CHECKING from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook +from airflow.providers.amazon.aws.utils.waiter_with_logging import wait if TYPE_CHECKING: from mypy_boto3_appflow.client import AppflowClient @@ -49,13 +50,20 @@ def conn(self) -> AppflowClient: """Get the underlying boto3 Appflow client (cached).""" return super().conn - def run_flow(self, flow_name: str, poll_interval: int = 20, wait_for_completion: bool = True) -> str: + def run_flow( + self, + flow_name: str, + poll_interval: int = 20, + wait_for_completion: bool = True, + max_attempts: int = 60, + ) -> str: """ Execute an AppFlow run. :param flow_name: The flow name :param poll_interval: Time (seconds) to wait between two consecutive calls to check the run status :param wait_for_completion: whether to wait for the run to end to return + :param max_attempts: the number of polls to do before timing out/returning a failure. 
:return: The run execution ID """ response_start = self.conn.start_flow(flowName=flow_name) @@ -63,9 +71,17 @@ def run_flow(self, flow_name: str, poll_interval: int = 20, wait_for_completion: self.log.info("executionId: %s", execution_id) if wait_for_completion: - self.get_waiter("run_complete", {"EXECUTION_ID": execution_id}).wait( - flowName=flow_name, - WaiterConfig={"Delay": poll_interval}, + wait( + waiter=self.get_waiter("run_complete", {"EXECUTION_ID": execution_id}), + waiter_delay=poll_interval, + waiter_max_attempts=max_attempts, + args={"flowName": flow_name}, + failure_message="error while waiting for flow to complete", + status_message="waiting for flow completion, status", + status_args=[ + f"flowExecutions[?executionId=='{execution_id}'].executionStatus", + f"flowExecutions[?executionId=='{execution_id}'].executionResult.errorInfo", + ], ) self._log_execution_description(flow_name, execution_id) diff --git a/airflow/providers/amazon/aws/hooks/quicksight.py b/airflow/providers/amazon/aws/hooks/quicksight.py index 02ea582631a69..586a73b97b750 100644 --- a/airflow/providers/amazon/aws/hooks/quicksight.py +++ b/airflow/providers/amazon/aws/hooks/quicksight.py @@ -152,7 +152,7 @@ def wait_for_state( :param target_state: Describes the QuickSight Job's Target State :param check_interval: the time interval in seconds which the operator will check the status of QuickSight Ingestion - :return: response of describe_ingestion call after Ingestion is is done + :return: response of describe_ingestion call after Ingestion is done """ while True: status = self.get_status(aws_account_id, data_set_id, ingestion_id) diff --git a/airflow/providers/amazon/aws/hooks/sagemaker.py b/airflow/providers/amazon/aws/hooks/sagemaker.py index b8247d9de97bb..364e073b1a8d3 100644 --- a/airflow/providers/amazon/aws/hooks/sagemaker.py +++ b/airflow/providers/amazon/aws/hooks/sagemaker.py @@ -128,10 +128,8 @@ def secondary_training_status_message( status_strs = [] for transition in transitions_to_print: message = transition["StatusMessage"] - time_str = timezone.convert_to_utc(cast(datetime, job_description["LastModifiedTime"])).strftime( - "%Y-%m-%d %H:%M:%S" - ) - status_strs.append(f"{time_str} {transition['Status']} - {message}") + time_utc = timezone.convert_to_utc(cast(datetime, job_description["LastModifiedTime"])) + status_strs.append(f"{time_utc:%Y-%m-%d %H:%M:%S} {transition['Status']} - {message}") return "\n".join(status_strs) diff --git a/airflow/providers/amazon/aws/operators/appflow.py b/airflow/providers/amazon/aws/operators/appflow.py index f2fe75f39501e..7b7402acde887 100644 --- a/airflow/providers/amazon/aws/operators/appflow.py +++ b/airflow/providers/amazon/aws/operators/appflow.py @@ -16,12 +16,13 @@ # under the License. from __future__ import annotations +import warnings from datetime import datetime, timedelta from functools import cached_property from time import sleep from typing import TYPE_CHECKING, cast -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.models import BaseOperator from airflow.operators.python import ShortCircuitOperator from airflow.providers.amazon.aws.hooks.appflow import AppflowHook @@ -51,6 +52,7 @@ class AppflowBaseOperator(BaseOperator): :param source_field: The field name to apply filters :param filter_date: The date value (or template) to be used in filters. 
:param poll_interval: how often in seconds to check the query status + :param max_attempts: how many times to check for status before timing out :param aws_conn_id: aws connection to use :param region: aws region to use :param wait_for_completion: whether to wait for the run to end to return @@ -58,29 +60,33 @@ class AppflowBaseOperator(BaseOperator): ui_color = "#2bccbd" + template_fields = ("flow_name", "source", "source_field", "filter_date") + UPDATE_PROPAGATION_TIME: int = 15 def __init__( self, - source: str, flow_name: str, flow_update: bool, + source: str | None = None, source_field: str | None = None, filter_date: str | None = None, poll_interval: int = 20, + max_attempts: int = 60, aws_conn_id: str = "aws_default", region: str | None = None, wait_for_completion: bool = True, **kwargs, ) -> None: super().__init__(**kwargs) - if source not in SUPPORTED_SOURCES: + if source is not None and source not in SUPPORTED_SOURCES: raise ValueError(f"{source} is not a supported source (options: {SUPPORTED_SOURCES})!") self.filter_date = filter_date self.flow_name = flow_name self.source = source self.source_field = source_field self.poll_interval = poll_interval + self.max_attempts = max_attempts self.aws_conn_id = aws_conn_id self.region = region self.flow_update = flow_update @@ -95,7 +101,8 @@ def execute(self, context: Context) -> None: self.filter_date_parsed: datetime | None = ( datetime.fromisoformat(self.filter_date) if self.filter_date else None ) - self.connector_type = self._get_connector_type() + if self.source is not None: + self.connector_type = self._get_connector_type() if self.flow_update: self._update_flow() # while schedule flows will pick up the update right away, on-demand flows might use out of date @@ -118,6 +125,7 @@ def _run_flow(self, context) -> str: execution_id = self.hook.run_flow( flow_name=self.flow_name, poll_interval=self.poll_interval, + max_attempts=self.max_attempts, wait_for_completion=self.wait_for_completion, ) task_instance = context["task_instance"] @@ -127,13 +135,13 @@ def _run_flow(self, context) -> str: class AppflowRunOperator(AppflowBaseOperator): """ - Execute a Appflow run with filters as is. + Execute a Appflow run as is. .. 
seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:AppflowRunOperator` - :param source: The source name (Supported: salesforce, zendesk) + :param source: Obsolete, unnecessary for this operator :param flow_name: The flow name :param poll_interval: how often in seconds to check the query status :param aws_conn_id: aws connection to use @@ -143,18 +151,21 @@ class AppflowRunOperator(AppflowBaseOperator): def __init__( self, - source: str, flow_name: str, + source: str | None = None, poll_interval: int = 20, aws_conn_id: str = "aws_default", region: str | None = None, wait_for_completion: bool = True, **kwargs, ) -> None: - if source not in {"salesforce", "zendesk"}: - raise ValueError(NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunOperator")) + if source is not None: + warnings.warn( + "The `source` parameter is unused when simply running a flow, please remove it.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) super().__init__( - source=source, flow_name=flow_name, flow_update=False, source_field=None, @@ -227,8 +238,6 @@ class AppflowRunBeforeOperator(AppflowBaseOperator): :param wait_for_completion: whether to wait for the run to end to return """ - template_fields = ("filter_date",) - def __init__( self, source: str, @@ -297,8 +306,6 @@ class AppflowRunAfterOperator(AppflowBaseOperator): :param wait_for_completion: whether to wait for the run to end to return """ - template_fields = ("filter_date",) - def __init__( self, source: str, @@ -365,8 +372,6 @@ class AppflowRunDailyOperator(AppflowBaseOperator): :param wait_for_completion: whether to wait for the run to end to return """ - template_fields = ("filter_date",) - def __init__( self, source: str, diff --git a/airflow/providers/amazon/aws/operators/eks.py b/airflow/providers/amazon/aws/operators/eks.py index 557bf29f5feed..98fbc66da43b4 100644 --- a/airflow/providers/amazon/aws/operators/eks.py +++ b/airflow/providers/amazon/aws/operators/eks.py @@ -987,7 +987,7 @@ class EksPodOperator(KubernetesPodOperator): empty, then the default boto3 configuration would be used (and must be maintained on each worker node). :param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted. - If "delete_pod", the pod will be deleted regardless it's state; if "delete_succeeded_pod", + If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod", only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod. Current default is `keep_pod`, but this will be changed in the next major release of this provider. :param is_delete_operator_pod: What to do when the pod reaches its final diff --git a/airflow/providers/amazon/aws/triggers/README.md b/airflow/providers/amazon/aws/triggers/README.md index cd0c0baae5d53..bd998a5a294ff 100644 --- a/airflow/providers/amazon/aws/triggers/README.md +++ b/airflow/providers/amazon/aws/triggers/README.md @@ -38,7 +38,7 @@ The first step to making an existing operator deferrable is to add `deferrable` The next step is to determine where the operator should be deferred. This will be dependent on what the operator does, and how it is written. Although every operator is different, there are a few guidelines to determine the best place to defer an operator. 1. If the operator has a `wait_for_completion` parameter, the `self.defer` method should be called right before the check for wait_for_completion . -2. 
If there is no `wait_for_completion` , look for the "main" task that the operator does. Often, operators will make various describe calls to to the boto3 API to verify certain conditions, or look up some information before performing its "main" task. Often, right after the "main" call to the boto3 API is made is a good place to call `self.defer`. +2. If there is no `wait_for_completion`, look for the "main" task that the operator does. Often, operators will make various describe calls to the boto3 API to verify certain conditions, or look up some information before performing its "main" task. Often, right after the "main" call to the boto3 API is made is a good place to call `self.defer`. Once the location to defer is decided in the operator, call the `self.defer` method if the `deferrable` flag is `True`. The `self.defer` method takes in several parameters, listed below:
diff --git a/airflow/providers/amazon/aws/waiters/appflow.json b/airflow/providers/amazon/aws/waiters/appflow.json index f45c427467901..ed239a4048320 100644 --- a/airflow/providers/amazon/aws/waiters/appflow.json +++ b/airflow/providers/amazon/aws/waiters/appflow.json @@ -10,13 +10,13 @@ "expected": "Successful", "matcher": "path", "state": "success", - "argument": "flowExecutions[?executionId=='{{EXECUTION_ID}}'].executionStatus" + "argument": "flowExecutions[?executionId=='{{EXECUTION_ID}}'].executionStatus | [0]" }, { "expected": "Error", "matcher": "path", "state": "failure", - "argument": "flowExecutions[?executionId=='{{EXECUTION_ID}}'].executionStatus" + "argument": "flowExecutions[?executionId=='{{EXECUTION_ID}}'].executionStatus | [0]" }, { "expected": true,
diff --git a/airflow/providers/amazon/aws/waiters/batch.json b/airflow/providers/amazon/aws/waiters/batch.json index 3fbdd433771c8..a8cd489ea3809 100644 --- a/airflow/providers/amazon/aws/waiters/batch.json +++ b/airflow/providers/amazon/aws/waiters/batch.json @@ -17,7 +17,7 @@ "argument": "jobs[].status", "expected": "FAILED", "matcher": "pathAll", - "state": "failed" + "state": "failure" } ] }, @@ -37,13 +37,13 @@ "argument": "computeEnvironments[].status", "expected": "INVALID", "matcher": "pathAny", - "state": "failed" + "state": "failure" }, { "argument": "computeEnvironments[].status", "expected": "DELETED", "matcher": "pathAny", - "state": "failed" + "state": "failure" } ] }
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index 6aa32825105bf..2959764e00237 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -67,7 +67,19 @@ dependencies: - apache-airflow>=2.4.0 - apache-airflow-providers-common-sql>=1.3.1 - apache-airflow-providers-http - - boto3>=1.24.0 + # We should regularly update the minimum version of boto3 here to avoid `pip` backtracking with the number + # of candidates to consider. We should also make sure that all the related packages below have the + # same minimum version specified. Boto3 1.28.0 was released on July 6 2023. We should also make sure we + # set it to a version that `aiobotocore` supports (see the `aiobotocore` optional dependency at the end + # of this file). Currently we set aiobotocore to a minimum of 2.5.3 - as this was the first version + # that supported boto3 1.28. NOTE!!! BOTOCORE VERSIONS ARE SHIFTED BY 3 MINOR VERSIONS + - boto3>=1.28.0 + - mypy-boto3-rds>=1.28.0 + - mypy-boto3-redshift-data>=1.28.0 + - mypy-boto3-s3>=1.28.0 + - mypy-boto3-appflow>=1.28.0 + # NOTE!!!
BOTOCORE VERSIONS ARE SHIFTED BY 3 MINOR VERSIONS + - botocore>=1.31.0 - asgiref # watchtower 3 has been released end Jan and introduced breaking change across the board that might # change logging behaviour: @@ -77,11 +89,7 @@ dependencies: - jsonpath_ng>=1.5.3 - redshift_connector>=2.0.888 - sqlalchemy_redshift>=0.8.6 - - mypy-boto3-rds>=1.24.0 - - mypy-boto3-redshift-data>=1.24.0 - - mypy-boto3-appflow>=1.28.16 - asgiref - - mypy-boto3-s3>=1.24.0 integrations: - integration-name: Amazon Athena @@ -699,7 +707,7 @@ additional-extras: # boto3 have native async support and we move away from aio aiobotocore - name: aiobotocore dependencies: - - aiobotocore[boto3]>=2.2.0 + - aiobotocore[boto3]>=2.5.3 - name: cncf.kubernetes dependencies: - apache-airflow-providers-cncf-kubernetes>=7.2.0 diff --git a/airflow/providers/apache/beam/hooks/beam.py b/airflow/providers/apache/beam/hooks/beam.py index 762dd2f07b676..72dc224626bba 100644 --- a/airflow/providers/apache/beam/hooks/beam.py +++ b/airflow/providers/apache/beam/hooks/beam.py @@ -104,10 +104,8 @@ def process_fd( fd_to_log = {proc.stderr: log.warning, proc.stdout: log.info} func_log = fd_to_log[fd] - while True: - line = fd.readline().decode() - if not line: - return + for line in iter(fd.readline, b""): + line = line.decode() if process_line_callback: process_line_callback(line) func_log(line.rstrip("\n")) diff --git a/airflow/providers/apache/beam/triggers/beam.py b/airflow/providers/apache/beam/triggers/beam.py index 9c29a9fbe2728..0d201cd8c9a4d 100644 --- a/airflow/providers/apache/beam/triggers/beam.py +++ b/airflow/providers/apache/beam/triggers/beam.py @@ -85,32 +85,29 @@ def serialize(self) -> tuple[str, dict[str, Any]]: async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] """Get current pipeline status and yields a TriggerEvent.""" hook = self._get_async_hook() - while True: - try: - return_code = await hook.start_python_pipeline_async( - variables=self.variables, - py_file=self.py_file, - py_options=self.py_options, - py_interpreter=self.py_interpreter, - py_requirements=self.py_requirements, - py_system_site_packages=self.py_system_site_packages, + try: + return_code = await hook.start_python_pipeline_async( + variables=self.variables, + py_file=self.py_file, + py_options=self.py_options, + py_interpreter=self.py_interpreter, + py_requirements=self.py_requirements, + py_system_site_packages=self.py_system_site_packages, + ) + except Exception as e: + self.log.exception("Exception occurred while checking for pipeline state") + yield TriggerEvent({"status": "error", "message": str(e)}) + else: + if return_code == 0: + yield TriggerEvent( + { + "status": "success", + "message": "Pipeline has finished SUCCESSFULLY", + } ) - if return_code == 0: - yield TriggerEvent( - { - "status": "success", - "message": "Pipeline has finished SUCCESSFULLY", - } - ) - return - else: - yield TriggerEvent({"status": "error", "message": "Operation failed"}) - return - - except Exception as e: - self.log.exception("Exception occurred while checking for pipeline state") - yield TriggerEvent({"status": "error", "message": str(e)}) - return + else: + yield TriggerEvent({"status": "error", "message": "Operation failed"}) + return def _get_async_hook(self) -> BeamAsyncHook: return BeamAsyncHook(runner=self.runner) diff --git a/airflow/providers/apache/hdfs/CHANGELOG.rst b/airflow/providers/apache/hdfs/CHANGELOG.rst index 37de3a05cc49f..8a17a7f4a6924 100644 --- a/airflow/providers/apache/hdfs/CHANGELOG.rst +++ 
b/airflow/providers/apache/hdfs/CHANGELOG.rst @@ -75,7 +75,7 @@ you can use 3.* version of the provider, but the recommendation is to switch to Protobuf 3 required by the snakebite-py3 library has ended its life in June 2023 and Airflow and it's providers stopped supporting it. If you would like to continue using HDFS hooks and sensors based on snakebite-py3 library when you have protobuf library 4.+ you can install the 3.* version - of the provider but due to Protobuf incompatibility, you need to do one of the the two things: + of the provider but due to Protobuf incompatibility, you need to do one of the two things: * set ``PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python`` variable in your environment. * downgrade protobuf to latest 3.* version (3.20.3 at this time) diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py index ea004860b4944..7f02619024fdd 100644 --- a/airflow/providers/apache/hive/hooks/hive.py +++ b/airflow/providers/apache/hive/hooks/hive.py @@ -277,13 +277,11 @@ def run_cli( ) self.sub_process = sub_process stdout = "" - while True: - line = sub_process.stdout.readline() - if not line: - break - stdout += line.decode("UTF-8") + for line in iter(sub_process.stdout.readline, b""): + line = line.decode() + stdout += line if verbose: - self.log.info(line.decode("UTF-8").strip()) + self.log.info(line.strip()) sub_process.wait() if sub_process.returncode: @@ -704,25 +702,20 @@ def _get_max_partition_from_part_specs( # Assuming all specs have the same keys. if partition_key not in part_specs[0].keys(): raise AirflowException(f"Provided partition_key {partition_key} is not in part_specs.") - is_subset = None - if filter_map: - is_subset = set(filter_map.keys()).issubset(set(part_specs[0].keys())) - if filter_map and not is_subset: + if filter_map and not set(filter_map).issubset(part_specs[0]): raise AirflowException( f"Keys in provided filter_map {', '.join(filter_map.keys())} " f"are not subset of part_spec keys: {', '.join(part_specs[0].keys())}" ) - candidates = [ - p_dict[partition_key] - for p_dict in part_specs - if filter_map is None or all(item in p_dict.items() for item in filter_map.items()) - ] - - if not candidates: - return None - else: - return max(candidates) + return max( + ( + p_dict[partition_key] + for p_dict in part_specs + if filter_map is None or all(item in p_dict.items() for item in filter_map.items()) + ), + default=None, + ) def max_partition( self, diff --git a/airflow/providers/apache/kafka/triggers/await_message.py b/airflow/providers/apache/kafka/triggers/await_message.py index f2c1bb81ca6b6..8f89d2ba1f180 100644 --- a/airflow/providers/apache/kafka/triggers/await_message.py +++ b/airflow/providers/apache/kafka/triggers/await_message.py @@ -49,7 +49,7 @@ class AwaitMessageTrigger(BaseTrigger): defaults to None :param poll_timeout: How long the Kafka client should wait before returning from a poll request to Kafka (seconds), defaults to 1 - :param poll_interval: How long the the trigger should sleep after reaching the end of the Kafka log + :param poll_interval: How long the trigger should sleep after reaching the end of the Kafka log (seconds), defaults to 5 """ diff --git a/airflow/providers/apache/livy/hooks/livy.py b/airflow/providers/apache/livy/hooks/livy.py index ede3d2eb985e5..ba2ff1bb1318f 100644 --- a/airflow/providers/apache/livy/hooks/livy.py +++ b/airflow/providers/apache/livy/hooks/livy.py @@ -432,7 +432,7 @@ def _validate_list_of_stringables(vals: Sequence[str | int | float]) -> bool: if 
( vals is None or not isinstance(vals, (tuple, list)) - or any(1 for val in vals if not isinstance(val, (str, int, float))) + or not all(isinstance(val, (str, int, float)) for val in vals) ): raise ValueError("List of strings expected") return True @@ -448,7 +448,7 @@ def _validate_extra_conf(conf: dict[Any, Any]) -> bool: if conf: if not isinstance(conf, dict): raise ValueError("'conf' argument must be a dict") - if not all((v and isinstance(v, str)) or isinstance(v, int) for v in conf.values()): + if not all(isinstance(v, (str, int)) and v != "" for v in conf.values()): raise ValueError("'conf' values must be either strings or ints") return True @@ -542,8 +542,7 @@ async def _do_api_call_async( else: return {"Response": f"Unexpected HTTP Method: {self.method}", "status": "error"} - attempt_num = 1 - while True: + for attempt_num in range(1, 1 + self.retry_limit): response = await request_func( url, json=data if self.method in ("POST", "PATCH") else None, @@ -568,7 +567,6 @@ async def _do_api_call_async( # Don't retry. return {"Response": {e.message}, "Status Code": {e.status}, "status": "error"} - attempt_num += 1 await asyncio.sleep(self.retry_delay) def _generate_base_url(self, conn: Connection) -> str: @@ -815,7 +813,7 @@ def _validate_list_of_stringables(vals: Sequence[str | int | float]) -> bool: if ( vals is None or not isinstance(vals, (tuple, list)) - or any(1 for val in vals if not isinstance(val, (str, int, float))) + or not all(isinstance(val, (str, int, float)) for val in vals) ): raise ValueError("List of strings expected") return True @@ -831,6 +829,6 @@ def _validate_extra_conf(conf: dict[Any, Any]) -> bool: if conf: if not isinstance(conf, dict): raise ValueError("'conf' argument must be a dict") - if not all((v and isinstance(v, str)) or isinstance(v, int) for v in conf.values()): + if not all(isinstance(v, (str, int)) and v != "" for v in conf.values()): raise ValueError("'conf' values must be either strings or ints") return True diff --git a/airflow/providers/apache/pinot/hooks/pinot.py b/airflow/providers/apache/pinot/hooks/pinot.py index 0246a5d3a0fee..0fe11a04e4b82 100644 --- a/airflow/providers/apache/pinot/hooks/pinot.py +++ b/airflow/providers/apache/pinot/hooks/pinot.py @@ -56,6 +56,11 @@ class PinotAdminHook(BaseHook): "Exception" is in the output message. 
""" + conn_name_attr = "conn_id" + default_conn_name = "pinot_admin_default" + conn_type = "pinot_admin" + hook_name = "Pinot Admin" + def __init__( self, conn_id: str = "pinot_admin_default", @@ -258,6 +263,8 @@ class PinotDbApiHook(DbApiHook): conn_name_attr = "pinot_broker_conn_id" default_conn_name = "pinot_broker_default" + conn_type = "pinot" + hook_name = "Pinot Broker" supports_autocommit = False def get_conn(self) -> Any: diff --git a/airflow/providers/apache/pinot/provider.yaml b/airflow/providers/apache/pinot/provider.yaml index 51146a1e6aa69..3c10b9c05f818 100644 --- a/airflow/providers/apache/pinot/provider.yaml +++ b/airflow/providers/apache/pinot/provider.yaml @@ -59,5 +59,7 @@ hooks: - airflow.providers.apache.pinot.hooks.pinot connection-types: - - hook-class-name: airflow.providers.apache.pinot.hooks.pinot.PinotHook + - hook-class-name: airflow.providers.apache.pinot.hooks.pinot.PinotDbApiHook connection-type: pinot + - hook-class-name: airflow.providers.apache.pinot.hooks.pinot.PinotAdminHook + connection-type: pinot_admin diff --git a/airflow/providers/apache/spark/hooks/spark_sql.py b/airflow/providers/apache/spark/hooks/spark_sql.py index 6864aa52fe0c8..41dc741ccdd39 100644 --- a/airflow/providers/apache/spark/hooks/spark_sql.py +++ b/airflow/providers/apache/spark/hooks/spark_sql.py @@ -134,7 +134,7 @@ def _prepare_command(self, cmd: str | list[str]) -> list[str]: connection_cmd += ["--num-executors", str(self._num_executors)] if self._sql: sql = self._sql.strip() - if sql.endswith(".sql") or sql.endswith(".hql"): + if sql.endswith((".sql", ".hql")): connection_cmd += ["-f", sql] else: connection_cmd += ["-e", sql] diff --git a/airflow/providers/celery/executors/celery_executor.py b/airflow/providers/celery/executors/celery_executor.py index 8bdff2a25e727..cc1b6e8122744 100644 --- a/airflow/providers/celery/executors/celery_executor.py +++ b/airflow/providers/celery/executors/celery_executor.py @@ -37,7 +37,6 @@ try: from airflow.cli.cli_config import ( - ARG_AUTOSCALE, ARG_DAEMON, ARG_LOG_FILE, ARG_PID, @@ -143,6 +142,7 @@ def __getattr__(name): ) # worker cli args +ARG_AUTOSCALE = Arg(("-a", "--autoscale"), help="Minimum and Maximum number of worker to autoscale") ARG_QUEUES = Arg( ("-q", "--queues"), help="Comma delimited list of queues to serve", diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 8e95f7a6c9387..5bc39625da97c 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -37,7 +37,7 @@ from sqlalchemy import select, update from sqlalchemy.orm import Session -from airflow import AirflowException +from airflow.exceptions import AirflowException try: from airflow.cli.cli_config import ( diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 7a1cd975e6931..e32e8b4c1f2f7 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -22,6 +22,7 @@ import logging import re import secrets +import shlex import string import warnings from collections.abc import Container @@ -29,7 +30,8 @@ from functools import cached_property from typing import TYPE_CHECKING, Any, Iterable, Sequence -from kubernetes.client import CoreV1Api, models as k8s +from kubernetes.client import CoreV1Api, V1Pod, models as k8s +from kubernetes.stream import stream 
from slugify import slugify from urllib3.exceptions import HTTPError @@ -59,6 +61,7 @@ PodManager, PodOperatorHookProtocol, PodPhase, + container_is_succeeded, get_container_termination_message, ) from airflow.settings import pod_mutation_hook @@ -173,7 +176,7 @@ class KubernetesPodOperator(BaseOperator): They can be exposed as environment vars or files in a volume. :param in_cluster: run kubernetes client with in_cluster configuration. :param cluster_context: context that points to kubernetes cluster. - Ignored when in_cluster is True. If None, current-context is used. + Ignored when in_cluster is True. If None, current-context is used. (templated) :param reattach_on_restart: if the worker dies while the pod is running, reattach and monitor during the next try. If False, always create a new pod for each try. :param labels: labels to apply to the Pod. (templated) @@ -232,7 +235,7 @@ class KubernetesPodOperator(BaseOperator): :param poll_interval: Polling period in seconds to check for the status. Used only in deferrable mode. :param log_pod_spec_on_failure: Log the pod's specification if a failure occurs :param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted. - If "delete_pod", the pod will be deleted regardless it's state; if "delete_succeeded_pod", + If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod", only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod. :param is_delete_operator_pod: What to do when the pod reaches its final state, or the execution is interrupted. If True (default), delete the @@ -246,7 +249,7 @@ class KubernetesPodOperator(BaseOperator): # This field can be overloaded at the instance level via base_container_name BASE_CONTAINER_NAME = "base" - + ISTIO_CONTAINER_NAME = "istio-proxy" POD_CHECKED_KEY = "already_checked" POST_TERMINATION_TIMEOUT = 120 @@ -262,6 +265,7 @@ class KubernetesPodOperator(BaseOperator): "container_resources", "volumes", "volume_mounts", + "cluster_context", ) template_fields_renderers = {"env_vars": "py"} @@ -604,7 +608,10 @@ def execute_sync(self, context: Context): if self.do_xcom_push: self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod) result = self.extract_xcom(pod=self.pod) - self.remote_pod = self.pod_manager.await_pod_completion(self.pod) + istio_enabled = self.is_istio_enabled(self.pod) + self.remote_pod = self.pod_manager.await_pod_completion( + self.pod, istio_enabled, self.base_container_name + ) finally: self.cleanup( pod=self.pod or self.pod_request_obj, @@ -667,7 +674,8 @@ def execute_complete(self, context: Context, event: dict, **kwargs): xcom_sidecar_output = self.extract_xcom(pod=pod) return xcom_sidecar_output finally: - pod = self.pod_manager.await_pod_completion(pod) + istio_enabled = self.is_istio_enabled(pod) + pod = self.pod_manager.await_pod_completion(pod, istio_enabled, self.base_container_name) if pod is not None: self.post_complete_action( pod=pod, @@ -699,13 +707,16 @@ def post_complete_action(self, *, pod, remote_pod, **kwargs): ) def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): + istio_enabled = self.is_istio_enabled(remote_pod) pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") else None # if the pod fails or success, but we don't want to delete it if pod_phase != PodPhase.SUCCEEDED or self.on_finish_action == OnFinishAction.KEEP_POD: self.patch_already_checked(remote_pod, reraise=False) - if pod_phase != PodPhase.SUCCEEDED: + if (pod_phase != 
PodPhase.SUCCEEDED and not istio_enabled) or ( + istio_enabled and not container_is_succeeded(remote_pod, self.base_container_name) + ): if self.log_events_on_failure: self._read_pod_events(pod, reraise=False) @@ -752,16 +763,61 @@ def _read_pod_events(self, pod, *, reraise=True): for event in self.pod_manager.read_pod_events(pod).items: self.log.error("Pod Event: %s - %s", event.reason, event.message) + def is_istio_enabled(self, pod: V1Pod) -> bool: + """Checks whether istio is enabled by looking for the istio-proxy sidecar container in the pod.""" + if not pod: + return False + + remote_pod = self.pod_manager.read_pod(pod) + + for container in remote_pod.spec.containers: + if container.name == self.ISTIO_CONTAINER_NAME: + return True + + return False + + def kill_istio_sidecar(self, pod: V1Pod) -> None: + command = '/bin/sh -c "curl -fsI -X POST http://localhost:15020/quitquitquit"' + command_to_container = shlex.split(command) + try: + resp = stream( + self.client.connect_get_namespaced_pod_exec, + name=pod.metadata.name, + namespace=pod.metadata.namespace, + container=self.ISTIO_CONTAINER_NAME, + command=command_to_container, + stderr=True, + stdin=True, + stdout=True, + tty=False, + _preload_content=False, + ) + resp.close() + except Exception as e: + self.log.error("Error while deleting istio-proxy sidecar: %s", e) + raise e + def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True): + istio_enabled = self.is_istio_enabled(pod) with _optionally_suppress(reraise=reraise): if pod is not None: - should_delete_pod = (self.on_finish_action == OnFinishAction.DELETE_POD) or ( - self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD - and pod.status.phase == PodPhase.SUCCEEDED + should_delete_pod = ( + (self.on_finish_action == OnFinishAction.DELETE_POD) + or ( + self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD + and pod.status.phase == PodPhase.SUCCEEDED + ) + or ( + self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD + and container_is_succeeded(pod, self.base_container_name) + ) ) - if should_delete_pod: + if should_delete_pod and not istio_enabled: self.log.info("Deleting pod: %s", pod.metadata.name) self.pod_manager.delete_pod(pod) + elif should_delete_pod and istio_enabled: + self.log.info("Deleting istio-proxy sidecar inside %s", pod.metadata.name) + self.kill_istio_sidecar(pod) else: self.log.info("Skipping deleting pod: %s", pod.metadata.name)
diff --git a/airflow/providers/cncf/kubernetes/provider.yaml b/airflow/providers/cncf/kubernetes/provider.yaml index 11a661e63ecae..491deb6e99dd8 100644 --- a/airflow/providers/cncf/kubernetes/provider.yaml +++ b/airflow/providers/cncf/kubernetes/provider.yaml @@ -100,7 +100,6 @@ integrations: operators: - integration-name: Kubernetes python-modules: - - airflow.providers.cncf.kubernetes.operators.kubernetes_pod - airflow.providers.cncf.kubernetes.operators.pod - airflow.providers.cncf.kubernetes.operators.spark_kubernetes - airflow.providers.cncf.kubernetes.operators.resource
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py index 7b7a9afe59455..f3f18a660ab97 100644 --- a/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -61,7 +61,7 @@ class KubernetesPodTrigger(BaseTrigger): :param get_logs: get the stdout of the container as logs of the tasks. :param startup_timeout: timeout in seconds to start up the pod.
:param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted. - If "delete_pod", the pod will be deleted regardless it's state; if "delete_succeeded_pod", + If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod", only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod. :param should_delete_pod: What to do when the pod reaches its final state, or the execution is interrupted. If True (default), delete the diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 1c2e0ab59719d..77600b4fcf195 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -145,6 +145,21 @@ def container_is_completed(pod: V1Pod, container_name: str) -> bool: return container_status.state.terminated is not None +def container_is_succeeded(pod: V1Pod, container_name: str) -> bool: + """ + Examines V1Pod ``pod`` to determine whether ``container_name`` is completed and succeeded. + + If that container is present and completed and succeeded, returns True. Returns False otherwise. + """ + if not container_is_completed(pod, container_name): + return False + + container_status = get_container_status(pod, container_name) + if not container_status: + return False + return container_status.state.terminated.exit_code == 0 + + def container_is_terminated(pod: V1Pod, container_name: str) -> bool: """ Examines V1Pod ``pod`` to determine whether ``container_name`` is terminated. @@ -508,17 +523,23 @@ def await_container_completion(self, pod: V1Pod, container_name: str) -> None: self.log.info("Waiting for container '%s' state to be completed", container_name) time.sleep(1) - def await_pod_completion(self, pod: V1Pod) -> V1Pod: + def await_pod_completion( + self, pod: V1Pod, istio_enabled: bool = False, container_name: str = "base" + ) -> V1Pod: """ Monitors a pod and returns the final state. + :param istio_enabled: whether istio is enabled in the namespace :param pod: pod spec that will be monitored + :param container_name: name of the container within the pod :return: tuple[State, str | None] """ while True: remote_pod = self.read_pod(pod) if remote_pod.status.phase in PodPhase.terminal_states: break + if istio_enabled and container_is_completed(remote_pod, container_name): + break self.log.info("Pod %s has phase %s", pod.metadata.name, remote_pod.status.phase) time.sleep(2) return remote_pod diff --git a/airflow/providers/common/sql/hooks/sql.pyi b/airflow/providers/common/sql/hooks/sql.pyi index 30d8eef488d9b..dedac037dfcb2 100644 --- a/airflow/providers/common/sql/hooks/sql.pyi +++ b/airflow/providers/common/sql/hooks/sql.pyi @@ -18,7 +18,7 @@ # This is automatically generated stub for the `common.sql` provider # # This file is generated automatically by the `update-common-sql-api stubs` pre-commit -# and the .pyi file represents part of the the "public" API that the +# and the .pyi file represents part of the "public" API that the # `common.sql` provider exposes to other providers. 
# # Any, potentially breaking change in the stubs will require deliberate manual action from the contributor diff --git a/airflow/providers/common/sql/operators/sql.py b/airflow/providers/common/sql/operators/sql.py index cf42fa4f57363..979f815c5bbc0 100644 --- a/airflow/providers/common/sql/operators/sql.py +++ b/airflow/providers/common/sql/operators/sql.py @@ -837,14 +837,13 @@ def execute(self, context: Context): pass_value_conv = _convert_to_float_if_possible(self.pass_value) is_numeric_value_check = isinstance(pass_value_conv, float) - tolerance_pct_str = str(self.tol * 100) + "%" if self.tol is not None else None error_msg = ( "Test failed.\nPass value:{pass_value_conv}\n" "Tolerance:{tolerance_pct_str}\n" "Query:\n{sql}\nResults:\n{records!s}" ).format( pass_value_conv=pass_value_conv, - tolerance_pct_str=tolerance_pct_str, + tolerance_pct_str=f"{self.tol:.1%}" if self.tol is not None else None, sql=self.sql, records=records, ) diff --git a/airflow/providers/common/sql/operators/sql.pyi b/airflow/providers/common/sql/operators/sql.pyi index f2735b4a252ab..d9d099f948941 100644 --- a/airflow/providers/common/sql/operators/sql.pyi +++ b/airflow/providers/common/sql/operators/sql.pyi @@ -18,7 +18,7 @@ # This is automatically generated stub for the `common.sql` provider # # This file is generated automatically by the `update-common-sql-api stubs` pre-commit -# and the .pyi file represents part of the the "public" API that the +# and the .pyi file represents part of the "public" API that the # `common.sql` provider exposes to other providers. # # Any, potentially breaking change in the stubs will require deliberate manual action from the contributor diff --git a/airflow/providers/common/sql/sensors/sql.pyi b/airflow/providers/common/sql/sensors/sql.pyi index ed88c4b8109fe..0343d04871e50 100644 --- a/airflow/providers/common/sql/sensors/sql.pyi +++ b/airflow/providers/common/sql/sensors/sql.pyi @@ -18,7 +18,7 @@ # This is automatically generated stub for the `common.sql` provider # # This file is generated automatically by the `update-common-sql-api stubs` pre-commit -# and the .pyi file represents part of the the "public" API that the +# and the .pyi file represents part of the "public" API that the # `common.sql` provider exposes to other providers. 
# # Any, potentially breaking change in the stubs will require deliberate manual action from the contributor
diff --git a/airflow/providers/elasticsearch/log/es_json_formatter.py b/airflow/providers/elasticsearch/log/es_json_formatter.py index 7ac543c0a3a0d..cf77896a9218e 100644 --- a/airflow/providers/elasticsearch/log/es_json_formatter.py +++ b/airflow/providers/elasticsearch/log/es_json_formatter.py @@ -31,11 +31,7 @@ class ElasticsearchJSONFormatter(JSONFormatter): def formatTime(self, record, datefmt=None): """Return the creation time of the LogRecord in ISO 8601 date/time format in the local time zone.""" dt = pendulum.from_timestamp(record.created, tz=pendulum.local_timezone()) - if datefmt: - s = dt.strftime(datefmt) - else: - s = dt.strftime(self.default_time_format) - + s = dt.strftime(datefmt or self.default_time_format) if self.default_msec_format: s = self.default_msec_format % (s, record.msecs) if self.default_tz_format:
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py index 96340c0d50002..9f4be0d4f8f87 100644 --- a/airflow/providers/google/cloud/hooks/bigquery.py +++ b/airflow/providers/google/cloud/hooks/bigquery.py @@ -3182,7 +3182,6 @@ def value_check( raise AirflowException("The query returned None") pass_value_conv = self._convert_to_float_if_possible(pass_value) is_numeric_value_check = isinstance(pass_value_conv, float) - tolerance_pct_str = str(tolerance * 100) + "%" if tolerance else None error_msg = ( "Test failed.\nPass value:{pass_value_conv}\n" @@ -3190,7 +3189,7 @@ def value_check( "Query:\n{sql}\nResults:\n{records!s}" ).format( pass_value_conv=pass_value_conv, - tolerance_pct_str=tolerance_pct_str, + tolerance_pct_str=f"{tolerance:.1%}" if tolerance else None, sql=sql, records=records, )
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py index 09e76ae2069fe..eaa65ba7c15db 100644 --- a/airflow/providers/google/cloud/operators/dataproc.py +++ b/airflow/providers/google/cloud/operators/dataproc.py @@ -1578,8 +1578,7 @@ class DataprocSubmitPySparkJobOperator(DataprocJobBaseOperator): @staticmethod def _generate_temp_filename(filename): - date = time.strftime("%Y%m%d%H%M%S") - return f"{date}_{str(uuid.uuid4())[:8]}_{ntpath.basename(filename)}" + return f"{time.strftime('%Y%m%d%H%M%S')}_{str(uuid.uuid4())[:8]}_{ntpath.basename(filename)}" def _upload_file_temp(self, bucket, local_file): """Upload a local file to a Google Cloud Storage bucket."""
diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py index 3f730d5bda343..642c4bf279555 100644 --- a/airflow/providers/google/cloud/operators/kubernetes_engine.py +++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py @@ -429,7 +429,7 @@ class GKEStartPodOperator(KubernetesPodOperator): :param regional: The location param is region name. :param deferrable: Run operator in the deferrable mode. :param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted. - If "delete_pod", the pod will be deleted regardless it's state; if "delete_succeeded_pod", + If "delete_pod", the pod will be deleted regardless of its state; if "delete_succeeded_pod", only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod. Current default is `keep_pod`, but this will be changed in the next major release of this provider.
:param is_delete_operator_pod: What to do when the pod reaches its final diff --git a/airflow/providers/google/cloud/operators/pubsub.py b/airflow/providers/google/cloud/operators/pubsub.py index 8e0f7a12d99a3..f91e9ea8c18dd 100644 --- a/airflow/providers/google/cloud/operators/pubsub.py +++ b/airflow/providers/google/cloud/operators/pubsub.py @@ -709,7 +709,7 @@ class PubSubPullOperator(GoogleCloudBaseOperator): :param gcp_conn_id: The connection ID to use connecting to Google Cloud. :param messages_callback: (Optional) Callback to process received messages. - It's return value will be saved to XCom. + Its return value will be saved to XCom. If you are pulling large messages, you probably want to provide a custom callback. If not provided, the default implementation will convert `ReceivedMessage` objects into JSON-serializable dicts using `google.protobuf.json_format.MessageToDict` function. diff --git a/airflow/providers/google/cloud/sensors/pubsub.py b/airflow/providers/google/cloud/sensors/pubsub.py index c818f7168e761..7bd07a08e5059 100644 --- a/airflow/providers/google/cloud/sensors/pubsub.py +++ b/airflow/providers/google/cloud/sensors/pubsub.py @@ -73,7 +73,7 @@ class PubSubPullSensor(BaseSensorOperator): :param gcp_conn_id: The connection ID to use connecting to Google Cloud. :param messages_callback: (Optional) Callback to process received messages. - It's return value will be saved to XCom. + Its return value will be saved to XCom. If you are pulling large messages, you probably want to provide a custom callback. If not provided, the default implementation will convert `ReceivedMessage` objects into JSON-serializable dicts using `google.protobuf.json_format.MessageToDict` function. diff --git a/airflow/providers/google/cloud/triggers/bigquery_dts.py b/airflow/providers/google/cloud/triggers/bigquery_dts.py index def8b90b66a30..3a5ab2267f97a 100644 --- a/airflow/providers/google/cloud/triggers/bigquery_dts.py +++ b/airflow/providers/google/cloud/triggers/bigquery_dts.py @@ -95,7 +95,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: self.log.info("Current state is %s", state) if state == TransferState.SUCCEEDED: - self.log.info("Job has completed it's work.") + self.log.info("Job has completed its work.") yield TriggerEvent( { "status": "success", diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py b/airflow/providers/google/cloud/triggers/kubernetes_engine.py index ba0df0fc153c2..1e0780fbc66c9 100644 --- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py +++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py @@ -55,7 +55,7 @@ class GKEStartPodTrigger(KubernetesPodTrigger): will consult the class variable BASE_CONTAINER_NAME (which defaults to "base") for the base container name to use. :param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted. - If "delete_pod", the pod will be deleted regardless it's state; if "delete_succeeded_pod", + If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod", only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod. :param should_delete_pod: What to do when the pod reaches its final state, or the execution is interrupted. 
If True (default), delete the diff --git a/airflow/providers/google/cloud/triggers/pubsub.py b/airflow/providers/google/cloud/triggers/pubsub.py index 40c43f7cb79dd..27dbd8285cac6 100644 --- a/airflow/providers/google/cloud/triggers/pubsub.py +++ b/airflow/providers/google/cloud/triggers/pubsub.py @@ -41,7 +41,7 @@ class PubsubPullTrigger(BaseTrigger): immediately rather than by any downstream tasks :param gcp_conn_id: Reference to google cloud connection id :param messages_callback: (Optional) Callback to process received messages. - It's return value will be saved to XCom. + Its return value will be saved to XCom. If you are pulling large messages, you probably want to provide a custom callback. If not provided, the default implementation will convert `ReceivedMessage` objects into JSON-serializable dicts using `google.protobuf.json_format.MessageToDict` function. diff --git a/airflow/providers/hashicorp/hooks/vault.py b/airflow/providers/hashicorp/hooks/vault.py index 31867c938397f..0fe8e21031129 100644 --- a/airflow/providers/hashicorp/hooks/vault.py +++ b/airflow/providers/hashicorp/hooks/vault.py @@ -49,7 +49,7 @@ class VaultHook(BaseHook): The mount point should be placed as a path in the URL - similarly to Vault's URL schema: This indicates the "path" the secret engine is mounted on. Default id not specified is "secret". Note that this ``mount_point`` is not used for authentication if authentication is done via a - different engines. Each engine uses it's own engine-specific authentication mount_point. + different engines. Each engine uses its own engine-specific authentication mount_point. The extras in the connection are named the same as the parameters ('kv_engine_version', 'auth_type', ...). diff --git a/airflow/providers/microsoft/azure/CHANGELOG.rst b/airflow/providers/microsoft/azure/CHANGELOG.rst index c5dac31b9d86f..deec92ca60bc8 100644 --- a/airflow/providers/microsoft/azure/CHANGELOG.rst +++ b/airflow/providers/microsoft/azure/CHANGELOG.rst @@ -718,11 +718,11 @@ Breaking changes This change removes ``azure_container_instance_default`` connection type and replaces it with the ``azure_default``. The problem was that AzureContainerInstance was not needed as it was exactly the -same as the plain "azure" connection, however it's presence caused duplication in the field names +same as the plain "azure" connection, however its presence caused duplication in the field names used in the UI editor for connections and unnecessary warnings generated. This version uses plain Azure Hook and connection also for Azure Container Instance. If you already have ``azure_container_instance_default`` connection created in your DB, it will continue to work, but -the first time you edit it with the UI you will have to change it's type to ``azure_default``. +the first time you edit it with the UI you will have to change its type to ``azure_default``. 
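If you prefer not to edit the connection in the UI, a minimal sketch along the following lines can switch the stored connection type directly. It assumes direct access to the Airflow metadata database from a working Airflow installation, and it assumes ``azure`` is the connection type used by the plain Azure connection; verify both against your deployment before running it::

    # Minimal sketch: switch an existing azure_container_instance_default
    # connection to the plain Azure connection type without using the UI.
    # Assumption: "azure" is the conn_type of the plain Azure connection.
    from airflow.models import Connection
    from airflow.utils.session import create_session

    with create_session() as session:
        conn = (
            session.query(Connection)
            .filter(Connection.conn_id == "azure_container_instance_default")
            .one_or_none()
        )
        if conn is not None:
            conn.conn_type = "azure"  # committed when create_session() exits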
Features ~~~~~~~~ diff --git a/airflow/providers/microsoft/azure/hooks/adx.py b/airflow/providers/microsoft/azure/hooks/adx.py index 705db4b5b39ae..83f955753dbf8 100644 --- a/airflow/providers/microsoft/azure/hooks/adx.py +++ b/airflow/providers/microsoft/azure/hooks/adx.py @@ -28,8 +28,8 @@ import warnings from typing import Any +from azure.kusto.data import ClientRequestProperties, KustoClient, KustoConnectionStringBuilder from azure.kusto.data.exceptions import KustoServiceError -from azure.kusto.data.request import ClientRequestProperties, KustoClient, KustoConnectionStringBuilder from azure.kusto.data.response import KustoResponseDataSetV2 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning diff --git a/airflow/providers/microsoft/azure/hooks/data_lake.py b/airflow/providers/microsoft/azure/hooks/data_lake.py index ef84ec51c2681..95ef4c6cc2789 100644 --- a/airflow/providers/microsoft/azure/hooks/data_lake.py +++ b/airflow/providers/microsoft/azure/hooks/data_lake.py @@ -241,7 +241,7 @@ class AzureDataLakeStorageV2Hook(BaseHook): accounts that have a hierarchical namespace. Using Adls_v2 connection details create DataLakeServiceClient object. - Due to Wasb is marked as legacy and and retirement of the (ADLS1), it would + Due to Wasb is marked as legacy and retirement of the (ADLS1), it would be nice to implement ADLS gen2 hook for interacting with the storage account. .. seealso:: diff --git a/airflow/providers/microsoft/azure/operators/data_factory.py b/airflow/providers/microsoft/azure/operators/data_factory.py index 1823212473f6b..d6b4592e35b6e 100644 --- a/airflow/providers/microsoft/azure/operators/data_factory.py +++ b/airflow/providers/microsoft/azure/operators/data_factory.py @@ -92,7 +92,7 @@ class AzureDataFactoryRunPipelineOperator(BaseOperator): ``AzureDataFactoryHook`` will attempt to use the resource group name provided in the corresponding connection. :param factory_name: The data factory name. If a value is not passed in to the operator, the - ``AzureDataFactoryHook`` will attempt to use the factory name name provided in the corresponding + ``AzureDataFactoryHook`` will attempt to use the factory name provided in the corresponding connection. :param reference_pipeline_run_id: The pipeline run identifier. If this run ID is specified the parameters of the specified run will be used to create a new run. diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml index 24045abcc955c..9719b03d8900f 100644 --- a/airflow/providers/microsoft/azure/provider.yaml +++ b/airflow/providers/microsoft/azure/provider.yaml @@ -78,11 +78,11 @@ dependencies: - azure-synapse-spark - adal>=1.2.7 - azure-storage-file-datalake>=12.9.1 + - azure-kusto-data>=4.1.0 # TODO: upgrade to newer versions of all the below libraries. 
# See issue https://github.com/apache/airflow/issues/30199 - azure-mgmt-containerinstance>=1.5.0,<2.0 - azure-mgmt-datafactory>=1.0.0,<2.0 - - azure-kusto-data>=0.0.43,<0.1 integrations: - integration-name: Microsoft Azure Batch diff --git a/airflow/providers/oracle/hooks/oracle.py b/airflow/providers/oracle/hooks/oracle.py index 225a8ca6ccd60..f207df72d9af5 100644 --- a/airflow/providers/oracle/hooks/oracle.py +++ b/airflow/providers/oracle/hooks/oracle.py @@ -302,9 +302,7 @@ def insert_rows( elif numpy and isinstance(cell, numpy.datetime64): lst.append("'" + str(cell) + "'") elif isinstance(cell, datetime): - lst.append( - "to_date('" + cell.strftime("%Y-%m-%d %H:%M:%S") + "','YYYY-MM-DD HH24:MI:SS')" - ) + lst.append(f"to_date('{cell:%Y-%m-%d %H:%M:%S}','YYYY-MM-DD HH24:MI:SS')") else: lst.append(str(cell)) values = tuple(lst) diff --git a/airflow/providers/slack/provider.yaml b/airflow/providers/slack/provider.yaml index b6195dc04a8d4..9f46d284d40cb 100644 --- a/airflow/providers/slack/provider.yaml +++ b/airflow/providers/slack/provider.yaml @@ -82,4 +82,4 @@ connection-types: connection-type: slackwebhook notifications: - - airflow.providers.slack.notifications.slack_notifier.SlackNotifier + - airflow.providers.slack.notifications.slack.SlackNotifier diff --git a/airflow/providers/snowflake/provider.yaml b/airflow/providers/snowflake/provider.yaml index 824551981c0a4..f688ca0e894c5 100644 --- a/airflow/providers/snowflake/provider.yaml +++ b/airflow/providers/snowflake/provider.yaml @@ -23,6 +23,7 @@ description: | suspended: false versions: + - 5.0.0 - 4.4.2 - 4.4.1 - 4.4.0 diff --git a/airflow/providers/yandex/operators/yandexcloud_dataproc.py b/airflow/providers/yandex/operators/yandexcloud_dataproc.py index 625827d109963..dfd3a07fe4f94 100644 --- a/airflow/providers/yandex/operators/yandexcloud_dataproc.py +++ b/airflow/providers/yandex/operators/yandexcloud_dataproc.py @@ -95,6 +95,8 @@ class DataprocCreateClusterOperator(BaseOperator): Docs: https://cloud.yandex.com/docs/data-proc/concepts/logs :param initialization_actions: Set of init-actions to run when cluster starts. Docs: https://cloud.yandex.com/docs/data-proc/concepts/init-action + :param labels: Cluster labels as key:value pairs. No more than 64 per resource. 
+ Docs: https://cloud.yandex.ru/docs/resource-manager/concepts/labels """ def __init__( @@ -135,6 +137,7 @@ def __init__( security_group_ids: Iterable[str] | None = None, log_group_id: str | None = None, initialization_actions: Iterable[InitializationAction] | None = None, + labels: dict[str, str] | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -173,6 +176,7 @@ def __init__( self.security_group_ids = security_group_ids self.log_group_id = log_group_id self.initialization_actions = initialization_actions + self.labels = labels self.hook: DataprocHook | None = None @@ -214,6 +218,7 @@ def execute(self, context: Context) -> dict: host_group_ids=self.host_group_ids, security_group_ids=self.security_group_ids, log_group_id=self.log_group_id, + labels=self.labels, initialization_actions=self.initialization_actions and [ self.hook.sdk.wrappers.InitializationAction( diff --git a/airflow/providers/yandex/provider.yaml b/airflow/providers/yandex/provider.yaml index ea7a4fb4a3cba..76a3459e6435e 100644 --- a/airflow/providers/yandex/provider.yaml +++ b/airflow/providers/yandex/provider.yaml @@ -20,7 +20,7 @@ package-name: apache-airflow-providers-yandex name: Yandex description: | Yandex including `Yandex.Cloud `__ -suspended: true # see https://github.com/apache/airflow/pull/30667/ +suspended: false versions: - 3.3.0 - 3.2.0 @@ -37,7 +37,7 @@ versions: dependencies: - apache-airflow>=2.4.0 - - yandexcloud>=0.173.0 + - yandexcloud>=0.228.0 integrations: - integration-name: Yandex.Cloud diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py index 3f8b6bf2e6bce..77094269a1951 100644 --- a/airflow/sensors/base.py +++ b/airflow/sensors/base.py @@ -339,12 +339,6 @@ def reschedule(self): def get_serialized_fields(cls): return super().get_serialized_fields() | {"reschedule"} - def raise_failed_or_skiping_exception(self, *, failed_message: str, skipping_message: str = "") -> None: - """Raise AirflowSkipException if self.soft_fail is set to True. Otherwise raise AirflowException.""" - if self.soft_fail: - raise AirflowSkipException(skipping_message or failed_message) - raise AirflowException(failed_message) - def poke_mode_only(cls): """ diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py index ffc2cc1313f0a..92562589b212b 100644 --- a/airflow/sensors/external_task.py +++ b/airflow/sensors/external_task.py @@ -222,8 +222,6 @@ def __init__( self.deferrable = deferrable self.poll_interval = poll_interval - self._skipping_message_postfix = " Skipping due to soft_fail." - def _get_dttm_filter(self, context): if self.execution_delta: dttm = context["logical_date"] - self.execution_delta @@ -276,28 +274,32 @@ def poke(self, context: Context, session: Session = NEW_SESSION) -> bool: # Fail if anything in the list has failed. if count_failed > 0: if self.external_task_ids: - failed_message = ( + if self.soft_fail: + raise AirflowSkipException( + f"Some of the external tasks {self.external_task_ids} " + f"in DAG {self.external_dag_id} failed. Skipping due to soft_fail." + ) + raise AirflowException( f"Some of the external tasks {self.external_task_ids} " f"in DAG {self.external_dag_id} failed." 
) - - self.raise_failed_or_skiping_exception( - failed_message=failed_message, - skipping_message=f"{failed_message}{self._skipping_message_postfix}", - ) elif self.external_task_group_id: - self.raise_failed_or_skiping_exception( - failed_message=( + if self.soft_fail: + raise AirflowSkipException( f"The external task_group '{self.external_task_group_id}' " - f"in DAG '{self.external_dag_id}' failed." + f"in DAG '{self.external_dag_id}' failed. Skipping due to soft_fail." ) + raise AirflowException( + f"The external task_group '{self.external_task_group_id}' " + f"in DAG '{self.external_dag_id}' failed." ) + else: - failed_message = f"The external DAG {self.external_dag_id} failed." - self.raise_failed_or_skiping_exception( - failed_message=failed_message, - skipping_message=f"{failed_message}{self._skipping_message_postfix}", - ) + if self.soft_fail: + raise AirflowSkipException( + f"The external DAG {self.external_dag_id} failed. Skipping due to soft_fail." + ) + raise AirflowException(f"The external DAG {self.external_dag_id} failed.") count_skipped = -1 if self.skipped_states: @@ -349,20 +351,12 @@ def execute_complete(self, context, event=None): self.log.info("External task %s has executed successfully.", self.external_task_id) return None elif event["status"] == "timeout": - failed_message = "Dag was not started within 1 minute, assuming fail." - self.raise_failed_or_skiping_exception( - failed_message=failed_message, - skipping_message=f"{failed_message}{self._skipping_message_postfix}", - ) + raise AirflowException("Dag was not started within 1 minute, assuming fail.") else: - failed_message = ( + raise AirflowException( "Error occurred while trying to retrieve task status. Please, check the " "name of executed task and Dag." ) - self.raise_failed_or_skiping_exception( - failed_message=failed_message, - skipping_message=f"{failed_message}{self._skipping_message_postfix}", - ) def _check_for_existence(self, session) -> None: dag_to_wait = DagModel.get_current(self.external_dag_id, session) diff --git a/airflow/sensors/filesystem.py b/airflow/sensors/filesystem.py index dea1643d9ae21..3eba10380f740 100644 --- a/airflow/sensors/filesystem.py +++ b/airflow/sensors/filesystem.py @@ -70,6 +70,6 @@ def poke(self, context: Context): return True for _, _, files in os.walk(path): - if len(files) > 0: + if files: return True return False diff --git a/airflow/sentry.py b/airflow/sentry.py index 34133228ca13f..da46f924786df 100644 --- a/airflow/sentry.py +++ b/airflow/sentry.py @@ -87,7 +87,7 @@ def __init__(self): # LoggingIntegration is set by default. 
integrations = [sentry_flask] - executor_class, _ = ExecutorLoader.import_default_executor_cls() + executor_class, _ = ExecutorLoader.import_default_executor_cls(validate=False) if executor_class.supports_sentry: from sentry_sdk.integrations.celery import CeleryIntegration diff --git a/airflow/task/task_runner/cgroup_task_runner.py b/airflow/task/task_runner/cgroup_task_runner.py index 2ab011471377a..14354453bc62a 100644 --- a/airflow/task/task_runner/cgroup_task_runner.py +++ b/airflow/task/task_runner/cgroup_task_runner.py @@ -134,7 +134,7 @@ def start(self): return # Create a unique cgroup name - cgroup_name = f"airflow/{datetime.datetime.utcnow().strftime('%Y-%m-%d')}/{str(uuid.uuid4())}" + cgroup_name = f"airflow/{datetime.datetime.utcnow():%Y-%m-%d}/{uuid.uuid4()}" self.mem_cgroup_name = f"memory/{cgroup_name}" self.cpu_cgroup_name = f"cpu/{cgroup_name}" diff --git a/airflow/ti_deps/deps/mapped_task_expanded.py b/airflow/ti_deps/deps/mapped_task_expanded.py index 87a804006be45..8138de9f9e476 100644 --- a/airflow/ti_deps/deps/mapped_task_expanded.py +++ b/airflow/ti_deps/deps/mapped_task_expanded.py @@ -21,7 +21,7 @@ class MappedTaskIsExpanded(BaseTIDep): - """Checks that a mapped task has been expanded before it's TaskInstance can run.""" + """Checks that a mapped task has been expanded before its TaskInstance can run.""" NAME = "Task has been mapped" IGNORABLE = False diff --git a/airflow/ti_deps/deps/not_previously_skipped_dep.py b/airflow/ti_deps/deps/not_previously_skipped_dep.py index 855f04af533d1..92dd2b373acdb 100644 --- a/airflow/ti_deps/deps/not_previously_skipped_dep.py +++ b/airflow/ti_deps/deps/not_previously_skipped_dep.py @@ -84,7 +84,7 @@ def _get_dep_statuses(self, ti, session, dep_context): ) if not past_depends_met: yield self._failing_status( - reason=("Task should be skipped but the the past depends are not met") + reason=("Task should be skipped but the past depends are not met") ) return ti.set_state(TaskInstanceState.SKIPPED, session) diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py index c7e2982fffdd7..dbdf692e769ec 100644 --- a/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow/ti_deps/deps/trigger_rule_dep.py @@ -294,7 +294,7 @@ def _iter_upstream_conditions() -> Iterator[ColumnOperators]: ) if not past_depends_met: yield self._failing_status( - reason=("Task should be skipped but the the past depends are not met") + reason=("Task should be skipped but the past depends are not met") ) return changed = ti.set_state(new_state, session) diff --git a/airflow/triggers/file.py b/airflow/triggers/file.py index 12b2d0e8272e5..85a5a373baecf 100644 --- a/airflow/triggers/file.py +++ b/airflow/triggers/file.py @@ -68,6 +68,6 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]: self.log.info("Found File %s last modified: %s", str(path), str(mod_time)) yield TriggerEvent(True) for _, _, files in os.walk(self.filepath): - if len(files) > 0: + if files: yield TriggerEvent(True) await asyncio.sleep(self.poll_interval) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 9d82114c562b3..4429cb8dfe893 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -67,39 +67,27 @@ log = logging.getLogger(__name__) -REVISION_HEADS_MAP = { +_REVISION_HEADS_MAP = { "2.0.0": "e959f08ac86c", "2.0.1": "82b7c48c147f", "2.0.2": "2e42bb497a22", "2.1.0": "a13f7613ad25", - "2.1.1": "a13f7613ad25", - "2.1.2": "a13f7613ad25", "2.1.3": "97cdd93827b8", "2.1.4": "ccde3e26fe78", "2.2.0": "7b2661a43ba3", - "2.2.1": 
"7b2661a43ba3", - "2.2.2": "7b2661a43ba3", "2.2.3": "be2bfac3da23", "2.2.4": "587bdf053233", - "2.2.5": "587bdf053233", "2.3.0": "b1b348e02d07", "2.3.1": "1de7bc13c950", "2.3.2": "3c94c427fdf6", "2.3.3": "f5fcbda3e651", - "2.3.4": "f5fcbda3e651", "2.4.0": "ecb43d2a1842", - "2.4.1": "ecb43d2a1842", "2.4.2": "b0d31815b5a6", "2.4.3": "e07f49787c9d", "2.5.0": "290244fb8b83", - "2.5.1": "290244fb8b83", - "2.5.2": "290244fb8b83", - "2.5.3": "290244fb8b83", "2.6.0": "98ae134e6fff", - "2.6.1": "98ae134e6fff", "2.6.2": "c804e5c76e3e", - "2.6.3": "c804e5c76e3e", - "2.7.0": "788397e78828", + "2.7.0": "405de8318b3a", } diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py index 79a804a9e5011..4c82f90ab3814 100644 --- a/airflow/utils/db_cleanup.py +++ b/airflow/utils/db_cleanup.py @@ -194,6 +194,7 @@ def _do_delete(*, query, orm_model, skip_archive, session): session.execute(delete) session.commit() if skip_archive: + metadata.bind = session.get_bind() target_table.drop() session.commit() print("Finished Performing Delete") diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index 59c1d7980c1e3..a3d3b4b00c71d 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -38,7 +38,7 @@ class SetContextPropagate(enum.Enum): :meta private: """ - # If a `set_context` function wants to _keep_ propagation set on it's logger it needs to return this + # If a `set_context` function wants to _keep_ propagation set on its logger it needs to return this # special value. MAINTAIN_PROPAGATE = object() # Don't use this one anymore! diff --git a/airflow/utils/log/timezone_aware.py b/airflow/utils/log/timezone_aware.py index 999ccda5a722a..ae96a11116a1f 100644 --- a/airflow/utils/log/timezone_aware.py +++ b/airflow/utils/log/timezone_aware.py @@ -40,11 +40,7 @@ def formatTime(self, record, datefmt=None): date and time format in the local time zone. """ dt = pendulum.from_timestamp(record.created, tz=pendulum.local_timezone()) - if datefmt: - s = dt.strftime(datefmt) - else: - s = dt.strftime(self.default_time_format) - + s = dt.strftime(datefmt or self.default_time_format) if self.default_msec_format: s = self.default_msec_format % (s, record.msecs) if self.default_tz_format: diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py index f3104df918cdf..1f7c4771e8c02 100644 --- a/airflow/utils/process_utils.py +++ b/airflow/utils/process_utils.py @@ -94,7 +94,7 @@ def signal_procs(sig): + [str(p.pid) for p in all_processes_in_the_group] ) elif err_killpg.errno == errno.ESRCH: - # There is a rare condition that the process has not managed yet to change it's process + # There is a rare condition that the process has not managed yet to change its process # group. In this case os.killpg fails with ESRCH error # So we additionally send a kill signal to the process itself. logger.info( @@ -119,7 +119,7 @@ def signal_procs(sig): all_processes_in_the_group = parent.children(recursive=True) all_processes_in_the_group.append(parent) except psutil.NoSuchProcess: - # The process already exited, but maybe it's children haven't. + # The process already exited, but maybe its children haven't. 
all_processes_in_the_group = [] for proc in psutil.process_iter(): try: diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py index 167eb53b71eaa..841abc710678a 100644 --- a/airflow/utils/task_group.py +++ b/airflow/utils/task_group.py @@ -482,7 +482,7 @@ def topological_sort(self, _include_subdag_tasks: bool = False): while graph_unsorted: # Go through each of the node/edges pairs in the unsorted graph. If a set of edges doesn't contain # any nodes that haven't been resolved, that is, that are still in the unsorted graph, remove the - # pair from the unsorted graph, and append it to the sorted graph. Note here that by using using + # pair from the unsorted graph, and append it to the sorted graph. Note here that by using # the values() method for iterating, a copy of the unsorted graph is used, allowing us to modify # the unsorted graph as we move through it. # diff --git a/airflow/www/extensions/init_appbuilder.py b/airflow/www/extensions/init_appbuilder.py index 9c2948e32490f..bbc6343fcf05a 100644 --- a/airflow/www/extensions/init_appbuilder.py +++ b/airflow/www/extensions/init_appbuilder.py @@ -586,6 +586,10 @@ def security_converge(self, dry=False) -> dict: def get_url_for_login_with(self, next_url: str | None = None) -> str: return get_auth_manager().get_url_login(next_url=next_url) + @property + def get_url_for_login(self): + return get_auth_manager().get_url_login() + @property def get_url_for_index(self): return url_for(f"{self.indexview.endpoint}.{self.indexview.default_view}") diff --git a/airflow/www/extensions/init_auth_manager.py b/airflow/www/extensions/init_auth_manager.py index a53fdf304befc..24ae020862dc9 100644 --- a/airflow/www/extensions/init_auth_manager.py +++ b/airflow/www/extensions/init_auth_manager.py @@ -26,12 +26,10 @@ from airflow.auth.managers.base_auth_manager import BaseAuthManager -@cache -def get_auth_manager() -> BaseAuthManager: - """ - Initialize auth manager. +def get_auth_manager_cls() -> type[BaseAuthManager]: + """Returns just the auth manager class without initializing it. - Import the user manager class, instantiate it and return it. + Useful to save execution time if only static methods need to be called. """ auth_manager_cls = conf.getimport(section="core", key="auth_manager") @@ -41,4 +39,16 @@ def get_auth_manager() -> BaseAuthManager: "Please specify one using section/key [core/auth_manager]." ) + return auth_manager_cls + + +@cache +def get_auth_manager() -> BaseAuthManager: + """ + Initialize auth manager. + + Import the user manager class, instantiate it and return it. 
+ """ + auth_manager_cls = get_auth_manager_cls() + return auth_manager_cls() diff --git a/airflow/www/extensions/init_jinja_globals.py b/airflow/www/extensions/init_jinja_globals.py index 13baeea7bc67e..ff5481dd468f6 100644 --- a/airflow/www/extensions/init_jinja_globals.py +++ b/airflow/www/extensions/init_jinja_globals.py @@ -74,7 +74,7 @@ def prepare_jinja_globals(): } backends = conf.get("api", "auth_backends") - if len(backends) > 0 and backends[0] != "airflow.api.auth.backend.deny_all": + if backends and backends[0] != "airflow.api.auth.backend.deny_all": extra_globals["rest_api_enabled"] = True if "analytics_tool" in conf.getsection("webserver"): diff --git a/airflow/www/fab_security/manager.py b/airflow/www/fab_security/manager.py index 0e08c93d913a6..145e099d081b9 100644 --- a/airflow/www/fab_security/manager.py +++ b/airflow/www/fab_security/manager.py @@ -374,7 +374,7 @@ def oauth_user_info_getter(self, f): Decorator function to be the OAuth user info getter for all the providers. Receives provider and response return a dict with the information returned from the provider. - The returned user info dict should have it's keys with the same name as the User Model. + The returned user info dict should have its keys with the same name as the User Model. Use it like this an example for GitHub :: @@ -521,7 +521,7 @@ def _search_ldap(self, ldap, con, username): self.auth_ldap_lastname_field, self.auth_ldap_email_field, ] - if len(self.auth_roles_mapping) > 0: + if self.auth_roles_mapping: request_fields.append(self.auth_ldap_group_field) # perform the LDAP search @@ -561,7 +561,7 @@ def _ldap_calculate_user_roles(self, user_attributes: dict[str, list[bytes]]) -> user_role_objects = set() # apply AUTH_ROLES_MAPPING - if len(self.auth_roles_mapping) > 0: + if self.auth_roles_mapping: user_role_keys = self.ldap_extract_list(user_attributes, self.auth_ldap_group_field) user_role_objects.update(self.get_roles_from_keys(user_role_keys)) @@ -852,7 +852,7 @@ def _oauth_calculate_user_roles(self, userinfo) -> list[str]: user_role_objects = set() # apply AUTH_ROLES_MAPPING - if len(self.auth_roles_mapping) > 0: + if self.auth_roles_mapping: user_role_keys = userinfo.get("role_keys", []) user_role_objects.update(self.get_roles_from_keys(user_role_keys)) @@ -1096,7 +1096,7 @@ def security_cleanup(self, baseviews, menus): self.delete_resource(resource.name) def find_user(self, username=None, email=None): - """Generic function find a user by it's username or email.""" + """Generic function find a user by its username or email.""" raise NotImplementedError def get_role_permissions_from_db(self, role_id: int) -> list[Permission]: diff --git a/airflow/www/templates/appbuilder/navbar_right.html b/airflow/www/templates/appbuilder/navbar_right.html index 8eec9f9fcf1e3..1ccf28db418f1 100644 --- a/airflow/www/templates/appbuilder/navbar_right.html +++ b/airflow/www/templates/appbuilder/navbar_right.html @@ -67,8 +67,13 @@
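
The new get_auth_manager_cls() / cached get_auth_manager() split shown above follows a common lazy-singleton pattern: resolve the configured class when callers only need static methods or class attributes, and build and memoize a single instance only when one is actually required. A minimal, self-contained sketch of that pattern is below; it uses hypothetical names (BaseAuthManagerSketch, get_auth_manager_cls_sketch) and a hard-coded class lookup rather than Airflow's real classes and its [core] auth_manager configuration, so it is an illustration of the idea, not the actual implementation.

    from __future__ import annotations

    from functools import cache


    class BaseAuthManagerSketch:
        """Hypothetical stand-in for an auth manager base class (not Airflow's)."""

        @staticmethod
        def get_url_login(next_url: str | None = None) -> str:
            # A static helper that callers may want without ever building an instance.
            return "/login"


    def get_auth_manager_cls_sketch() -> type[BaseAuthManagerSketch]:
        # Return only the class. In Airflow the class is resolved from the
        # [core] auth_manager setting; here it is hard-coded for the sketch.
        return BaseAuthManagerSketch


    @cache
    def get_auth_manager_sketch() -> BaseAuthManagerSketch:
        # Instantiate lazily and memoize, so the (potentially expensive)
        # constructor runs at most once per process.
        return get_auth_manager_cls_sketch()()

Calling the cached accessor repeatedly returns the same object (get_auth_manager_sketch() is get_auth_manager_sketch() holds), while asking only for the class, e.g. get_auth_manager_cls_sketch().get_url_login(), never triggers instantiation, which is the execution-time saving the new docstring refers to.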