Turn Pydantic into an optional dependency (#37320)
We've been using Pydantic for the internal API, and it has caused some
compatibility issues: Pydantic is so popular that its users are
currently split between Pydantic 1 and Pydantic 2. That popularity
works against us, and since we are not yet using it in "production"
(and in the future we will only use it for the internal API), turning
Pydantic into an optional dependency seems the best way to proceed.

It's as simple as converting all the direct imports into imports from a
common util module that provides a fallback when the import fails.

This should cause fewer conflicts when installing third-party libraries
alongside Airflow.

Added a test where Pydantic is removed. Also made sure that the special
cases we have tests for run the full suite of tests - both non-DB and DB.
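
A minimal sketch of that fallback pattern, assuming a helper module along the lines of airflow/utils/pydantic.py (the names below are taken from the imports touched in this diff; the real module may differ):

```python
# Hedged sketch of a fallback import module such as airflow/utils/pydantic.py.
# The names mirror imports used elsewhere in this diff; details are assumed.
try:
    from pydantic import (
        BaseModel,
        ConfigDict,
        PlainSerializer,
        PlainValidator,
        ValidationInfo,
    )
except ImportError:

    class BaseModel:  # type: ignore[no-redef]
        """Import-time stand-in so model classes can still be defined."""

    def _noop(*args, **kwargs):
        """Accepts and ignores pydantic-only arguments."""
        return None

    # Keep ConfigDict(...) calls and Annotated[...] metadata importable
    # even when pydantic is absent.
    ConfigDict = PlainSerializer = PlainValidator = ValidationInfo = _noop
```

With that indirection in place, modules only ever import from the util module, so a missing Pydantic installation degrades gracefully instead of raising ImportError at import time.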
potiuk committed Feb 12, 2024
1 parent 5fe27c8 commit c3f48ee
Showing 51 changed files with 704 additions and 340 deletions.
97 changes: 73 additions & 24 deletions .github/workflows/ci.yml
@@ -1178,14 +1178,63 @@ jobs:
uses: ./.github/actions/post_tests_failure
if: failure()
tests-postgres-boto:
tests-min-sqlalchemy:
timeout-minutes: 130
name: >
DB:LatestBoto${{needs.build-info.outputs.default-postgres-version}},
DB:MinSQLAlchemy${{needs.build-info.outputs.default-postgres-version}},
Py${{needs.build-info.outputs.default-python-version}}:
${{needs.build-info.outputs.parallel-test-types-list-as-string}}
runs-on: ${{fromJSON(needs.build-info.outputs.runs-on)}}
needs: [build-info, wait-for-ci-images]
env:
RUNS_ON: "${{needs.build-info.outputs.runs-on}}"
PARALLEL_TEST_TYPES: "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
PR_LABELS: "${{needs.build-info.outputs.pull-request-labels}}"
FULL_TESTS_NEEDED: "${{needs.build-info.outputs.full-tests-needed}}"
DEBUG_RESOURCES: "${{needs.build-info.outputs.debug-resources}}"
BACKEND: "postgres"
ENABLE_COVERAGE: "${{needs.build-info.outputs.run-coverage}}"
PYTHON_MAJOR_MINOR_VERSION: "${{needs.build-info.outputs.default-python-version}}"
PYTHON_VERSION: "${{needs.build-info.outputs.default-python-version}}"
POSTGRES_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
BACKEND_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
DOWNGRADE_SQLALCHEMY: "true"
JOB_ID: >
min-sqlalchemy-${{needs.build-info.outputs.default-python-version}}-
${{needs.build-info.outputs.default-postgres-version}}
if: needs.build-info.outputs.run-tests == 'true'
steps:
- name: Cleanup repo
shell: bash
run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*"
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v4
with:
persist-credentials: false
- name: >
Prepare breeze & CI image: ${{needs.build-info.outputs.default-python-version}}:${{env.IMAGE_TAG}}
uses: ./.github/actions/prepare_breeze_and_image
- name: >
Tests: ${{matrix.python-version}}:${{needs.build-info.outputs.parallel-test-types-list-as-string}}
run: >
breeze testing db-tests
--parallel-test-types "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
- name: >
Post Tests success: ${{needs.build-info.outputs.default-python-version}}:MinSQLAlchemy"
uses: ./.github/actions/post_tests_success
if: success()
- name: >
Post Tests failure: ${{needs.build-info.outputs.default-python-version}}:MinSQLAlchemy"
uses: ./.github/actions/post_tests_failure
if: failure()
tests-boto:
timeout-minutes: 130
name: >
LatestBoto-Py${{needs.build-info.outputs.default-python-version}}:
${{needs.build-info.outputs.parallel-test-types-list-as-string}}
runs-on: ${{fromJSON(needs.build-info.outputs.runs-on)}}
needs: [build-info, wait-for-ci-images]
env:
RUNS_ON: "${{needs.build-info.outputs.runs-on}}"
PARALLEL_TEST_TYPES: "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
@@ -1200,7 +1249,7 @@ jobs:
BACKEND_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
UPGRADE_BOTO: "true"
JOB_ID: >
postgres-boto-${{needs.build-info.outputs.default-python-version}}-
boto-${{needs.build-info.outputs.default-python-version}}-
${{needs.build-info.outputs.default-postgres-version}}
if: needs.build-info.outputs.run-tests == 'true' && needs.build-info.outputs.run-amazon-tests == 'true'
steps:
@@ -1217,24 +1266,26 @@ jobs:
- name: >
Tests: ${{matrix.python-version}}:${{needs.build-info.outputs.parallel-test-types-list-as-string}}
run: >
breeze testing db-tests
breeze testing tests --run-in-parallel
--parallel-test-types "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
- name: >
Post Tests success: ${{needs.build-info.outputs.default-python-version}}:Boto"
Post Tests success: ${{needs.build-info.outputs.default-python-version}}:LatestBoto"
uses: ./.github/actions/post_tests_success
if: success()
- name: >
Post Tests failure: ${{needs.build-info.outputs.default-python-version}}:Boto"
Post Tests failure: ${{needs.build-info.outputs.default-python-version}}:LatestBoto"
uses: ./.github/actions/post_tests_failure
if: failure()
tests-postgres-min-sqlalchemy:
tests-pydantic:
timeout-minutes: 130
name: >
DB:MinSQLAlchemy${{needs.build-info.outputs.default-postgres-version}},
Py${{needs.build-info.outputs.default-python-version}}:
Pydantic-${{ matrix.pydantic }}-Py${{needs.build-info.outputs.default-python-version}}:
${{needs.build-info.outputs.parallel-test-types-list-as-string}}
runs-on: ${{fromJSON(needs.build-info.outputs.runs-on)}}
strategy:
matrix:
pydantic: ["v1", "none"]
needs: [build-info, wait-for-ci-images]
env:
RUNS_ON: "${{needs.build-info.outputs.runs-on}}"
@@ -1248,9 +1299,9 @@ jobs:
PYTHON_VERSION: "${{needs.build-info.outputs.default-python-version}}"
POSTGRES_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
BACKEND_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
DOWNGRADE_SQLALCHEMY: "true"
PYDANTIC: ${{ matrix.pydantic }}
JOB_ID: >
postgres-min-sqlalchemy-${{needs.build-info.outputs.default-python-version}}-
pydantic-${{ matrix.pydantic }}-${{needs.build-info.outputs.default-python-version}}-
${{needs.build-info.outputs.default-postgres-version}}
if: needs.build-info.outputs.run-tests == 'true'
steps:
@@ -1267,22 +1318,21 @@
- name: >
Tests: ${{matrix.python-version}}:${{needs.build-info.outputs.parallel-test-types-list-as-string}}
run: >
breeze testing db-tests
breeze testing tests --run-in-parallel
--parallel-test-types "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
- name: >
Post Tests success: ${{needs.build-info.outputs.default-python-version}}:MinSQLAlchemy"
Post Tests success: ${{needs.build-info.outputs.default-python-version}}:NoPydantic"
uses: ./.github/actions/post_tests_success
if: success()
- name: >
Post Tests failure: ${{needs.build-info.outputs.default-python-version}}:MinSQLAlchemy"
Post Tests failure: ${{needs.build-info.outputs.default-python-version}}:NoPydantic"
uses: ./.github/actions/post_tests_failure
if: failure()
tests-postgres-pendulum-2:
tests-pendulum-2:
timeout-minutes: 130
name: >
DB:Postgres${{needs.build-info.outputs.default-postgres-version}},
Pendulum2,Py${{needs.build-info.outputs.default-python-version}}:
Pendulum2-Py${{needs.build-info.outputs.default-python-version}}:
${{needs.build-info.outputs.parallel-test-types-list-as-string}}
runs-on: ${{fromJSON(needs.build-info.outputs.runs-on)}}
needs: [build-info, wait-for-ci-images]
@@ -1300,7 +1350,7 @@ jobs:
BACKEND_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
DOWNGRADE_PENDULUM: "true"
JOB_ID: >
postgres-pendulum-2-${{needs.build-info.outputs.default-python-version}}-
pendulum-2-${{needs.build-info.outputs.default-python-version}}-
${{needs.build-info.outputs.default-postgres-version}}
if: needs.build-info.outputs.run-tests == 'true'
steps:
@@ -1317,7 +1367,7 @@ jobs:
- name: >
Tests: ${{matrix.python-version}}:${{needs.build-info.outputs.parallel-test-types-list-as-string}}
run: >
breeze testing db-tests
breeze testing tests --run-in-parallel
--parallel-test-types "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
- name: >
Post Tests success: ${{needs.build-info.outputs.default-python-version}}:Pendulum2"
@@ -1328,11 +1378,10 @@ jobs:
uses: ./.github/actions/post_tests_failure
if: failure()
tests-postgres-in-progress-features-disabled:
tests-in-progress-features-disabled:
timeout-minutes: 130
name: >
DB:InProgressDisabledPostgres${{needs.build-info.outputs.default-postgres-version}},
Py${{needs.build-info.outputs.default-python-version}}:
InProgressDisabled-Py${{needs.build-info.outputs.default-python-version}}:
${{needs.build-info.outputs.parallel-test-types-list-as-string}}
runs-on: ${{fromJSON(needs.build-info.outputs.runs-on)}}
needs: [build-info, wait-for-ci-images]
@@ -1350,7 +1399,7 @@ jobs:
BACKEND_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
AIRFLOW_ENABLE_AIP_44: "false"
JOB_ID: >
postgres-in-progress-disabled-${{needs.build-info.outputs.default-python-version}}-
in-progress-disabled-${{needs.build-info.outputs.default-python-version}}-
${{needs.build-info.outputs.default-postgres-version}}
if: needs.build-info.outputs.run-tests == 'true' && needs.build-info.outputs.run-amazon-tests == 'true'
steps:
@@ -1367,7 +1416,7 @@ jobs:
- name: >
Tests: ${{matrix.python-version}}:${{needs.build-info.outputs.parallel-test-types-list-as-string}}
run: >
breeze testing db-tests
breeze testing tests --run-in-parallel
--parallel-test-types "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
- name: >
Post Tests success: ${{needs.build-info.outputs.default-python-version}}:FeaturesDisabled"
34 changes: 34 additions & 0 deletions Dockerfile.ci
@@ -900,6 +900,39 @@ function check_boto_upgrade() {
pip check
}

function check_pydantic() {
if [[ ${PYDANTIC=} == "none" ]]; then
echo
echo "${COLOR_YELLOW}Reinstalling airflow from local sources to account for pyproject.toml changes${COLOR_RESET}"
echo
pip install --root-user-action ignore -e .
echo
echo "${COLOR_YELLOW}Remove pydantic and 3rd party libraries that depend on it${COLOR_RESET}"
echo
pip uninstall --root-user-action ignore pydantic aws-sam-translator openai pyiceberg qdrant-client cfn-lint -y
pip check
elif [[ ${PYDANTIC=} == "v1" ]]; then
echo
echo "${COLOR_YELLOW}Reinstalling airflow from local sources to account for pyproject.toml changes${COLOR_RESET}"
echo
pip install --root-user-action ignore -e .
echo
echo "${COLOR_YELLOW}Uninstalling pyicberg which is not compatible with Pydantic 1${COLOR_RESET}"
echo
pip uninstall pyiceberg -y
echo
echo "${COLOR_YELLOW}Downgrading Pydantic to < 2${COLOR_RESET}"
echo
pip install --upgrade "pydantic<2.0.0"
pip check
else
echo
echo "${COLOR_BLUE}Leaving default pydantic v2${COLOR_RESET}"
echo
fi
}


function check_download_sqlalchemy() {
if [[ ${DOWNGRADE_SQLALCHEMY=} != "true" ]]; then
return
@@ -952,6 +985,7 @@ function check_run_tests() {
determine_airflow_to_use
environment_initialization
check_boto_upgrade
check_pydantic
check_download_sqlalchemy
check_download_pendulum
check_run_tests "${@}"
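The check_pydantic() function keys off the PYDANTIC variable that the new pydantic: ["v1", "none"] matrix in ci.yml exports; any other value leaves the default Pydantic 2 installation untouched. A hypothetical test-side assertion (not part of this commit) that the environment actually matches the requested mode could look like:

```python
# Hypothetical sanity check that the environment matches the PYDANTIC mode
# requested by the CI matrix; not part of this commit.
import importlib.util
import os


def assert_pydantic_mode() -> None:
    mode = os.environ.get("PYDANTIC", "v2")
    if mode == "none":
        assert importlib.util.find_spec("pydantic") is None, "pydantic should be gone"
    else:
        import pydantic

        major = int(pydantic.VERSION.split(".")[0])
        assert major == (1 if mode == "v1" else 2)
```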
8 changes: 4 additions & 4 deletions INSTALL
@@ -253,10 +253,10 @@ gcp_api, github, github-enterprise, google, google-auth, graphviz, grpc, hashico
http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb, microsoft-azure,
microsoft-mssql, microsoft-psrp, microsoft-winrm, mongo, mssql, mysql, neo4j, odbc, openai,
openfaas, openlineage, opensearch, opsgenie, oracle, otel, pagerduty, pandas, papermill, password,
pgvector, pinecone, pinot, postgres, presto, qdrant, rabbitmq, redis, s3, s3fs, salesforce, samba,
saml, segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh,
statsd, tableau, tabular, telegram, trino, vertica, virtualenv, weaviate, webhdfs, winrm, yandex,
zendesk
pgvector, pinecone, pinot, postgres, presto, pydantic, qdrant, rabbitmq, redis, s3, s3fs,
salesforce, samba, saml, segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake,
spark, sqlite, ssh, statsd, tableau, tabular, telegram, trino, vertica, virtualenv, weaviate,
webhdfs, winrm, yandex, zendesk

# END REGULAR EXTRAS HERE

2 changes: 2 additions & 0 deletions airflow/providers/apache/hdfs/sensors/hdfs.py
@@ -18,6 +18,8 @@

from airflow.sensors.base import BaseSensorOperator

# Ignore missing docstring

_EXCEPTION_MESSAGE = """The old HDFS Sensors have been removed in 4.0.0 version of the apache.hdfs provider.
Please convert your DAGs to use the WebHdfsSensor or downgrade the provider to below 4.*
if you want to continue using it.
5 changes: 1 addition & 4 deletions airflow/providers/papermill/hooks/kernel.py
@@ -16,17 +16,14 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING
import typing

from jupyter_client import AsyncKernelManager
from papermill.clientwrap import PapermillNotebookClient
from papermill.engines import NBClientEngine
from papermill.utils import merge_kwargs, remove_args
from traitlets import Unicode

if TYPE_CHECKING:
from pydantic import typing

from airflow.hooks.base import BaseHook

JUPYTER_KERNEL_SHELL_PORT = 60316
23 changes: 23 additions & 0 deletions airflow/providers_manager.py
@@ -1268,3 +1268,26 @@ def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
@property
def already_initialized_provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
return sorted(self._provider_configs.items(), key=lambda x: x[0])

def _cleanup(self):
self._initialized_cache.clear()
self._provider_dict.clear()
self._hooks_dict.clear()
self._fs_set.clear()
self._taskflow_decorators.clear()
self._hook_provider_dict.clear()
self._hooks_lazy_dict.clear()
self._connection_form_widgets.clear()
self._field_behaviours.clear()
self._extra_link_class_name_set.clear()
self._logging_class_name_set.clear()
self._auth_manager_class_name_set.clear()
self._secrets_backend_class_name_set.clear()
self._executor_class_name_set.clear()
self._provider_configs.clear()
self._api_auth_backend_module_names.clear()
self._trigger_info_set.clear()
self._notification_info_set.clear()
self._plugins_set.clear()
self._initialized = False
self._initialization_stack_trace = None
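
The new _cleanup() method resets the ProvidersManager singleton so a test can re-run provider discovery under a modified environment (for example, after Pydantic has been uninstalled). A hypothetical pytest fixture built on it might read:

```python
# Hypothetical pytest fixture (not part of this commit) that resets the
# ProvidersManager singleton around a test.
import pytest

from airflow.providers_manager import ProvidersManager


@pytest.fixture
def reset_providers_manager():
    ProvidersManager()._cleanup()  # force re-initialization on next access
    yield
    ProvidersManager()._cleanup()  # leave a clean slate for the next test
```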
10 changes: 5 additions & 5 deletions airflow/serialization/pydantic/dag.py
@@ -21,17 +21,17 @@
from typing import Any, List, Optional

from dateutil import relativedelta
from pydantic import (
from typing_extensions import Annotated

from airflow import DAG, settings
from airflow.configuration import conf as airflow_conf
from airflow.utils.pydantic import (
BaseModel as BaseModelPydantic,
ConfigDict,
PlainSerializer,
PlainValidator,
ValidationInfo,
)
from typing_extensions import Annotated

from airflow import DAG, settings
from airflow.configuration import conf as airflow_conf
from airflow.utils.sqlalchemy import Interval


6 changes: 3 additions & 3 deletions airflow/serialization/pydantic/dag_run.py
@@ -19,10 +19,9 @@
from datetime import datetime
from typing import TYPE_CHECKING, Iterable, List, Optional

from pydantic import BaseModel as BaseModelPydantic, ConfigDict

from airflow.serialization.pydantic.dag import PydanticDag
from airflow.serialization.pydantic.dataset import DatasetEventPydantic
from airflow.utils.pydantic import BaseModel as BaseModelPydantic, ConfigDict, is_pydantic_2_installed
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
@@ -101,4 +100,5 @@ def get_task_instance(
)


DagRunPydantic.model_rebuild()
if is_pydantic_2_installed():
DagRunPydantic.model_rebuild()
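
The is_pydantic_2_installed() guard is needed because model_rebuild() only exists on Pydantic 2 models. One plausible implementation of such a helper (the real one in airflow.utils.pydantic may differ):

```python
# One plausible implementation of is_pydantic_2_installed(); the real helper
# in airflow.utils.pydantic may differ.
from importlib.metadata import PackageNotFoundError, version


def is_pydantic_2_installed() -> bool:
    try:
        return int(version("pydantic").split(".")[0]) == 2
    except PackageNotFoundError:
        return False
```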
2 changes: 1 addition & 1 deletion airflow/serialization/pydantic/dataset.py
@@ -17,7 +17,7 @@
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel as BaseModelPydantic, ConfigDict
from airflow.utils.pydantic import BaseModel as BaseModelPydantic, ConfigDict


class DagScheduleDatasetReferencePydantic(BaseModelPydantic):
3 changes: 1 addition & 2 deletions airflow/serialization/pydantic/job.py
@@ -18,10 +18,9 @@
from functools import cached_property
from typing import Optional

from pydantic import BaseModel as BaseModelPydantic, ConfigDict

from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.utils.pydantic import BaseModel as BaseModelPydantic, ConfigDict


def check_runner_initialized(job_runner: Optional[BaseJobRunner], job_type: str) -> BaseJobRunner:
