Skip to content

Commit

Permalink
Change dataset URI validation to raise warning instead of error in Ai…
Browse files Browse the repository at this point in the history
…rflow 2.9 (#39670)

Closes: #39486

Valid DAGs that worked in Airflow 2.8.x and had tasks with outlets with specific URIs, such as `Dataset("postgres://postgres:5432/postgres.dbt.stg_customers")`, stopped working in Airflow 2.9.0 & Airflow 2.9.1, after #37005 was merged.

This was a breaking change in an Airflow minor version. We should avoid this.

Airflow < 3.0 should raise a warning, and from Airflow 3.0, we can raise errors by default. We can have a feature flag to allow users who want to see this in advance to enable errors in Airflow 2.x, but this should not be the default behaviour.

The DAGs should continue working on Airflow 2.x minor/micro releases without errors (unless the user opts in via configuration).

By running the following DAG with `apache-airflow==2.9.1` and `apache-airflow-providers-postgres==5.11.0`, as an example:
```
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator

# Minimal reproduction: two EmptyOperator tasks, the first of which declares a
# Dataset outlet using the postgres:// URI that appears in the traceback below.
with DAG(dag_id='empty_operator_example', start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:

    task1 = EmptyOperator(
        task_id='empty_task1',
        dag=dag,
        outlets=[Dataset("postgres://postgres:5432/postgres.dbt.stg_customers")]
    )

    task2 = EmptyOperator(
        task_id='empty_task2',
        dag=dag
    )

    task1 >> task2
```

This causes the following exception:
```
Broken DAG: [/usr/local/airflow/dags/example_issue.py]
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/datasets/__init__.py", line 81, in _sanitize_uri
    parsed = normalizer(parsed)
             ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/postgres/datasets/postgres.py", line 34, in sanitize_uri
    raise ValueError("URI format postgres:// must contain database, schema, and table names")
ValueError: URI format postgres:// must contain database, schema, and table names
```

This PR introduces the following:

1. A boolean configuration within `[core]`, named `strict_dataset_uri_validation`, which should be `False` by default.

2. When this configuration is `False`, Airflow should raise a warning saying:
```
From Airflow 3, Airflow will be more strict with Dataset URIs, and the URI xx will no longer be valid. Please, follow the expected standard as documented in XX.
```

3. If this configuration is `True`, Airflow should raise the exception, as it does now in Airflow 2.9.0 and 2.9.1.

4. From Airflow 3.0, we change this configuration to be `True` by default.

(cherry picked from commit a07d799)
  • Loading branch information
tatiana authored and ephraimbuddy committed Jun 5, 2024
1 parent 83e8e74 commit cb63c3e
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 2 deletions.
9 changes: 9 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,15 @@ core:
sensitive: true
default: ~
example: '{"some_param": "some_value"}'
strict_dataset_uri_validation:
description: |
Dataset URI validation should raise an exception if it is not compliant with AIP-60.
By default this configuration is false, meaning that Airflow 2.x only warns the user.
In Airflow 3, this configuration will be enabled by default.
default: "False"
example: ~
version_added: 2.9.2
type: boolean
database_access_isolation:
description: (experimental) Whether components should use Airflow Internal API for DB connectivity.
version_added: 2.6.0
Expand Down
16 changes: 15 additions & 1 deletion airflow/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
if TYPE_CHECKING:
from urllib.parse import SplitResult


from airflow.configuration import conf

__all__ = ["Dataset", "DatasetAll", "DatasetAny"]


Expand Down Expand Up @@ -78,7 +81,18 @@ def _sanitize_uri(uri: str) -> str:
fragment="", # Ignore any fragments.
)
if (normalizer := _get_uri_normalizer(normalized_scheme)) is not None:
parsed = normalizer(parsed)
try:
parsed = normalizer(parsed)
except ValueError as exception:
if conf.getboolean("core", "strict_dataset_uri_validation", fallback=False):
raise exception
else:
warnings.warn(
f"The dataset URI {uri} is not AIP-60 compliant. "
f"In Airflow 3, this will raise an exception. More information: {repr(exception)}",
UserWarning,
stacklevel=3,
)
return urllib.parse.urlunsplit(parsed)


Expand Down
32 changes: 31 additions & 1 deletion tests/datasets/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@
import os
from collections import defaultdict
from typing import Callable
from unittest.mock import patch

import pytest
from sqlalchemy.sql import select

from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll, DatasetAny
from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll, DatasetAny, _sanitize_uri
from airflow.models.dataset import DatasetDagRunQueue, DatasetModel
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.empty import EmptyOperator
from airflow.serialization.serialized_objects import BaseSerialization, SerializedDAG
from tests.test_utils.config import conf_vars


@pytest.fixture
Expand Down Expand Up @@ -441,3 +443,31 @@ def test_datasets_expression_error(expression: Callable[[], None], error: str) -
with pytest.raises(TypeError) as info:
expression()
assert str(info.value) == error


def mock_get_uri_normalizer(normalized_scheme):
    """Stand-in for ``_get_uri_normalizer`` that yields a failing normalizer.

    The scheme argument is accepted only to match the patched function's
    signature and is ignored. The returned callable raises
    ``ValueError("Incorrect URI format")`` for any input.
    """

    def _reject(parsed_uri):
        raise ValueError("Incorrect URI format")

    return _reject


@patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer)
@patch("airflow.datasets.warnings.warn")
@conf_vars({("core", "strict_dataset_uri_validation"): "False"})
def test__sanitize_uri_raises_warning(mock_warn):
    """With strict validation disabled, a normalizer failure only emits a warning.

    The option is pinned to "False" explicitly so the test does not depend on
    whatever value the surrounding environment or config file provides.
    """
    _sanitize_uri("postgres://localhost:5432/database.schema.table")
    # Fail with a clear message if no warning was emitted, instead of an
    # AttributeError when reading ``call_args`` of an uncalled mock.
    mock_warn.assert_called()
    msg = mock_warn.call_args.args[0]
    assert "The dataset URI postgres://localhost:5432/database.schema.table is not AIP-60 compliant." in msg
    assert (
        "In Airflow 3, this will raise an exception. More information: ValueError('Incorrect URI format')"
        in msg
    )


@patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer)
@conf_vars({("core", "strict_dataset_uri_validation"): "True"})
def test__sanitize_uri_raises_exception():
    """With strict validation enabled, the normalizer's ValueError propagates."""
    invalid_uri = "postgres://localhost:5432/database.schema.table"
    with pytest.raises(ValueError) as excinfo:
        _sanitize_uri(invalid_uri)
    raised = excinfo.value
    assert isinstance(raised, ValueError)
    assert str(raised) == "Incorrect URI format"

0 comments on commit cb63c3e

Please sign in to comment.