From 512155d8a450e7eeb4d033ac2f60223a3cdedfe3 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Wed, 8 May 2024 21:20:09 +0200 Subject: [PATCH] Run unit tests for Providers with airflow installed as package. This PR adds the option of running unit tests for providers against a specific airflow version (for example released version in PyPI) and enables it for back-compatibility testing for 2.9.1. In the future it could be used to run forward-compatibility testing with Airflow 3 as well. --- .github/workflows/check-providers.yml | 23 +- .github/workflows/ci.yml | 3 + Dockerfile.ci | 2 + airflow/providers/openlineage/conf.py | 1 - .../openlineage/plugins/openlineage.py | 3 + contributing-docs/testing/unit_tests.rst | 607 +++++++++++------- .../src/airflow_breeze/global_constants.py | 8 + .../airflow_breeze/utils/selective_checks.py | 13 +- dev/breeze/tests/test_selective_checks.py | 28 +- pyproject.toml | 3 + scripts/docker/entrypoint_ci.sh | 2 + scripts/in_container/run_ci_tests.sh | 9 + .../endpoints/test_import_error_endpoint.py | 2 +- .../schemas/test_error_schema.py | 2 +- .../common/test_delete_dag.py | 2 +- tests/conftest.py | 20 +- tests/dag_processing/test_job_runner.py | 2 +- tests/dag_processing/test_processor.py | 2 +- tests/listeners/class_listener.py | 92 ++- .../aws/auth_manager/avp/test_facade.py | 23 +- .../executors/batch/test_batch_executor.py | 6 + .../aws/executors/ecs/test_ecs_executor.py | 15 +- .../amazon/aws/hooks/test_dynamodb.py | 2 +- .../amazon/aws/hooks/test_hooks_signature.py | 4 +- .../amazon/aws/links/test_base_aws.py | 5 +- .../aws/operators/test_emr_serverless.py | 5 +- .../amazon/aws/utils/test_eks_get_token.py | 6 +- .../apache/iceberg/hooks/test_iceberg.py | 34 +- .../cncf/kubernetes/operators/test_pod.py | 15 +- .../kubernetes/test_template_rendering.py | 4 +- .../google/cloud/operators/test_bigquery.py | 23 +- .../google/cloud/operators/test_dataproc.py | 75 ++- .../openlineage/plugins/test_listener.py | 48 +- .../openlineage/plugins/test_openlineage.py | 4 + .../providers/smtp/notifications/test_smtp.py | 6 +- tests/test_utils/compat.py | 60 ++ tests/test_utils/db.py | 5 +- 37 files changed, 766 insertions(+), 398 deletions(-) create mode 100644 tests/test_utils/compat.py diff --git a/.github/workflows/check-providers.yml b/.github/workflows/check-providers.yml index d71b8d678dff..8bac79f6fd48 100644 --- a/.github/workflows/check-providers.yml +++ b/.github/workflows/check-providers.yml @@ -43,7 +43,11 @@ on: # yamllint disable-line rule:truthy providers-compatibility-checks: description: > JSON-formatted array of providers compatibility checks in the form of array of dicts - (airflow-version, python-versions, remove-providers) + (airflow-version, python-versions, remove-providers, run-tests) + required: true + type: string + providers-test-types-list-as-string: + description: "List of parallel provider test types as string" required: true type: string skip-provider-tests: @@ -237,6 +241,9 @@ jobs: - name: > Install and verify all provider packages and airflow on Airflow ${{ matrix.airflow-version }}:Python ${{ matrix.python-version }} + # We do not need to run import check if we run tests, the tests should cover all the import checks + # automatically + if: matrix.run-tests != 'true' run: > breeze release-management verify-provider-packages --use-packages-from-dist @@ -245,3 +252,17 @@ jobs: --airflow-constraints-reference constraints-${{matrix.airflow-version}} --providers-skip-constraints --install-airflow-with-constraints + - name: > + Run provider 
unit tests on + Airflow ${{ matrix.airflow-version }}:Python ${{ matrix.python-version }} + if: matrix.run-tests == 'true' + run: > + breeze testing tests --run-in-parallel + --parallel-test-types "${{ inputs.providers-test-types-list-as-string }}" + --use-packages-from-dist + --package-format wheel + --use-airflow-version "${{ matrix.airflow-version }}" + --airflow-constraints-reference constraints-${{matrix.airflow-version}} + --install-airflow-with-constraints + --providers-skip-constraints + --skip-providers "${{ matrix.remove-providers }}" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 00155070d185..0502df336838 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -77,6 +77,8 @@ jobs: full-tests-needed: ${{ steps.selective-checks.outputs.full-tests-needed }} parallel-test-types-list-as-string: >- ${{ steps.selective-checks.outputs.parallel-test-types-list-as-string }} + providers-test-types-list-as-string: >- + ${{ steps.selective-checks.outputs.providers-test-types-list-as-string }} include-success-outputs: ${{ steps.selective-checks.outputs.include-success-outputs }} postgres-exclude: ${{ steps.selective-checks.outputs.postgres-exclude }} mysql-exclude: ${{ steps.selective-checks.outputs.mysql-exclude }} @@ -315,6 +317,7 @@ jobs: providers-compatibility-checks: ${{ needs.build-info.outputs.providers-compatibility-checks }} skip-provider-tests: ${{ needs.build-info.outputs.skip-provider-tests }} python-versions: ${{ needs.build-info.outputs.python-versions }} + providers-test-types-list-as-string: ${{ needs.build-info.outputs.providers-test-types-list-as-string }} tests-helm: name: "Helm tests" diff --git a/Dockerfile.ci b/Dockerfile.ci index 7b9392f291df..ff9597037110 100644 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -984,6 +984,8 @@ function determine_airflow_to_use() { mkdir -p "${AIRFLOW_SOURCES}"/tmp/ else python "${IN_CONTAINER_DIR}/install_airflow_and_providers.py" + # Some packages might leave legacy typing module which causes test issues + pip uninstall -y typing || true fi if [[ "${USE_AIRFLOW_VERSION}" =~ ^2\.2\..*|^2\.1\..*|^2\.0\..* && "${AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=}" != "" ]]; then diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py index 23e663f67e9b..f79511a22d05 100644 --- a/airflow/providers/openlineage/conf.py +++ b/airflow/providers/openlineage/conf.py @@ -100,7 +100,6 @@ def is_disabled() -> bool: option = os.getenv("OPENLINEAGE_DISABLED", "") if _is_true(option): return True - # Check if both 'transport' and 'config_path' are not present and also # if legacy 'OPENLINEAGE_URL' environment variables is not set return transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == "" diff --git a/airflow/providers/openlineage/plugins/openlineage.py b/airflow/providers/openlineage/plugins/openlineage.py index 5927929588a5..5b4f2c3dbf46 100644 --- a/airflow/providers/openlineage/plugins/openlineage.py +++ b/airflow/providers/openlineage/plugins/openlineage.py @@ -39,3 +39,6 @@ class OpenLineageProviderPlugin(AirflowPlugin): if not conf.is_disabled(): macros = [lineage_job_namespace, lineage_job_name, lineage_run_id, lineage_parent_id] listeners = [get_openlineage_listener()] + else: + macros = [] + listeners = [] diff --git a/contributing-docs/testing/unit_tests.rst b/contributing-docs/testing/unit_tests.rst index e9a7263a4e34..c7eaed99e23d 100644 --- a/contributing-docs/testing/unit_tests.rst +++ b/contributing-docs/testing/unit_tests.rst @@ -427,75 
+427,75 @@ You can see details about the limitation `here /test.py + +5. Iterate with the tests + +The tests are run using: + +* airflow installed from PyPI +* tests coming from the current airflow sources (they are mounted inside the breeze image) +* provider packages built from the current airflow sources and placed in dist + +This means that you can modify and re-run the tests, but if you want to modify provider code +you need to exit breeze, rebuild the provider package and restart breeze using the command above. + +Rebuilding a single provider package can be done using this command: + +.. code-block:: bash + + breeze release-management prepare-provider-packages \ + --version-suffix-for-pypi dev0 --package-format wheel + + +Note that some of the tests, if written without taking compatibility into account, might not work with older +versions of Airflow - this is because of refactorings, renames, and tests relying on internals of Airflow that +are not part of the public API. We deal with this in one of the following ways: + +1) If the whole provider is supposed to only work for later airflow versions, we remove the whole provider + by excluding it from the compatibility test configuration (see below) + +2) Some compatibility shims are defined in ``tests/test_utils/compat.py`` - and they can be used to make the + tests compatible - for example ``ParseImportError`` was renamed from ``ImportError``, so importing it + directly would fail in Airflow 2.9, but we have a fallback import in ``compat.py`` that falls back to the + old import automatically, so all tests expecting ``ParseImportError`` should import + it from the ``tests.test_utils.compat`` module. There are a few other compatibility shims defined there and + you can add more if needed in a similar way. + +3) If only some tests are not compatible and use features that are available only in newer airflow versions, + we can mark those tests with the appropriate ``AIRFLOW_V_2_X_PLUS`` boolean constant defined in ``compat.py``. + For example: + +.. code-block:: python + + from tests.test_utils.compat import AIRFLOW_V_2_7_PLUS + + @pytest.mark.skipif(not AIRFLOW_V_2_7_PLUS, reason="The tests should be skipped for Airflow < 2.7") + def some_test_that_only_works_for_airflow_2_7_plus(): + pass + +4) Sometimes, the tests should only be run when airflow is installed from the sources. In this case you can + add a conditional ``skipif`` marker for ``RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES`` to the test. For example: + +.. code-block:: python + + @pytest.mark.skipif(RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES, + reason="Plugin initialization is done early in case of packages") + def test_plugin(): + pass + + +How are providers compatibility tests run in CI? +------------------------------------------------- + +We run a set of back-compatibility tests based on the configuration specified in the +``BASE_PROVIDERS_COMPATIBILITY_CHECKS`` constant in the ``./dev/breeze/src/airflow_breeze/global_constants.py`` +file - where we specify: +* python version +* airflow version +* which providers should be removed (exclusions) +* whether to run tests + Other Settings -------------- @@ -1169,7 +1292,7 @@ to **ignore**, e.g. set ``PYTHONWARNINGS`` environment variable to ``ignore``. .. code-block:: bash - pytest tests/core/ --disable-capture-warnings + pytest tests/core/ --disable-capture-warnings Code Coverage ------------- @@ -1186,19 +1309,19 @@ a. Initiate a breeze shell. b.
Execute one of the commands below based on the desired coverage area: - - **Core:** ``python scripts/cov/core_coverage.py`` - - **REST API:** ``python scripts/cov/restapi_coverage.py`` - - **CLI:** ``python scripts/cov/cli_coverage.py`` - - **Webserver:** ``python scripts/cov/www_coverage.py`` +- **Core:** ``python scripts/cov/core_coverage.py`` +- **REST API:** ``python scripts/cov/restapi_coverage.py`` +- **CLI:** ``python scripts/cov/cli_coverage.py`` +- **Webserver:** ``python scripts/cov/www_coverage.py`` c. After execution, the coverage report will be available at: http://localhost:28000/dev/coverage/index.html. - .. note:: +.. note:: - In order to see the coverage report, you must start webserver first in breeze environment via the - `airflow webserver`. Once you enter `breeze`, you can start `tmux` (terminal multiplexer) and - split the terminal (by pressing `ctrl-B "` for example) to continue testing and run the webserver - in one terminal and run tests in the second one (you can switch between the terminals with `ctrl-B `). + In order to see the coverage report, you must start webserver first in breeze environment via the + ``airflow webserver``. Once you enter ``breeze``, you can start ``tmux`` (terminal multiplexer) and + split the terminal (by pressing ``ctrl-B "`` for example) to continue testing and run the webserver + in one terminal and run tests in the second one (you can switch between the terminals with ``ctrl-B ``). Modules Not Fully Covered: .......................... diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py index 66cde22e709c..84424505658c 100644 --- a/dev/breeze/src/airflow_breeze/global_constants.py +++ b/dev/breeze/src/airflow_breeze/global_constants.py @@ -476,11 +476,19 @@ def _exclusion(providers: Iterable[str]) -> str: "python-version": "3.8", "airflow-version": "2.7.1", "remove-providers": _exclusion(["common.io", "fab"]), + "run-tests": "false", }, { "python-version": "3.8", "airflow-version": "2.8.0", "remove-providers": _exclusion(["fab"]), + "run-tests": "false", + }, + { + "python-version": "3.8", + "airflow-version": "2.9.1", + "remove-providers": _exclusion([]), + "run-tests": "true", }, ] diff --git a/dev/breeze/src/airflow_breeze/utils/selective_checks.py b/dev/breeze/src/airflow_breeze/utils/selective_checks.py index 6b42d9134673..193271f34c2b 100644 --- a/dev/breeze/src/airflow_breeze/utils/selective_checks.py +++ b/dev/breeze/src/airflow_breeze/utils/selective_checks.py @@ -83,6 +83,8 @@ "PythonVenv Serialization WWW" ) +ALL_PROVIDERS_SELECTIVE_TEST_TYPES = "Providers[-amazon,google] Providers[amazon] Providers[google]" + class FileGroupForCi(Enum): ENVIRONMENT_FILES = "environment_files" @@ -816,6 +818,15 @@ def parallel_test_types_list_as_string(self) -> str | None: self._extract_long_provider_tests(current_test_types) return " ".join(sorted(current_test_types)) + @cached_property + def providers_test_types_list_as_string(self) -> str | None: + all_test_types = self.parallel_test_types_list_as_string + if all_test_types is None: + return None + return " ".join( + test_type for test_type in all_test_types.split(" ") if test_type.startswith("Providers") + ) + @cached_property def include_success_outputs( self, @@ -828,7 +839,7 @@ def basic_checks_only(self) -> bool: @staticmethod def _print_diff(old_lines: list[str], new_lines: list[str]): - diff = "\n".join([line for line in difflib.ndiff(old_lines, new_lines) if line and line[0] in "+-?"]) + diff = "\n".join(line for line 
in difflib.ndiff(old_lines, new_lines) if line and line[0] in "+-?") get_console().print(diff) @cached_property diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py index 964c8c34874a..81789de714ab 100644 --- a/dev/breeze/tests/test_selective_checks.py +++ b/dev/breeze/tests/test_selective_checks.py @@ -30,7 +30,11 @@ DEFAULT_PYTHON_MAJOR_MINOR_VERSION, GithubEvents, ) -from airflow_breeze.utils.selective_checks import ALL_CI_SELECTIVE_TEST_TYPES, SelectiveChecks +from airflow_breeze.utils.selective_checks import ( + ALL_CI_SELECTIVE_TEST_TYPES, + ALL_PROVIDERS_SELECTIVE_TEST_TYPES, + SelectiveChecks, +) ANSI_COLORS_MATCHER = re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]") @@ -114,6 +118,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "mypy-docs,mypy-providers,ts-compile-format-lint-www", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": None, + "providers-test-types-list-as-string": None, "needs-mypy": "false", "mypy-folders": "[]", }, @@ -139,6 +144,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "mypy-docs,mypy-providers,ts-compile-format-lint-www", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": "API Always Providers[fab]", + "providers-test-types-list-as-string": "Providers[fab]", "needs-mypy": "true", "mypy-folders": "['airflow']", }, @@ -164,6 +170,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "mypy-docs,mypy-providers,ts-compile-format-lint-www", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": "Always Operators", + "providers-test-types-list-as-string": "", "needs-mypy": "true", "mypy-folders": "['airflow']", }, @@ -190,6 +197,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": "Always BranchExternalPython BranchPythonVenv " "ExternalPython Operators PythonVenv", + "providers-test-types-list-as-string": "", "needs-mypy": "true", "mypy-folders": "['airflow']", }, @@ -215,6 +223,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "mypy-docs,mypy-providers,ts-compile-format-lint-www", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": "Always Serialization", + "providers-test-types-list-as-string": "", "needs-mypy": "true", "mypy-folders": "['airflow']", }, @@ -245,6 +254,8 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": "API Always Providers[amazon] " "Providers[common.sql,fab,openlineage,pgvector,postgres] Providers[google]", + "providers-test-types-list-as-string": "Providers[amazon] " + "Providers[common.sql,fab,openlineage,pgvector,postgres] Providers[google]", "needs-mypy": "true", "mypy-folders": "['airflow', 'providers']", }, @@ -271,6 +282,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "run-kubernetes-tests": "false", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": "Always Providers[apache.beam] Providers[google]", + "providers-test-types-list-as-string": "Providers[apache.beam] Providers[google]", "needs-mypy": "true", "mypy-folders": "['providers']", }, @@ -297,6 +309,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): 
"run-kubernetes-tests": "false", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": None, + "providers-test-types-list-as-string": None, "needs-mypy": "false", "mypy-folders": "[]", }, @@ -327,6 +340,8 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": "Always Providers[amazon] " "Providers[common.sql,openlineage,pgvector,postgres] Providers[google]", + "providers-test-types-list-as-string": "Providers[amazon] " + "Providers[common.sql,openlineage,pgvector,postgres] Providers[google]", "needs-mypy": "true", "mypy-folders": "['providers']", }, @@ -359,6 +374,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": "Always " "Providers[airbyte,apache.livy,dbt.cloud,dingding,discord,http] Providers[amazon]", + "providers-test-types-list-as-string": "Providers[airbyte,apache.livy,dbt.cloud,dingding,discord,http] Providers[amazon]", "needs-mypy": "true", "mypy-folders": "['providers']", }, @@ -389,6 +405,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "run-kubernetes-tests": "true", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": "Always Providers[airbyte,http]", + "providers-test-types-list-as-string": "Providers[airbyte,http]", "needs-mypy": "true", "mypy-folders": "['providers']", }, @@ -420,6 +437,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "run-kubernetes-tests": "true", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": "Always", + "providers-test-types-list-as-string": "", "needs-mypy": "true", "mypy-folders": "['airflow']", }, @@ -446,6 +464,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers", "upgrade-to-newer-dependencies": "true", "parallel-test-types-list-as-string": ALL_CI_SELECTIVE_TEST_TYPES, + "providers-test-types-list-as-string": ALL_PROVIDERS_SELECTIVE_TEST_TYPES, "needs-mypy": "true", "mypy-folders": "['airflow', 'providers', 'docs', 'dev']", }, @@ -472,6 +491,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers", "upgrade-to-newer-dependencies": "true", "parallel-test-types-list-as-string": ALL_CI_SELECTIVE_TEST_TYPES, + "providers-test-types-list-as-string": ALL_PROVIDERS_SELECTIVE_TEST_TYPES, "needs-mypy": "true", "mypy-folders": "['airflow', 'providers', 'docs', 'dev']", }, @@ -778,6 +798,7 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_ "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": ALL_CI_SELECTIVE_TEST_TYPES, + "providers-test-types-list-as-string": ALL_PROVIDERS_SELECTIVE_TEST_TYPES, "needs-mypy": "true", "mypy-folders": "['airflow', 'providers', 'docs', 'dev']", }, @@ -811,6 +832,7 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_ "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": ALL_CI_SELECTIVE_TEST_TYPES, + "providers-test-types-list-as-string": ALL_PROVIDERS_SELECTIVE_TEST_TYPES, 
"needs-mypy": "true", "mypy-folders": "['airflow', 'providers', 'docs', 'dev']", }, @@ -844,6 +866,7 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_ "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": ALL_CI_SELECTIVE_TEST_TYPES, + "providers-test-types-list-as-string": ALL_PROVIDERS_SELECTIVE_TEST_TYPES, "needs-mypy": "true", "mypy-folders": "['airflow', 'providers', 'docs', 'dev']", }, @@ -878,6 +901,7 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_ "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": ALL_CI_SELECTIVE_TEST_TYPES, + "providers-test-types-list-as-string": ALL_PROVIDERS_SELECTIVE_TEST_TYPES, "needs-mypy": "true", "mypy-folders": "['airflow', 'providers', 'docs', 'dev']", }, @@ -912,6 +936,7 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_ "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": ALL_CI_SELECTIVE_TEST_TYPES, + "providers-test-types-list-as-string": ALL_PROVIDERS_SELECTIVE_TEST_TYPES, "needs-mypy": "true", "mypy-folders": "['airflow', 'providers', 'docs', 'dev']", }, @@ -943,6 +968,7 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_ "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": ALL_CI_SELECTIVE_TEST_TYPES, + "providers-test-types-list-as-string": ALL_PROVIDERS_SELECTIVE_TEST_TYPES, "needs-mypy": "true", "mypy-folders": "['airflow', 'providers', 'docs', 'dev']", }, diff --git a/pyproject.toml b/pyproject.toml index fadfa13b469e..e1bd7e846d4a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -410,6 +410,9 @@ combine-as-imports = true # https://github.com/apache/airflow/issues/39252 "airflow/providers/amazon/aws/hooks/eks.py" = ["W605"] +# Test compat imports banned imports to allow testing against older airflow versions +"tests/test_utils/compat.py" = ["TID251", "F401"] + [tool.ruff.lint.flake8-tidy-imports] # Disallow all relative imports. 
ban-relative-imports = "all" diff --git a/scripts/docker/entrypoint_ci.sh b/scripts/docker/entrypoint_ci.sh index a0738dc1d6b8..3c1699acbcc4 100755 --- a/scripts/docker/entrypoint_ci.sh +++ b/scripts/docker/entrypoint_ci.sh @@ -205,6 +205,8 @@ function determine_airflow_to_use() { mkdir -p "${AIRFLOW_SOURCES}"/tmp/ else python "${IN_CONTAINER_DIR}/install_airflow_and_providers.py" + # Some packages might leave legacy typing module which causes test issues + pip uninstall -y typing || true fi if [[ "${USE_AIRFLOW_VERSION}" =~ ^2\.2\..*|^2\.1\..*|^2\.0\..* && "${AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=}" != "" ]]; then diff --git a/scripts/in_container/run_ci_tests.sh b/scripts/in_container/run_ci_tests.sh index 3d6b5a6a68da..98ff9ddef93d 100755 --- a/scripts/in_container/run_ci_tests.sh +++ b/scripts/in_container/run_ci_tests.sh @@ -60,6 +60,15 @@ if [[ ${TEST_TYPE:=} == "Quarantined" ]]; then fi fi +if [[ ${CI:="false"} == "true" && ${RES} != "0" && ${USE_AIRFLOW_VERSION=} != "" ]]; then + echo + echo "${COLOR_YELLOW}Compatibility tests of providers are failing for Airflow ${USE_AIRFLOW_VERSION} - you need to make sure they pass for that version as well, or deal with the compatibility.${COLOR_RESET}" + echo + echo "${COLOR_BLUE}Read more on how to run the tests locally and how to deal with providers' compatibility with older Airflow versions at:${COLOR_RESET}" + echo "https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#running-provider-compatibility-tests" + echo +fi + if [[ ${CI:="false"} == "true" || ${CI} == "True" ]]; then if [[ ${RES} != "0" ]]; then echo diff --git a/tests/api_connexion/endpoints/test_import_error_endpoint.py b/tests/api_connexion/endpoints/test_import_error_endpoint.py index ce084165d24e..4549b74ae997 100644 --- a/tests/api_connexion/endpoints/test_import_error_endpoint.py +++ b/tests/api_connexion/endpoints/test_import_error_endpoint.py @@ -22,11 +22,11 @@ from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP from airflow.models.dag import DagModel -from airflow.models.errors import ParseImportError from airflow.security import permissions from airflow.utils import timezone from airflow.utils.session import provide_session from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user +from tests.test_utils.compat import ParseImportError from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_dags, clear_db_import_errors diff --git a/tests/api_connexion/schemas/test_error_schema.py b/tests/api_connexion/schemas/test_error_schema.py index 7056417e6609..8604bee516db 100644 --- a/tests/api_connexion/schemas/test_error_schema.py +++ b/tests/api_connexion/schemas/test_error_schema.py @@ -23,9 +23,9 @@ import_error_collection_schema, import_error_schema, ) -from airflow.models.errors import ParseImportError from airflow.utils import timezone from airflow.utils.session import provide_session +from tests.test_utils.compat import ParseImportError from tests.test_utils.db import clear_db_import_errors pytestmark = pytest.mark.db_test diff --git a/tests/api_experimental/common/test_delete_dag.py b/tests/api_experimental/common/test_delete_dag.py index 961d04dd3768..4dc5f9b00f88 100644 --- a/tests/api_experimental/common/test_delete_dag.py +++ b/tests/api_experimental/common/test_delete_dag.py @@ -23,7 +23,6 @@ from airflow.exceptions import AirflowException, DagNotFound from airflow.models.dag import DAG, DagModel from airflow.models.dagrun import DagRun as DR -from airflow.models.errors import
ParseImportError as IE from airflow.models.log import Log from airflow.models.taskfail import TaskFail from airflow.models.taskinstance import TaskInstance as TI @@ -33,6 +32,7 @@ from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.types import DagRunType +from tests.test_utils.compat import ParseImportError as IE from tests.test_utils.db import clear_db_dags, clear_db_runs pytestmark = pytest.mark.db_test diff --git a/tests/conftest.py b/tests/conftest.py index efff93f58bf1..63e5e40018fc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -334,8 +334,12 @@ def initial_db_init(): from airflow.utils import db from airflow.www.extensions.init_appbuilder import init_appbuilder from airflow.www.extensions.init_auth_manager import get_auth_manager + from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS - db.resetdb(use_migration_files=True) + if AIRFLOW_V_2_10_PLUS: + db.resetdb(use_migration_files=True) + else: + db.resetdb() db.bootstrap_dagbag() # minimal app to add roles flask_app = Flask(__name__) @@ -1292,6 +1296,20 @@ def _disable_redact(request: pytest.FixtureRequest, mocker): return +@pytest.fixture +def airflow_root_path() -> Path: + import airflow + + return Path(airflow.__path__[0]).parent + + +# This constant is set to True if tests are run with Airflow installed from packages rather than running +# the tests within Airflow sources. While most tests in CI are run using Airflow sources, there are +# also compatibility tests that only use the `tests` package and run against installed packages of Airflow +# for supported Airflow versions. +RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES = not (Path(__file__).parents[1] / "airflow" / "__init__.py").exists() + + if TYPE_CHECKING: # Static checkers do not know about pytest fixtures' types and return, # In case if them distributed through third party packages.
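The ``airflow_root_path`` fixture and the ``RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES`` constant added to ``tests/conftest.py`` above are the hooks provider tests are expected to reach for when they depend on the Airflow sources being checked out. A minimal sketch of how a provider test could combine them - the test name, path and assertion are hypothetical, only the fixture and the constant come from this change:

.. code-block:: python

    import pytest

    from tests.conftest import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES


    @pytest.mark.skipif(
        RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES,
        reason="provider.yaml files are only present when tests run from Airflow sources",
    )
    def test_reads_provider_yaml(airflow_root_path):
        # airflow_root_path points at the repository root when running from sources
        provider_yaml = airflow_root_path / "airflow" / "providers" / "amazon" / "provider.yaml"
        assert provider_yaml.exists()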
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index 0ebb6466aa89..8e2bfbde623f 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -54,13 +54,13 @@ from airflow.jobs.job import Job from airflow.models import DagBag, DagModel, DbCallbackRequest from airflow.models.dagcode import DagCode -from airflow.models.errors import ParseImportError from airflow.models.serialized_dag import SerializedDagModel from airflow.utils import timezone from airflow.utils.net import get_hostname from airflow.utils.session import create_session from tests.core.test_logging_config import SETTINGS_FILE_VALID, settings_context from tests.models import TEST_DAGS_FOLDER +from tests.test_utils.compat import ParseImportError from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_callbacks, clear_db_dags, clear_db_runs, clear_db_serialized_dags diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index 98640aaf3d71..b79095994ab0 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -32,7 +32,6 @@ from airflow.dag_processing.manager import DagFileProcessorAgent from airflow.dag_processing.processor import DagFileProcessor, DagFileProcessorProcess from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance -from airflow.models.errors import ParseImportError from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskinstance import SimpleTaskInstance from airflow.operators.empty import EmptyOperator @@ -40,6 +39,7 @@ from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.types import DagRunType +from tests.test_utils.compat import ParseImportError from tests.test_utils.config import conf_vars, env_vars from tests.test_utils.db import ( clear_db_dags, diff --git a/tests/listeners/class_listener.py b/tests/listeners/class_listener.py index a719d372bd7e..ececa853213a 100644 --- a/tests/listeners/class_listener.py +++ b/tests/listeners/class_listener.py @@ -19,38 +19,70 @@ from airflow.listeners import hookimpl from airflow.utils.state import DagRunState, TaskInstanceState +from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS +if AIRFLOW_V_2_10_PLUS: -class ClassBasedListener: - def __init__(self): - self.started_component = None - self.stopped_component = None - self.state = [] - - @hookimpl - def on_starting(self, component): - self.started_component = component - self.state.append(DagRunState.RUNNING) - - @hookimpl - def before_stopping(self, component): - global stopped_component - stopped_component = component - self.state.append(DagRunState.SUCCESS) - - @hookimpl - def on_task_instance_running(self, previous_state, task_instance, session): - self.state.append(TaskInstanceState.RUNNING) - - @hookimpl - def on_task_instance_success(self, previous_state, task_instance, session): - self.state.append(TaskInstanceState.SUCCESS) - - @hookimpl - def on_task_instance_failed( - self, previous_state, task_instance, error: None | str | BaseException, session - ): - self.state.append(TaskInstanceState.FAILED) + class ClassBasedListener: + def __init__(self): + self.started_component = None + self.stopped_component = None + self.state = [] + + @hookimpl + def on_starting(self, component): + self.started_component = component + self.state.append(DagRunState.RUNNING) + + @hookimpl + def before_stopping(self, component): + global 
stopped_component + stopped_component = component + self.state.append(DagRunState.SUCCESS) + + @hookimpl + def on_task_instance_running(self, previous_state, task_instance, session): + self.state.append(TaskInstanceState.RUNNING) + + @hookimpl + def on_task_instance_success(self, previous_state, task_instance, session): + self.state.append(TaskInstanceState.SUCCESS) + + @hookimpl + def on_task_instance_failed( + self, previous_state, task_instance, error: None | str | BaseException, session + ): + self.state.append(TaskInstanceState.FAILED) +else: + + class ClassBasedListener: # type: ignore[no-redef] + def __init__(self): + self.started_component = None + self.stopped_component = None + self.state = [] + + @hookimpl + def on_starting(self, component): + self.started_component = component + self.state.append(DagRunState.RUNNING) + + @hookimpl + def before_stopping(self, component): + global stopped_component + stopped_component = component + self.state.append(DagRunState.SUCCESS) + + @hookimpl + def on_task_instance_running(self, previous_state, task_instance, session): + self.state.append(TaskInstanceState.RUNNING) + + @hookimpl + def on_task_instance_success(self, previous_state, task_instance, session): + self.state.append(TaskInstanceState.SUCCESS) + + @hookimpl + def on_task_instance_failed(self, previous_state, task_instance, session): + self.state.append(TaskInstanceState.FAILED) def clear(): diff --git a/tests/providers/amazon/aws/auth_manager/avp/test_facade.py b/tests/providers/amazon/aws/auth_manager/avp/test_facade.py index 0daae8811e84..5c632ac1ba8e 100644 --- a/tests/providers/amazon/aws/auth_manager/avp/test_facade.py +++ b/tests/providers/amazon/aws/auth_manager/avp/test_facade.py @@ -17,7 +17,6 @@ from __future__ import annotations import json -from pathlib import Path from typing import TYPE_CHECKING from unittest.mock import Mock @@ -312,13 +311,10 @@ def test_get_batch_is_authorized_single_result_unsuccessful(self, facade): user=test_user, ) - def test_is_policy_store_schema_up_to_date_when_schema_up_to_date(self, facade): - schema_path = ( - Path(__file__) - .parents[6] - .joinpath("airflow", "providers", "amazon", "aws", "auth_manager", "avp", "schema.json") - .resolve() - ) + def test_is_policy_store_schema_up_to_date_when_schema_up_to_date(self, facade, airflow_root_path): + schema_path = airflow_root_path.joinpath( + "airflow", "providers", "amazon", "aws", "auth_manager", "avp", "schema.json" + ).resolve() with open(schema_path) as schema_file: avp_response = {"schema": schema_file.read()} mock_get_schema = Mock(return_value=avp_response) @@ -326,13 +322,10 @@ def test_is_policy_store_schema_up_to_date_when_schema_up_to_date(self, facade): assert facade.is_policy_store_schema_up_to_date() - def test_is_policy_store_schema_up_to_date_when_schema_is_modified(self, facade): - schema_path = ( - Path(__file__) - .parents[6] - .joinpath("airflow", "providers", "amazon", "aws", "auth_manager", "avp", "schema.json") - .resolve() - ) + def test_is_policy_store_schema_up_to_date_when_schema_is_modified(self, facade, airflow_root_path): + schema_path = airflow_root_path.joinpath( + "airflow", "providers", "amazon", "aws", "auth_manager", "avp", "schema.json" + ).resolve() with open(schema_path) as schema_file: schema = json.loads(schema_file.read()) schema["new_field"] = "new_value" diff --git a/tests/providers/amazon/aws/executors/batch/test_batch_executor.py b/tests/providers/amazon/aws/executors/batch/test_batch_executor.py index 8e0d20653b7a..cd6c2c87ac08 100644 --- 
a/tests/providers/amazon/aws/executors/batch/test_batch_executor.py +++ b/tests/providers/amazon/aws/executors/batch/test_batch_executor.py @@ -42,6 +42,7 @@ ) from airflow.utils.helpers import convert_camel_to_snake from airflow.utils.state import State +from tests.conftest import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES from tests.test_utils.config import conf_vars ARN1 = "arn1" @@ -655,6 +656,11 @@ def _unset_conf(): def teardown_method(self) -> None: self._unset_conf() + @pytest.mark.skipif( + RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES, + reason="Config defaults are validated against provider.yaml so this test " + "should only run when tests are run from sources", + ) def test_validate_config_defaults(self): """Assert that the defaults stated in the config.yml file match those in utils.CONFIG_DEFAULTS.""" curr_dir = os.path.dirname(os.path.abspath(__file__)) diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py index 6c6bef5f7c7b..b547f398337f 100644 --- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py +++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py @@ -54,6 +54,8 @@ from airflow.utils.helpers import convert_camel_to_snake from airflow.utils.state import State, TaskInstanceState from airflow.utils.timezone import utcnow +from tests.conftest import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES +from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS from tests.test_utils.config import conf_vars pytestmark = pytest.mark.db_test @@ -367,6 +369,7 @@ def test_stopped_tasks(self): class TestAwsEcsExecutor: """Tests the AWS ECS Executor.""" + @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Test requires Airflow 2.10+") @mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor.change_state") def test_execute(self, change_state_mock, mock_airflow_key, mock_executor): """Test execution from end-to-end.""" @@ -1205,8 +1208,18 @@ def test_flatten_dict(self): nested_dict = {"a": "a", "b": "b", "c": {"d": "d"}} assert _recursive_flatten_dict(nested_dict) == {"a": "a", "b": "b", "d": "d"} + @pytest.mark.skipif( + RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES, + reason="Config defaults are validated against provider.yaml so this test " + "should only run when tests are run from sources", + ) def test_validate_config_defaults(self): - """Assert that the defaults stated in the config.yml file match those in utils.CONFIG_DEFAULTS.""" + """Assert that the defaults stated in the config.yml file match those in utils.CONFIG_DEFAULTS. + + This test should only be run to verify configuration defaults are the same when it is run from + airflow sources, not when airflow is installed from packages, because airflow installed from packages + will not have the provider.yml file. 
+ """ curr_dir = os.path.dirname(os.path.abspath(__file__)) executor_path = "aws/executors/ecs" config_filename = curr_dir.replace("tests", "airflow").replace(executor_path, "provider.yaml") diff --git a/tests/providers/amazon/aws/hooks/test_dynamodb.py b/tests/providers/amazon/aws/hooks/test_dynamodb.py index f3baba8b69d2..4e3e96c0dd47 100644 --- a/tests/providers/amazon/aws/hooks/test_dynamodb.py +++ b/tests/providers/amazon/aws/hooks/test_dynamodb.py @@ -60,4 +60,4 @@ def test_insert_batch_items_dynamodb_table(self): def test_waiter_path_generated_from_resource_type(self, _): hook = DynamoDBHook(aws_conn_id="aws_default") path = hook.waiter_path - assert path.as_uri().endswith("/airflow/airflow/providers/amazon/aws/waiters/dynamodb.json") + assert path.as_uri().endswith("/airflow/providers/amazon/aws/waiters/dynamodb.json") diff --git a/tests/providers/amazon/aws/hooks/test_hooks_signature.py b/tests/providers/amazon/aws/hooks/test_hooks_signature.py index a6530f45a61c..f2537226183b 100644 --- a/tests/providers/amazon/aws/hooks/test_hooks_signature.py +++ b/tests/providers/amazon/aws/hooks/test_hooks_signature.py @@ -65,7 +65,9 @@ def get_aws_hooks_modules(): """Parse Amazon Provider metadata and find all hooks based on `AwsGenericHook` and return it.""" - hooks_dir = Path(__file__).absolute().parents[5] / "airflow" / "providers" / "amazon" / "aws" / "hooks" + import airflow.providers.amazon.aws.hooks as aws_hooks + + hooks_dir = Path(aws_hooks.__path__[0]) if not hooks_dir.exists(): msg = f"Amazon Provider hooks directory not found: {hooks_dir.__fspath__()!r}" raise FileNotFoundError(msg) diff --git a/tests/providers/amazon/aws/links/test_base_aws.py b/tests/providers/amazon/aws/links/test_base_aws.py index 222be314585f..446d584edf35 100644 --- a/tests/providers/amazon/aws/links/test_base_aws.py +++ b/tests/providers/amazon/aws/links/test_base_aws.py @@ -203,9 +203,10 @@ def test_link_serialize(self): """Test: Operator links should exist for serialized DAG.""" self.create_op_and_ti(self.link_class, dag_id="test_link_serialize", task_id=self.task_id) serialized_dag = self.dag_maker.get_serialized_data() - operator_extra_link = serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] + deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"]) + operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0] error_message = "Operator links should exist for serialized DAG" - assert operator_extra_link == [{self.full_qualname: {}}], error_message + assert operator_extra_link.name == self.link_class.name, error_message def test_empty_xcom(self): """Test: Operator links should return empty string if no XCom value.""" diff --git a/tests/providers/amazon/aws/operators/test_emr_serverless.py b/tests/providers/amazon/aws/operators/test_emr_serverless.py index be42cf63ba49..b1344521b98a 100644 --- a/tests/providers/amazon/aws/operators/test_emr_serverless.py +++ b/tests/providers/amazon/aws/operators/test_emr_serverless.py @@ -41,6 +41,7 @@ BaseSerialization, ) from airflow.utils.types import NOTSET +from tests.test_utils.compat import deserialize_operator if TYPE_CHECKING: from unittest.mock import MagicMock @@ -1119,7 +1120,7 @@ def test_operator_extra_links_mapped_without_applicationui_enabled( ) ser_operator = BaseSerialization.serialize(operator) - deser_operator = BaseSerialization.deserialize(ser_operator) + deser_operator = deserialize_operator(ser_operator) assert deser_operator.operator_extra_links == [ EmrServerlessS3LogsLink(), @@ -1140,7 +1141,7 @@ def 
test_operator_extra_links_mapped_with_applicationui_enabled_at_partial( ) ser_operator = BaseSerialization.serialize(operator) - deser_operator = BaseSerialization.deserialize(ser_operator) + deser_operator = deserialize_operator(ser_operator) assert deser_operator.operator_extra_links == [ EmrServerlessS3LogsLink(), diff --git a/tests/providers/amazon/aws/utils/test_eks_get_token.py b/tests/providers/amazon/aws/utils/test_eks_get_token.py index 8825c6c218ba..672ccfc9b3fd 100644 --- a/tests/providers/amazon/aws/utils/test_eks_get_token.py +++ b/tests/providers/amazon/aws/utils/test_eks_get_token.py @@ -25,8 +25,6 @@ import pytest import time_machine -from tests.test_utils import AIRFLOW_MAIN_FOLDER - class TestGetEksToken: @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook") @@ -65,13 +63,13 @@ class TestGetEksToken: ], ], ) - def test_run(self, mock_eks_hook, args, expected_aws_conn_id, expected_region_name): + def test_run(self, mock_eks_hook, args, expected_aws_conn_id, expected_region_name, airflow_root_path): ( mock_eks_hook.return_value.fetch_access_token_for_cluster.return_value ) = "k8s-aws-v1.aHR0cDovL2V4YW1wbGUuY29t" with mock.patch("sys.argv", args), contextlib.redirect_stdout(StringIO()) as temp_stdout: - os.chdir(AIRFLOW_MAIN_FOLDER) + os.chdir(airflow_root_path) # We are not using run_module because of https://github.com/pytest-dev/pytest/issues/9007 runpy.run_path("airflow/providers/amazon/aws/utils/eks_get_token.py", run_name="__main__") output = temp_stdout.getvalue() diff --git a/tests/providers/apache/iceberg/hooks/test_iceberg.py b/tests/providers/apache/iceberg/hooks/test_iceberg.py index af58bb55b0c3..f11e1d4ea629 100644 --- a/tests/providers/apache/iceberg/hooks/test_iceberg.py +++ b/tests/providers/apache/iceberg/hooks/test_iceberg.py @@ -17,6 +17,8 @@ from __future__ import annotations +from unittest.mock import Mock, patch + import pytest import requests_mock @@ -27,16 +29,22 @@ def test_iceberg_hook(): access_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSU" - with requests_mock.Mocker() as m: - m.post( - "https://api.iceberg.io/ws/v1/oauth/tokens", - json={ - "access_token": access_token, - "token_type": "Bearer", - "expires_in": 86400, - "warehouse_id": "fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d", - "region": "us-west-2", - "catalog_url": "warehouses/fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d", - }, - ) - assert IcebergHook().get_conn() == access_token + with patch("airflow.models.Connection.get_connection_from_secrets") as mock_get_connection: + mock_conn = Mock() + mock_conn.conn_id = "iceberg_default" + mock_conn.host = "https://api.iceberg.io/ws/v1" + mock_conn.extra_dejson = {} + mock_get_connection.return_value = mock_conn + with requests_mock.Mocker() as m: + m.post( + "https://api.iceberg.io/ws/v1/oauth/tokens", + json={ + "access_token": access_token, + "token_type": "Bearer", + "expires_in": 86400, + "warehouse_id": "fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d", + "region": "us-west-2", + "catalog_url": "warehouses/fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d", + }, + ) + assert IcebergHook().get_conn() == access_token diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index 224824edbc5e..8b7c238e9c14 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -358,7 +358,8 @@ def test_labels(self, hook_mock, in_cluster): "dag_id": "dag", "kubernetes_pod_operator": "True", "task_id": "task", - "try_number": "0", + # Try number behaves 
differently on different versions of Airflow + "try_number": mock.ANY, "airflow_version": mock.ANY, "run_id": "test", "airflow_kpo_in_cluster": str(in_cluster), @@ -374,7 +375,7 @@ def test_labels_mapped(self): "dag_id": "dag", "kubernetes_pod_operator": "True", "task_id": "task", - "try_number": "0", + "try_number": mock.ANY, "airflow_version": mock.ANY, "run_id": "test", "map_index": "10", @@ -884,7 +885,7 @@ def test_full_pod_spec(self, randomize_name, pod_spec): "dag_id": "dag", "kubernetes_pod_operator": "True", "task_id": "task", - "try_number": "0", + "try_number": mock.ANY, "airflow_version": mock.ANY, "airflow_kpo_in_cluster": str(k.hook.is_in_cluster), "run_id": "test", @@ -920,7 +921,7 @@ def test_full_pod_spec_kwargs(self, randomize_name, pod_spec): "dag_id": "dag", "kubernetes_pod_operator": "True", "task_id": "task", - "try_number": "0", + "try_number": mock.ANY, "airflow_version": mock.ANY, "airflow_kpo_in_cluster": str(k.hook.is_in_cluster), "run_id": "test", @@ -991,7 +992,7 @@ def test_pod_template_file(self, randomize_name, pod_template_file): "dag_id": "dag", "kubernetes_pod_operator": "True", "task_id": "task", - "try_number": "0", + "try_number": mock.ANY, "airflow_version": mock.ANY, "airflow_kpo_in_cluster": str(k.hook.is_in_cluster), "run_id": "test", @@ -1061,7 +1062,7 @@ def test_pod_template_file_kwargs_override(self, randomize_name, pod_template_fi "dag_id": "dag", "kubernetes_pod_operator": "True", "task_id": "task", - "try_number": "0", + "try_number": mock.ANY, "airflow_version": mock.ANY, "airflow_kpo_in_cluster": str(k.hook.is_in_cluster), "run_id": "test", @@ -1112,7 +1113,7 @@ def test_pod_template_dict(self, randomize_name): "dag_id": "dag", "kubernetes_pod_operator": "True", "task_id": "task", - "try_number": "0", + "try_number": mock.ANY, "airflow_version": mock.ANY, "airflow_kpo_in_cluster": str(k.hook.is_in_cluster), "run_id": "test", diff --git a/tests/providers/cncf/kubernetes/test_template_rendering.py b/tests/providers/cncf/kubernetes/test_template_rendering.py index f3e61101eab8..98764a2f1faa 100644 --- a/tests/providers/cncf/kubernetes/test_template_rendering.py +++ b/tests/providers/cncf/kubernetes/test_template_rendering.py @@ -48,7 +48,7 @@ def test_render_k8s_pod_yaml(pod_mutation_hook, create_task_instance): "dag_id": "test_render_k8s_pod_yaml", "run_id": "test_run_id", "task_id": "op1", - "try_number": "0", + "try_number": mock.ANY, }, "labels": { "airflow-worker": "0", @@ -57,7 +57,7 @@ def test_render_k8s_pod_yaml(pod_mutation_hook, create_task_instance): "run_id": "test_run_id", "kubernetes_executor": "True", "task_id": "op1", - "try_number": "0", + "try_number": mock.ANY, }, "name": mock.ANY, "namespace": "default", diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py index d84218f8bd5e..4cfcb7fe87c6 100644 --- a/tests/providers/google/cloud/operators/test_bigquery.py +++ b/tests/providers/google/cloud/operators/test_bigquery.py @@ -731,7 +731,8 @@ def test_bigquery_operator_extra_serialized_field_when_single_query( sql="SELECT * FROM test_table", ) serialized_dag = dag_maker.get_serialized_data() - assert "sql" in serialized_dag["dag"]["tasks"][0]["__var"] + deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"]) + assert hasattr(deserialized_dag.tasks[0], "sql") dag = SerializedDAG.from_dict(serialized_dag) simple_task = dag.task_dict[TASK_ID] @@ -740,11 +741,8 @@ def test_bigquery_operator_extra_serialized_field_when_single_query( 
######################################################### # Verify Operator Links work with Serialized Operator ######################################################### - - # Check Serialized version of operator link - assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [ - {"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink": {}} - ] + deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"]) + assert deserialized_dag.tasks[0].operator_extra_links[0].name == "BigQuery Console" # Check DeSerialized version of operator link assert isinstance(next(iter(simple_task.operator_extra_links)), BigQueryConsoleLink) @@ -768,7 +766,8 @@ def test_bigquery_operator_extra_serialized_field_when_multiple_queries( sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"], ) serialized_dag = dag_maker.get_serialized_data() - assert "sql" in serialized_dag["dag"]["tasks"][0]["__var"] + deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"]) + assert hasattr(deserialized_dag.tasks[0], "sql") dag = SerializedDAG.from_dict(serialized_dag) simple_task = dag.task_dict[TASK_ID] @@ -777,12 +776,10 @@ def test_bigquery_operator_extra_serialized_field_when_multiple_queries( ######################################################### # Verify Operator Links work with Serialized Operator ######################################################### - - # Check Serialized version of operator link - assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [ - {"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink": {"index": 0}}, - {"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink": {"index": 1}}, - ] + deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"]) + operator_extra_links = deserialized_dag.tasks[0].operator_extra_links + assert operator_extra_links[0].name == "BigQuery Console #1" + assert operator_extra_links[1].name == "BigQuery Console #2" # Check DeSerialized version of operator link assert isinstance(next(iter(simple_task.operator_extra_links)), BigQueryConsoleIndexableLink) diff --git a/tests/providers/google/cloud/operators/test_dataproc.py b/tests/providers/google/cloud/operators/test_dataproc.py index 0d97cfe36c24..c3d945c80821 100644 --- a/tests/providers/google/cloud/operators/test_dataproc.py +++ b/tests/providers/google/cloud/operators/test_dataproc.py @@ -79,12 +79,12 @@ from airflow.providers.google.common.consts import GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME from airflow.serialization.serialized_objects import SerializedDAG from airflow.utils.timezone import datetime -from airflow.version import version as airflow_version +from tests.test_utils.compat import AIRFLOW_VERSION from tests.test_utils.db import clear_db_runs, clear_db_xcom -cluster_params = inspect.signature(ClusterGenerator.__init__).parameters +AIRFLOW_VERSION_LABEL = "v" + str(AIRFLOW_VERSION).replace(".", "-").replace("+", "-") -AIRFLOW_VERSION = "v" + airflow_version.replace(".", "-").replace("+", "-") +cluster_params = inspect.signature(ClusterGenerator.__init__).parameters DATAPROC_PATH = "airflow.providers.google.cloud.operators.dataproc.{}" DATAPROC_TRIGGERS_PATH = "airflow.providers.google.cloud.triggers.dataproc.{}" @@ -325,9 +325,9 @@ "endpoint_config": {}, } -LABELS = {"labels": "data", "airflow-version": AIRFLOW_VERSION} +LABELS = {"labels": "data", "airflow-version": AIRFLOW_VERSION_LABEL} -LABELS.update({"airflow-version": "v" + 
airflow_version.replace(".", "-").replace("+", "-")}) +LABELS.update({"airflow-version": AIRFLOW_VERSION_LABEL}) CLUSTER = {"project_id": "project_id", "cluster_name": CLUSTER_NAME, "config": CONFIG, "labels": LABELS} @@ -1064,11 +1064,10 @@ def test_create_cluster_operator_extra_links(dag_maker, create_task_instance_of_ serialized_dag = dag_maker.get_serialized_data() deserialized_dag = SerializedDAG.from_dict(serialized_dag) deserialized_task = deserialized_dag.task_dict[TASK_ID] - # Assert operator links for serialized DAG - assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [ - {"airflow.providers.google.cloud.links.dataproc.DataprocClusterLink": {}} - ] + deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"]) + operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0] + assert operator_extra_link.name == "Dataproc Cluster" # Assert operator link types are preserved during deserialization assert isinstance(deserialized_task.operator_extra_links[0], DataprocClusterLink) @@ -1168,9 +1167,9 @@ def test_scale_cluster_operator_extra_links(dag_maker, create_task_instance_of_o deserialized_task = deserialized_dag.task_dict[TASK_ID] # Assert operator links for serialized DAG - assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [ - {"airflow.providers.google.cloud.links.dataproc.DataprocLink": {}} - ] + deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"]) + operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0] + assert operator_extra_link.name == "Dataproc resource" # Assert operator link types are preserved during deserialization assert isinstance(deserialized_task.operator_extra_links[0], DataprocLink) @@ -1562,10 +1561,10 @@ def test_submit_job_operator_extra_links(mock_hook, dag_maker, create_task_insta deserialized_dag = SerializedDAG.from_dict(serialized_dag) deserialized_task = deserialized_dag.task_dict[TASK_ID] - # Assert operator links for serialized_dag - assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [ - {"airflow.providers.google.cloud.links.dataproc.DataprocJobLink": {}} - ] + # Assert operator links for serialized DAG + deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"]) + operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0] + assert operator_extra_link.name == "Dataproc Job" # Assert operator link types are preserved during deserialization assert isinstance(deserialized_task.operator_extra_links[0], DataprocJobLink) @@ -1767,10 +1766,10 @@ def test_update_cluster_operator_extra_links(dag_maker, create_task_instance_of_ deserialized_dag = SerializedDAG.from_dict(serialized_dag) deserialized_task = deserialized_dag.task_dict[TASK_ID] - # Assert operator links for serialized_dag - assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [ - {"airflow.providers.google.cloud.links.dataproc.DataprocClusterLink": {}} - ] + # Assert operator links for serialized DAG + deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"]) + operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0] + assert operator_extra_link.name == "Dataproc Cluster" # Assert operator link types are preserved during deserialization assert isinstance(deserialized_task.operator_extra_links[0], DataprocClusterLink) @@ -1989,10 +1988,10 @@ def test_instantiate_workflow_operator_extra_links(mock_hook, dag_maker, create_ deserialized_dag = SerializedDAG.from_dict(serialized_dag) 
deserialized_task = deserialized_dag.task_dict[TASK_ID] - # Assert operator links for serialized_dag - assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [ - {"airflow.providers.google.cloud.links.dataproc.DataprocWorkflowLink": {}} - ] + # Assert operator links for serialized DAG + deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"]) + operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0] + assert operator_extra_link.name == "Dataproc Workflow" # Assert operator link types are preserved during deserialization assert isinstance(deserialized_task.operator_extra_links[0], DataprocWorkflowLink) @@ -2151,10 +2150,10 @@ def test_instantiate_inline_workflow_operator_extra_links( deserialized_dag = SerializedDAG.from_dict(serialized_dag) deserialized_task = deserialized_dag.task_dict[TASK_ID] - # Assert operator links for serialized_dag - assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [ - {"airflow.providers.google.cloud.links.dataproc.DataprocWorkflowLink": {}} - ] + # Assert operator links for serialized DAG + deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"]) + operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0] + assert operator_extra_link.name == "Dataproc Workflow" # Assert operator link types are preserved during deserialization assert isinstance(deserialized_task.operator_extra_links[0], DataprocWorkflowLink) @@ -2182,7 +2181,7 @@ class TestDataProcHiveOperator: job = { "reference": {"project_id": GCP_PROJECT, "job_id": f"{job_name}_{job_id}"}, "placement": {"cluster_name": "cluster-1"}, - "labels": {"airflow-version": AIRFLOW_VERSION}, + "labels": {"airflow-version": AIRFLOW_VERSION_LABEL}, "hive_job": {"query_list": {"queries": [query]}, "script_variables": variables}, } @@ -2244,7 +2243,7 @@ class TestDataProcPigOperator: job = { "reference": {"project_id": GCP_PROJECT, "job_id": f"{job_name}_{job_id}"}, "placement": {"cluster_name": "cluster-1"}, - "labels": {"airflow-version": AIRFLOW_VERSION}, + "labels": {"airflow-version": AIRFLOW_VERSION_LABEL}, "pig_job": {"query_list": {"queries": [query]}, "script_variables": variables}, } @@ -2306,13 +2305,13 @@ class TestDataProcSparkSqlOperator: job = { "reference": {"project_id": GCP_PROJECT, "job_id": f"{job_name}_{job_id}"}, "placement": {"cluster_name": "cluster-1"}, - "labels": {"airflow-version": AIRFLOW_VERSION}, + "labels": {"airflow-version": AIRFLOW_VERSION_LABEL}, "spark_sql_job": {"query_list": {"queries": [query]}, "script_variables": variables}, } other_project_job = { "reference": {"project_id": "other-project", "job_id": f"{job_name}_{job_id}"}, "placement": {"cluster_name": "cluster-1"}, - "labels": {"airflow-version": AIRFLOW_VERSION}, + "labels": {"airflow-version": AIRFLOW_VERSION_LABEL}, "spark_sql_job": {"query_list": {"queries": [query]}, "script_variables": variables}, } @@ -2410,7 +2409,7 @@ def setup_class(cls): "job_id": f"{job_name}_{TEST_JOB_ID}", }, "placement": {"cluster_name": "cluster-1"}, - "labels": {"airflow-version": AIRFLOW_VERSION}, + "labels": {"airflow-version": AIRFLOW_VERSION_LABEL}, "spark_job": {"jar_file_uris": jars, "main_class": main_class}, } @@ -2473,9 +2472,9 @@ def test_submit_spark_job_operator_extra_links(mock_hook, dag_maker, create_task deserialized_task = deserialized_dag.task_dict[TASK_ID] # Assert operator links for serialized DAG - assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [ - 
{"airflow.providers.google.cloud.links.dataproc.DataprocLink": {}} - ] + deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"]) + operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0] + assert operator_extra_link.name == "Dataproc resource" # Assert operator link types are preserved during deserialization assert isinstance(deserialized_task.operator_extra_links[0], DataprocLink) @@ -2504,7 +2503,7 @@ class TestDataProcHadoopOperator: job = { "reference": {"project_id": GCP_PROJECT, "job_id": f"{job_name}_{job_id}"}, "placement": {"cluster_name": "cluster-1"}, - "labels": {"airflow-version": AIRFLOW_VERSION}, + "labels": {"airflow-version": AIRFLOW_VERSION_LABEL}, "hadoop_job": {"main_jar_file_uri": jar, "args": args}, } @@ -2542,7 +2541,7 @@ class TestDataProcPySparkOperator: job = { "reference": {"project_id": GCP_PROJECT, "job_id": f"{job_name}_{job_id}"}, "placement": {"cluster_name": "cluster-1"}, - "labels": {"airflow-version": AIRFLOW_VERSION}, + "labels": {"airflow-version": AIRFLOW_VERSION_LABEL}, "pyspark_job": {"main_python_file_uri": uri}, } diff --git a/tests/providers/openlineage/plugins/test_listener.py b/tests/providers/openlineage/plugins/test_listener.py index d9fbb0dfd360..65ac9657c0ba 100644 --- a/tests/providers/openlineage/plugins/test_listener.py +++ b/tests/providers/openlineage/plugins/test_listener.py @@ -33,10 +33,20 @@ from airflow.providers.openlineage.plugins.listener import OpenLineageListener from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage from airflow.utils.state import State +from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS from tests.test_utils.config import conf_vars pytestmark = pytest.mark.db_test +EXPECTED_TRY_NUMBER_1 = 1 if AIRFLOW_V_2_10_PLUS else 0 +EXPECTED_TRY_NUMBER_2 = 2 if AIRFLOW_V_2_10_PLUS else 1 + +TRY_NUMBER_BEFORE_EXECUTION = 0 if AIRFLOW_V_2_10_PLUS else 1 +TRY_NUMBER_RUNNING = 0 if AIRFLOW_V_2_10_PLUS else 1 +TRY_NUMBER_FAILED = 0 if AIRFLOW_V_2_10_PLUS else 1 +TRY_NUMBER_SUCCESS = 0 if AIRFLOW_V_2_10_PLUS else 2 +TRY_NUMBER_AFTER_EXECUTION = 0 if AIRFLOW_V_2_10_PLUS else 2 + class TemplateOperator(BaseOperator): template_fields = ["df"] @@ -304,7 +314,7 @@ def mock_task_id(dag_id, task_id, execution_date, try_number): job_name="job_name", parent_job_name="dag_id", parent_run_id="dag_id.dag_run_run_id", - run_id="dag_id.task_id.execution_date.1", + run_id=f"dag_id.task_id.execution_date.{EXPECTED_TRY_NUMBER_1}", task=listener.extractor_manager.extract_metadata(), ) @@ -319,7 +329,7 @@ def mock_task_id(dag_id, task_id, execution_date, try_number): job_name="job_name", parent_job_name="dag_id", parent_run_id="dag_id.dag_run_run_id", - run_id="dag_id.task_id.execution_date.2", + run_id=f"dag_id.task_id.execution_date.{EXPECTED_TRY_NUMBER_2}", task=listener.extractor_manager.extract_metadata(), ) @@ -334,7 +344,9 @@ def test_run_id_is_constant_across_all_methods(mocked_adapter): """ def mock_task_id(dag_id, task_id, execution_date, try_number): - return f"{dag_id}.{task_id}.{execution_date}.{try_number}" + returned_try_number = try_number if AIRFLOW_V_2_10_PLUS else max(try_number - 1, 1) + + return f"{dag_id}.{task_id}.{execution_date}.{returned_try_number}" listener, task_instance = _create_listener_and_task_instance() mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id @@ -344,7 +356,11 @@ def mock_task_id(dag_id, task_id, execution_date, try_number): assert listener.adapter.start_task.call_args.kwargs["run_id"] == 
expected_run_id_1
 
     listener.on_task_instance_failed(None, task_instance, None)
-    assert listener.adapter.fail_task.call_args.kwargs["run_id"] == expected_run_id_1
+    assert listener.adapter.fail_task.call_args.kwargs["run_id"] == (
+        expected_run_id_1
+        if AIRFLOW_V_2_10_PLUS
+        else expected_run_id_2
+    )
 
     # This run_id will not be different as we did NOT simulate increase of the try_number attribute,
     listener.on_task_instance_success(None, task_instance, None)
@@ -354,7 +370,11 @@ def mock_task_id(dag_id, task_id, execution_date, try_number):
     # This is how airflow works, and that's why we expect the run_id to remain constant across all methods.
     task_instance.try_number += 1
     listener.on_task_instance_success(None, task_instance, None)
-    assert listener.adapter.complete_task.call_args.kwargs["run_id"] == expected_run_id_2
+    assert listener.adapter.complete_task.call_args.kwargs["run_id"] == (
+        expected_run_id_2
+        if AIRFLOW_V_2_10_PLUS
+        else expected_run_id_1
+    )
 
 
 def test_running_task_correctly_calls_openlineage_adapter_run_id_method():
@@ -406,7 +426,7 @@ def test_successful_task_correctly_calls_openlineage_adapter_run_id_method(mock_
         dag_id="dag_id",
         task_id="task_id",
         execution_date="execution_date",
-        try_number=1,
+        try_number=EXPECTED_TRY_NUMBER_1,
     )
 
 
@@ -431,16 +451,16 @@ def fail_callable(**kwargs):
     _, task_instance = _create_test_dag_and_task(fail_callable, "failure")
     # try_number before execution
-    assert task_instance.try_number == 0
+    assert task_instance.try_number == TRY_NUMBER_BEFORE_EXECUTION
     with suppress(CustomError):
         task_instance.run()
 
     # try_number at the moment of function being called
-    assert captured_try_numbers["running"] == 0
-    assert captured_try_numbers["failed"] == 0
+    assert captured_try_numbers["running"] == TRY_NUMBER_RUNNING
+    assert captured_try_numbers["failed"] == TRY_NUMBER_FAILED
     # try_number after task has been executed
-    assert task_instance.try_number == 0
+    assert task_instance.try_number == TRY_NUMBER_AFTER_EXECUTION
 
 
 @mock.patch("airflow.models.taskinstance.get_listener_manager")
@@ -460,15 +480,15 @@ def success_callable(**kwargs):
     _, task_instance = _create_test_dag_and_task(success_callable, "success")
     # try_number before execution
-    assert task_instance.try_number == 0
+    assert task_instance.try_number == TRY_NUMBER_BEFORE_EXECUTION
     task_instance.run()
 
     # try_number at the moment of function being called
-    assert captured_try_numbers["running"] == 0
-    assert captured_try_numbers["success"] == 0
+    assert captured_try_numbers["running"] == TRY_NUMBER_RUNNING
+    assert captured_try_numbers["success"] == TRY_NUMBER_SUCCESS
     # try_number after task has been executed
-    assert task_instance.try_number == 0
+    assert task_instance.try_number == TRY_NUMBER_AFTER_EXECUTION
 
 
 @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
diff --git a/tests/providers/openlineage/plugins/test_openlineage.py b/tests/providers/openlineage/plugins/test_openlineage.py
index 409c8461e8e0..8d9cc9602bde 100644
--- a/tests/providers/openlineage/plugins/test_openlineage.py
+++ b/tests/providers/openlineage/plugins/test_openlineage.py
@@ -24,9 +24,13 @@
 import pytest
 
 from airflow.providers.openlineage.conf import config_path, is_disabled, transport
+from tests.conftest import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES
 from tests.test_utils.config import conf_vars
 
 
+@pytest.mark.skipif(
+    RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES, reason="Plugin initialization is done early in case of packages"
+)
 class TestOpenLineageProviderPlugin:
     def setup_method(self):
        is_disabled.cache_clear()
diff --git a/tests/providers/smtp/notifications/test_smtp.py b/tests/providers/smtp/notifications/test_smtp.py
index b19cc4baa873..98fd7387e7ef 100644
--- a/tests/providers/smtp/notifications/test_smtp.py
+++ b/tests/providers/smtp/notifications/test_smtp.py
@@ -31,6 +31,7 @@
     send_smtp_notification,
 )
 from airflow.utils import timezone
+from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS
 from tests.test_utils.config import conf_vars
 
 pytestmark = pytest.mark.db_test
@@ -38,6 +39,9 @@
 
 SMTP_API_DEFAULT_CONN_ID = SmtpHook.default_conn_name
 
+NUM_TRY = 0 if AIRFLOW_V_2_10_PLUS else 1
+
+
 class TestSmtpNotifier:
     @mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
     def test_notifier(self, mock_smtphook_hook, dag_maker):
@@ -129,7 +133,7 @@ def test_notifier_with_defaults(self, mock_smtphook_hook, create_task_instance):
             from_email=conf.get("smtp", "smtp_mail_from"),
             to="test_reciver@test.com",
             subject="DAG dag - Task op - Run ID test in State None",
-            html_content="""… Run ID: test … Try: 0 of 1 … Task State: None … Host: … Log Link: http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs … Mark Success Link: http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success …""",
+            html_content=f"""… Run ID: test … Try: {NUM_TRY} of 1 … Task State: None … Host: … Log Link: http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs … Mark Success Link: http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success …""",
             smtp_conn_id="smtp_default",
             files=None,
             cc=None,
diff --git a/tests/test_utils/compat.py b/tests/test_utils/compat.py
new file mode 100644
index 000000000000..5724ec1d4eb3
--- /dev/null
+++ b/tests/test_utils/compat.py
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from importlib.metadata import version
+from typing import TYPE_CHECKING, Any, cast
+
+from packaging.version import Version
+
+from airflow.models import Operator
+
+try:
+    # ImportError has been renamed to ParseImportError in airflow 2.10.0, and since our provider tests should
+    # run on all supported versions of Airflow, this compatibility shim falls back to the old ImportError so
+    # that tests can import it from here and use it in their code and run against older versions of Airflow.
+    # This import can be removed (and all tests switched to import ParseImportError directly) as soon as
+    # all providers are updated to airflow 2.10+.
+    from airflow.models.errors import ParseImportError
+except ImportError:
+    from airflow.models.errors import ImportError as ParseImportError  # type: ignore[no-redef]
+
+from airflow import __version__ as airflow_version
+
+AIRFLOW_VERSION = Version(airflow_version)
+AIRFLOW_V_2_7_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.7.0")
+AIRFLOW_V_2_8_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.8.0")
+AIRFLOW_V_2_9_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.9.0")
+AIRFLOW_V_2_10_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.10.0")
+
+
+def deserialize_operator(serialized_operator: dict[str, Any]) -> Operator:
+    if AIRFLOW_V_2_10_PLUS:
+        # In airflow 2.10+ we can deserialize the operator using the regular deserialize method.
+        # We do not need to use the deserialize_operator method explicitly, but some tests deserialize the
+        # operator directly and in the future they could use the regular ``deserialize`` method. This method
+        # is a shim to make deserialization of operators work for tests run against older Airflow versions,
+        # and tests should use this method instead of calling ``BaseSerialization.deserialize`` directly.
+        # We can remove this method and switch to the regular ``deserialize`` method as soon as all providers
+        # are updated to airflow 2.10+.
+ from airflow.serialization.serialized_objects import BaseSerialization + + return cast(Operator, BaseSerialization.deserialize(serialized_operator)) + else: + from airflow.serialization.serialized_objects import SerializedBaseOperator + + return SerializedBaseOperator.deserialize_operator(serialized_operator) diff --git a/tests/test_utils/db.py b/tests/test_utils/db.py index 783362b8b1be..1c2b871b1963 100644 --- a/tests/test_utils/db.py +++ b/tests/test_utils/db.py @@ -36,7 +36,6 @@ XCom, ) from airflow.models.dag import DagOwnerAttributes -from airflow.models.dagbag import DagPriorityParsingRequest from airflow.models.dagcode import DagCode from airflow.models.dagwarning import DagWarning from airflow.models.dataset import ( @@ -46,12 +45,12 @@ DatasetModel, TaskOutletDatasetReference, ) -from airflow.models.errors import ParseImportError from airflow.models.serialized_dag import SerializedDagModel from airflow.providers.fab.auth_manager.models import Permission, Resource, assoc_permission_role from airflow.security.permissions import RESOURCE_DAG_PREFIX from airflow.utils.db import add_default_pool_if_not_exists, create_default_connections, reflect_tables from airflow.utils.session import create_session +from tests.test_utils.compat import ParseImportError def clear_db_runs(): @@ -172,6 +171,8 @@ def clear_db_task_reschedule(): def clear_db_dag_parsing_requests(): with create_session() as session: + from airflow.models.dagbag import DagPriorityParsingRequest + session.query(DagPriorityParsingRequest).delete()
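
For context, a minimal sketch of how a provider test module might consume the new tests/test_utils/compat.py helpers when running against different Airflow versions; the test names, the expected value, and the ``session`` fixture below are illustrative assumptions and are not part of this patch:

from __future__ import annotations

import pytest

from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS, ParseImportError

pytestmark = pytest.mark.db_test

# Version-dependent expectation, following the same pattern as NUM_TRY / EXPECTED_TRY_NUMBER_* above.
EXPECTED_TRY_NUMBER = 0 if AIRFLOW_V_2_10_PLUS else 1


@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Behaviour under test only exists in Airflow 2.10+")
def test_something_specific_to_airflow_2_10():
    # Gated out entirely when the back-compatibility job installs an older Airflow package.
    ...


def test_import_errors_table_is_empty(session):
    # ParseImportError resolves to airflow.models.errors.ParseImportError on Airflow 2.10+
    # and to the legacy airflow.models.errors.ImportError on older versions, so the same
    # query works against whichever Airflow version is installed.
    assert session.query(ParseImportError).count() == 0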