diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 953333c6596b7..4f8d587727f14 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -18,14 +18,70 @@ from __future__ import annotations import os -from typing import Any, Callable, ClassVar, Iterable, Iterator, Protocol, runtime_checkable -from urllib.parse import urlsplit +import urllib.parse +import warnings +from typing import TYPE_CHECKING, Any, Callable, ClassVar, Iterable, Iterator, Protocol, runtime_checkable import attr +if TYPE_CHECKING: + from urllib.parse import SplitResult + __all__ = ["Dataset", "DatasetAll", "DatasetAny"] +def normalize_noop(parts: SplitResult) -> SplitResult: + return parts + + +def _get_uri_normalizer(scheme: str) -> Callable[[SplitResult], SplitResult] | None: + if scheme == "file": + return normalize_noop + from airflow.providers_manager import ProvidersManager + + return ProvidersManager().dataset_uri_handlers.get(scheme) + + +def _sanitize_uri(uri: str) -> str: + if not uri: + raise ValueError("Dataset URI cannot be empty") + if uri.isspace(): + raise ValueError("Dataset URI cannot be just whitespace") + if not uri.isascii(): + raise ValueError("Dataset URI must only consist of ASCII characters") + parsed = urllib.parse.urlsplit(uri) + if not parsed.scheme and not parsed.netloc: # Does not look like a URI. + return uri + normalized_scheme = parsed.scheme.lower() + if normalized_scheme.startswith("x-"): + return uri + if normalized_scheme == "airflow": + raise ValueError("Dataset scheme 'airflow' is reserved") + _, auth_exists, normalized_netloc = parsed.netloc.rpartition("@") + if auth_exists: + # TODO: Collect this into a DagWarning. + warnings.warn( + "A dataset URI should not contain auth info (e.g. username or " + "password). It has been automatically dropped.", + UserWarning, + stacklevel=3, + ) + if parsed.query: + normalized_query = urllib.parse.urlencode(sorted(urllib.parse.parse_qsl(parsed.query))) + else: + normalized_query = "" + parsed = parsed._replace( + scheme=normalized_scheme, + netloc=normalized_netloc, + path=parsed.path.rstrip("/") or "/", # Remove all trailing slashes. + query=normalized_query, + fragment="", # Ignore any fragments. + ) + if (normalizer := _get_uri_normalizer(normalized_scheme)) is not None: + parsed = normalizer(parsed) + return urllib.parse.urlunsplit(parsed) + + @runtime_checkable class BaseDatasetEventInput(Protocol): """Protocol for all dataset triggers to use in ``DAG(schedule=...)``. 
@@ -50,23 +106,14 @@ def iter_datasets(self) -> Iterator[tuple[str, Dataset]]: class Dataset(os.PathLike, BaseDatasetEventInput): """A representation of data dependencies between workflows.""" - uri: str = attr.field(validator=[attr.validators.min_len(1), attr.validators.max_len(3000)]) + uri: str = attr.field( + converter=_sanitize_uri, + validator=[attr.validators.min_len(1), attr.validators.max_len(3000)], + ) extra: dict[str, Any] | None = None __version__: ClassVar[int] = 1 - @uri.validator - def _check_uri(self, attr, uri: str) -> None: - if uri.isspace(): - raise ValueError(f"{attr.name} cannot be just whitespace") - try: - uri.encode("ascii") - except UnicodeEncodeError: - raise ValueError(f"{attr.name!r} must be ascii") - parsed = urlsplit(uri) - if parsed.scheme and parsed.scheme.lower() == "airflow": - raise ValueError(f"{attr.name!r} scheme `airflow` is reserved") - def __fspath__(self) -> str: return self.uri @@ -76,7 +123,7 @@ def __eq__(self, other: Any) -> bool: else: return NotImplemented - def __hash__(self): + def __hash__(self) -> int: return hash(self.uri) def iter_datasets(self) -> Iterator[tuple[str, Dataset]]: diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py index c2a869ba4aa52..ac7cc2b3c1702 100644 --- a/airflow/example_dags/example_datasets.py +++ b/airflow/example_dags/example_datasets.py @@ -119,7 +119,7 @@ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=[ dag1_dataset, - Dataset("s3://this-dataset-doesnt-get-triggered"), + Dataset("s3://unrelated/this-dataset-doesnt-get-triggered"), ], tags=["consumes", "dataset-scheduled"], ) as dag5: diff --git a/airflow/provider.yaml.schema.json b/airflow/provider.yaml.schema.json index ff14ea59a7d59..3e5e71759e200 100644 --- a/airflow/provider.yaml.schema.json +++ b/airflow/provider.yaml.schema.json @@ -196,6 +196,26 @@ "type": "string" } }, + "dataset-uris": { + "type": "array", + "description": "Dataset URI formats", + "items": { + "type": "object", + "properties": { + "schemes": { + "type": "array", + "description": "List of supported URI schemes", + "items": { + "type": "string" + } + }, + "handler": { + "type": ["string", "null"], + "description": "Normalization function for specified URI schemes. Import path to a callable taking and returning a SplitResult. 'null' specifies a no-op." + } + } + } + }, "transfers": { "type": "array", "items": { diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index bcde2ad278696..ca7417f609c3b 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -497,6 +497,10 @@ sensors: python-modules: - airflow.providers.amazon.aws.sensors.quicksight +dataset-uris: + - schemes: [s3] + handler: null + filesystems: - airflow.providers.amazon.aws.fs.s3 diff --git a/airflow/providers/google/datasets/__init__.py b/airflow/providers/google/datasets/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/google/datasets/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/google/datasets/bigquery.py b/airflow/providers/google/datasets/bigquery.py new file mode 100644 index 0000000000000..538f13f7cef28 --- /dev/null +++ b/airflow/providers/google/datasets/bigquery.py @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from urllib.parse import SplitResult + + +def sanitize_uri(uri: SplitResult) -> SplitResult: + if not uri.netloc: + raise ValueError("URI format bigquery:// must contain a project ID") + if len(uri.path.split("/")) != 3: # Leading slash, database name, and table name. + raise ValueError("URI format bigquery:// must contain dataset and table names") + return uri diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index 7edc6e137c7c7..21c6ce08c5356 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -751,6 +751,12 @@ sensors: filesystems: - airflow.providers.google.cloud.fs.gcs +dataset-uris: + - schemes: [gcp] + handler: null + - schemes: [bigquery] + handler: airflow.providers.google.datasets.bigquery.sanitize_uri + hooks: - integration-name: Google Ads python-modules: diff --git a/airflow/providers/mysql/datasets/__init__.py b/airflow/providers/mysql/datasets/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/mysql/datasets/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
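For illustration, here is a minimal sketch (not part of the diff) of how the new BigQuery handler behaves when called directly; the project, dataset, and table names are made up:

.. code-block:: python

    import urllib.parse

    from airflow.providers.google.datasets.bigquery import sanitize_uri

    # A well-formed URI passes through unchanged.
    parts = urllib.parse.urlsplit("bigquery://my-project/my_dataset/my_table")
    assert urllib.parse.urlunsplit(sanitize_uri(parts)) == "bigquery://my-project/my_dataset/my_table"

    # A URI missing the table component raises ValueError.
    try:
        sanitize_uri(urllib.parse.urlsplit("bigquery://my-project/my_dataset"))
    except ValueError as e:
        print(e)  # URI format bigquery:// must contain dataset and table names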
diff --git a/airflow/providers/mysql/datasets/mysql.py b/airflow/providers/mysql/datasets/mysql.py new file mode 100644 index 0000000000000..0ead1aff1173c --- /dev/null +++ b/airflow/providers/mysql/datasets/mysql.py @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from urllib.parse import SplitResult + + +def sanitize_uri(uri: SplitResult) -> SplitResult: + if not uri.netloc: + raise ValueError("URI format mysql:// must contain a host") + if uri.port is None: + host = uri.netloc.rstrip(":") + uri = uri._replace(netloc=f"{host}:3306") + if len(uri.path.split("/")) != 3: # Leading slash, database name, and table name. + raise ValueError("URI format mysql:// must contain database and table names") + return uri._replace(scheme="mysql") diff --git a/airflow/providers/mysql/provider.yaml b/airflow/providers/mysql/provider.yaml index d825b4c02fc9a..0cb5c88889334 100644 --- a/airflow/providers/mysql/provider.yaml +++ b/airflow/providers/mysql/provider.yaml @@ -102,7 +102,10 @@ transfers: target-integration-name: MySQL python-module: airflow.providers.mysql.transfers.trino_to_mysql - connection-types: - hook-class-name: airflow.providers.mysql.hooks.mysql.MySqlHook connection-type: mysql + +dataset-uris: + - schemes: [mysql, mariadb] + handler: airflow.providers.mysql.datasets.mysql.sanitize_uri diff --git a/airflow/providers/postgres/datasets/__init__.py b/airflow/providers/postgres/datasets/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/postgres/datasets/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
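Because the MySQL handler above is registered for both the ``mysql`` and ``mariadb`` schemes and always rewrites the scheme to ``mysql``, equivalent URIs converge on one canonical form. A minimal sketch of that normalization (the host and object names are illustrative):

.. code-block:: python

    import urllib.parse

    from airflow.providers.mysql.datasets.mysql import sanitize_uri

    # The default port 3306 is filled in and "mariadb" is folded into "mysql".
    parts = urllib.parse.urlsplit("mariadb://example.com/mydb/mytable")
    assert urllib.parse.urlunsplit(sanitize_uri(parts)) == "mysql://example.com:3306/mydb/mytable"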
diff --git a/airflow/providers/postgres/datasets/postgres.py b/airflow/providers/postgres/datasets/postgres.py new file mode 100644 index 0000000000000..b3cee7234cd4a --- /dev/null +++ b/airflow/providers/postgres/datasets/postgres.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from urllib.parse import SplitResult + + +def sanitize_uri(uri: SplitResult) -> SplitResult: + if not uri.netloc: + raise ValueError("URI format postgres:// must contain a host") + if uri.port is None: + host = uri.netloc.rstrip(":") + uri = uri._replace(netloc=f"{host}:5432") + path_parts = uri.path.split("/") + if len(path_parts) != 4: # Leading slash, database, schema, and table names. + raise ValueError("URI format postgres:// must contain database, schema, and table names") + if not path_parts[2]: + path_parts[2] = "default" + return uri._replace(scheme="postgres", path="/".join(path_parts)) diff --git a/airflow/providers/postgres/provider.yaml b/airflow/providers/postgres/provider.yaml index 325548050145f..d460de4c32490 100644 --- a/airflow/providers/postgres/provider.yaml +++ b/airflow/providers/postgres/provider.yaml @@ -88,3 +88,7 @@ hooks: connection-types: - hook-class-name: airflow.providers.postgres.hooks.postgres.PostgresHook connection-type: postgres + +dataset-uris: + - schemes: [postgres, postgresql] + handler: airflow.providers.postgres.datasets.postgres.sanitize_uri diff --git a/airflow/providers/trino/datasets/__init__.py b/airflow/providers/trino/datasets/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/trino/datasets/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
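In addition to filling in the default port, the Postgres handler above substitutes ``default`` for an empty schema segment, so a double slash in the path is meaningful. A minimal sketch (names are illustrative):

.. code-block:: python

    import urllib.parse

    from airflow.providers.postgres.datasets.postgres import sanitize_uri

    # Port 5432 and the "default" schema are both filled in.
    parts = urllib.parse.urlsplit("postgres://example.com/mydb//mytable")
    assert urllib.parse.urlunsplit(sanitize_uri(parts)) == "postgres://example.com:5432/mydb/default/mytable"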
diff --git a/airflow/providers/trino/datasets/trino.py b/airflow/providers/trino/datasets/trino.py new file mode 100644 index 0000000000000..d5f3f669fe396 --- /dev/null +++ b/airflow/providers/trino/datasets/trino.py @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from urllib.parse import SplitResult + + +def sanitize_uri(uri: SplitResult) -> SplitResult: + if not uri.netloc: + raise ValueError("URI format trino:// must contain a host") + if uri.port is None: + host = uri.netloc.rstrip(":") + uri = uri._replace(netloc=f"{host}:8080") + if len(uri.path.split("/")) != 4: # Leading slash, catalog, schema, and table names. + raise ValueError("URI format trino:// must contain catalog, schema, and table names") + return uri diff --git a/airflow/providers/trino/provider.yaml b/airflow/providers/trino/provider.yaml index 34fbb9803a263..26c2663728d33 100644 --- a/airflow/providers/trino/provider.yaml +++ b/airflow/providers/trino/provider.yaml @@ -76,6 +76,10 @@ operators: python-modules: - airflow.providers.trino.operators.trino +dataset-uris: + - schemes: [trino] + handler: airflow.providers.trino.datasets.trino.sanitize_uri + hooks: - integration-name: Trino python-modules: diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py index 6ecca347fa690..cce54d81d108e 100644 --- a/airflow/providers_manager.py +++ b/airflow/providers_manager.py @@ -84,6 +84,8 @@ def ensure_prefix(field): if TYPE_CHECKING: + from urllib.parse import SplitResult + from airflow.decorators.base import TaskDecorator from airflow.hooks.base import BaseHook from airflow.typing_compat import Literal @@ -301,9 +303,7 @@ def log_import_warning(class_name, e, provider_package): KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS = [("apache-airflow-providers-google", "No module named 'paramiko'")] -def _correctness_check( - provider_package: str, class_name: str, provider_info: ProviderInfo -) -> type[BaseHook] | None: +def _correctness_check(provider_package: str, class_name: str, provider_info: ProviderInfo) -> Any: """ Perform coherence check on provider classes. 
@@ -418,6 +418,7 @@ def __init__(self): # Keeps dict of hooks keyed by connection type self._hooks_dict: dict[str, HookInfo] = {} self._fs_set: set[str] = set() + self._dataset_uri_handlers: dict[str, Callable[[SplitResult], SplitResult]] = {} self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache() # type: ignore[assignment] # keeps mapping between connection_types and hook class, package they come from self._hook_provider_dict: dict[str, HookClassProvider] = {} @@ -514,6 +515,12 @@ def initialize_providers_filesystems(self): self.initialize_providers_list() self._discover_filesystems() + @provider_info_cache("dataset_uris") + def initialize_providers_dataset_uri_handlers(self): + """Lazy initialization of provider dataset URI handlers.""" + self.initialize_providers_list() + self._discover_dataset_uri_handlers() + @provider_info_cache("taskflow_decorators") def initialize_providers_taskflow_decorator(self): """Lazy initialization of providers hooks.""" @@ -859,10 +866,26 @@ def _discover_filesystems(self) -> None: """Retrieve all filesystems defined in the providers.""" for provider_package, provider in self._provider_dict.items(): for fs_module_name in provider.data.get("filesystems", []): - if _correctness_check(provider_package, fs_module_name + ".get_fs", provider): + if _correctness_check(provider_package, f"{fs_module_name}.get_fs", provider): self._fs_set.add(fs_module_name) self._fs_set = set(sorted(self._fs_set)) + def _discover_dataset_uri_handlers(self) -> None: + from airflow.datasets import normalize_noop + + for provider_package, provider in self._provider_dict.items(): + for handler_info in provider.data.get("dataset-uris", []): + try: + schemes = handler_info["schemes"] + handler_path = handler_info["handler"] + except KeyError: + continue + if handler_path is None: + handler = normalize_noop + elif not (handler := _correctness_check(provider_package, handler_path, provider)): + continue + self._dataset_uri_handlers.update((scheme, handler) for scheme in schemes) + def _discover_taskflow_decorators(self) -> None: for name, info in self._provider_dict.items(): for taskflow_decorator in info.data.get("task-decorators", []): @@ -939,7 +962,7 @@ def _import_hook( f"Provider package name is not set when hook_class_name ({hook_class_name}) is used" ) allowed_field_classes = [IntegerField, PasswordField, StringField, BooleanField] - hook_class = _correctness_check(package_name, hook_class_name, provider_info) + hook_class: type[BaseHook] | None = _correctness_check(package_name, hook_class_name, provider_info) if hook_class is None: return None try: @@ -1260,6 +1283,11 @@ def filesystem_module_names(self) -> list[str]: self.initialize_providers_filesystems() return sorted(self._fs_set) + @property + def dataset_uri_handlers(self) -> dict[str, Callable[[SplitResult], SplitResult]]: + self.initialize_providers_dataset_uri_handlers() + return self._dataset_uri_handlers + @property def provider_configs(self) -> list[tuple[str, dict[str, Any]]]: self.initialize_providers_configuration() diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst index 191c08e1e880f..d91120f6acf76 100644 --- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst +++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst @@ -63,12 +63,18 @@ A dataset is defined by a Uniform Resource Identifier (URI): Airflow makes no assumptions about the content or location of the data represented by the URI.
It is treated as a string, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work. -There are two restrictions on the dataset URI: +A dataset should be created with a valid URI. Airflow core and providers define various URI schemes that you can use, such as ``file`` (core), ``postgres`` (by the Postgres provider), and ``s3`` (by the Amazon provider). Third-party providers and plugins may also provide their own schemes. These pre-defined schemes have individual semantics that are expected to be followed. -1. It must be a valid URI, which means it must be composed of only ASCII characters. -2. The URI scheme cannot be ``airflow`` (this is reserved for future use). +What is a valid URI? +-------------------- + +Technically, the URI must conform to the valid character set in RFC 3986. In practice, this means ASCII alphanumeric characters, plus ``%``, ``-``, ``_``, ``.``, and ``~``. To identify a resource that cannot be represented by URI-safe characters, encode the resource name with `percent-encoding <https://en.wikipedia.org/wiki/Percent-encoding>`_. + +The URI is also case sensitive, so ``s3://example/dataset`` and ``s3://Example/Dataset`` are considered different. Note that the *host* part of the URI is also case sensitive, which differs from RFC 3986. -If you try to use either of the examples below, your code will cause a ValueError to be raised, and Airflow will not import it. +Do not use the ``airflow`` scheme, which is reserved for Airflow's internals. + +Airflow always normalizes the scheme to lowercase and keeps the host part case sensitive, to correctly distinguish between resources. Both examples below are invalid and raise ``ValueError``: .. code-block:: python @@ -76,7 +82,14 @@ If you try to use either of the examples below, your code will cause a ValueErro reserved = Dataset("airflow://example_dataset") not_ascii = Dataset("èxample_datašet") -The identifier does not have to be an absolute URI, it can be a scheme-less, relative URI, or even just a simple path or string: +If you wish to define datasets with a scheme that carries no additional semantic constraints, use a scheme with the prefix ``x-``. Airflow will skip any semantic validation on URIs with such schemes. + +.. code-block:: python + + # valid dataset, treated as a plain string + my_ds = Dataset("x-my-thing://foobarbaz") + +The identifier does not have to be absolute; it can be a scheme-less, relative URI, or even just a simple path or string: .. code-block:: python @@ -84,7 +97,12 @@ The identifier does not have to be an absolute URI, it can be a scheme-less, rel schemeless = Dataset("//example/dataset") csv_file = Dataset("example_dataset") -If required, an extra dictionary can be included in a Dataset: +Non-absolute identifiers are considered plain strings that do not carry any semantic meaning to Airflow. + +Extra information +----------------- + +If needed, an extra dictionary can be included in a Dataset: .. code-block:: python @@ -93,9 +111,24 @@ If required, an extra dictionary can be included in a Dataset: extra={"team": "trainees"}, ) -.. note:: **Security Note:** Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in dataset URIs or extra key values! +This extra information does not affect a dataset's identity.
This means a DAG will be triggered by a dataset with an identical URI, even if the extra dict is different: -The URI is also case sensitive throughout, so ``s3://example_dataset`` and ``s3://Example_Dataset`` are considered different, as is ``s3://example_dataset`` and ``S3://example_dataset``. +.. code-block:: python + + with DAG( + dag_id="consumer", + schedule=[Dataset("s3://dataset/example.csv", extra={"different": "extras"})], + ): + ... + + with DAG(dag_id="producer", ...): + MyOperator( + # triggers "consumer" with the given extra! + outlets=[Dataset("s3://dataset/example.csv", extra={"team": "trainees"})], + ..., + ) + +.. note:: **Security Note:** Dataset URI and extra fields are not encrypted; they are stored in cleartext in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in dataset URIs or extra key values! How to use datasets in your DAGs -------------------------------- diff --git a/newsfragments/37005.significant.rst b/newsfragments/37005.significant.rst new file mode 100644 index 0000000000000..927c8ef848b06 --- /dev/null +++ b/newsfragments/37005.significant.rst @@ -0,0 +1,10 @@ +Dataset URIs are now validated on input + +Datasets must use a URI that conforms to the rules laid down in AIP-60, and the value +will be automatically normalized when the DAG file is parsed. See +:doc:`documentation on Datasets </authoring-and-scheduling/datasets>` for +a more detailed description of the rules. + +You may need to change your Dataset identifiers if they look like a URI but are +used in a less mainstream way, such as relying on the URI's auth section or +using a case-sensitive scheme name. diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index ba29e41361b86..d403402e95583 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -45,8 +45,8 @@ def clear_datasets(): pytest.param("", id="empty"), pytest.param("\n\t", id="whitespace"), pytest.param("a" * 3001, id="too_long"), - pytest.param("airflow:" * 3001, id="reserved_scheme"), - pytest.param("😊" * 3001, id="non-ascii"), + pytest.param("airflow://xcom/dag/task", id="reserved_scheme"), + pytest.param("😊", id="non-ascii"), ], ) def test_invalid_uris(uri): @@ -54,9 +54,33 @@ def test_invalid_uris(uri): Dataset(uri=uri) -def test_uri_with_scheme(): - dataset = Dataset(uri="s3://example_dataset") +@pytest.mark.parametrize( + "uri, normalized", + [ + pytest.param("foobar", "foobar", id="scheme-less"), + pytest.param("foo:bar", "foo:bar", id="scheme-less-colon"), + pytest.param("foo/bar", "foo/bar", id="scheme-less-slash"), + pytest.param("s3://bucket/key/path", "s3://bucket/key/path", id="normal"), + pytest.param("file:///123/456/", "file:///123/456", id="trailing-slash"), + ], +) +def test_uri_with_scheme(uri: str, normalized: str) -> None: + dataset = Dataset(uri) + EmptyOperator(task_id="task1", outlets=[dataset]) + assert dataset.uri == normalized + assert os.fspath(dataset) == normalized + + +def test_uri_with_auth() -> None: + with pytest.warns(UserWarning) as record: + dataset = Dataset("ftp://user@localhost/foo.txt") + assert len(record) == 1 and str(record[0].message) == ( + "A dataset URI should not contain auth info (e.g. username or " + "password). It has been automatically dropped."
+ ) EmptyOperator(task_id="task1", outlets=[dataset]) + assert dataset.uri == "ftp://localhost/foo.txt" + assert os.fspath(dataset) == "ftp://localhost/foo.txt" def test_uri_without_scheme(): @@ -65,26 +89,26 @@ def test_uri_without_scheme(): def test_fspath(): - uri = "s3://example_dataset" + uri = "s3://example/dataset" dataset = Dataset(uri=uri) assert os.fspath(dataset) == uri def test_equal_when_same_uri(): - uri = "s3://example_dataset" + uri = "s3://example/dataset" dataset1 = Dataset(uri=uri) dataset2 = Dataset(uri=uri) assert dataset1 == dataset2 def test_not_equal_when_different_uri(): - dataset1 = Dataset(uri="s3://example_dataset") - dataset2 = Dataset(uri="s3://other_dataset") + dataset1 = Dataset(uri="s3://example/dataset") + dataset2 = Dataset(uri="s3://other/dataset") assert dataset1 != dataset2 def test_hash(): - uri = "s3://example_dataset" + uri = "s3://example/dataset" dataset = Dataset(uri=uri) hash(dataset) diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py index f78d71e05fcf5..758aec63178f1 100644 --- a/tests/decorators/test_python.py +++ b/tests/decorators/test_python.py @@ -933,7 +933,7 @@ def test_task_decorator_dataset(dag_maker, session): from airflow.datasets import Dataset result = None - uri = "s3://test" + uri = "s3://bucket/name" with dag_maker(session=session) as dag: diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index b46f2b28706e7..20d3714edc2cd 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -1096,10 +1096,10 @@ def test_bulk_write_to_db_datasets(self): dag_id1 = "test_dataset_dag1" dag_id2 = "test_dataset_dag2" task_id = "test_dataset_task" - uri1 = "s3://dataset1" + uri1 = "s3://dataset/1" d1 = Dataset(uri1, extra={"not": "used"}) - d2 = Dataset("s3://dataset2") - d3 = Dataset("s3://dataset3") + d2 = Dataset("s3://dataset/2") + d3 = Dataset("s3://dataset/3") dag1 = DAG(dag_id=dag_id1, start_date=DEFAULT_DATE, schedule=[d1]) EmptyOperator(task_id=task_id, dag=dag1, outlets=[d2, d3]) dag2 = DAG(dag_id=dag_id2, start_date=DEFAULT_DATE) diff --git a/tests/providers/google/datasets/__init__.py b/tests/providers/google/datasets/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/google/datasets/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/google/datasets/test_bigquery.py b/tests/providers/google/datasets/test_bigquery.py new file mode 100644 index 0000000000000..45da4ffb1eb71 --- /dev/null +++ b/tests/providers/google/datasets/test_bigquery.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import urllib.parse + +import pytest + +from airflow.providers.google.datasets.bigquery import sanitize_uri + + +def test_sanitize_uri_pass() -> None: + uri_i = urllib.parse.urlsplit("bigquery://project/dataset/table") + uri_o = sanitize_uri(uri_i) + assert urllib.parse.urlunsplit(uri_o) == "bigquery://project/dataset/table" + + +@pytest.mark.parametrize( + "value", + [ + pytest.param("bigquery://", id="blank"), + pytest.param("bigquery:///dataset/table", id="no-project"), + pytest.param("bigquery://project/dataset", id="missing-component"), + pytest.param("bigquery://project/dataset/table/column", id="extra-component"), + ], +) +def test_sanitize_uri_fail(value: str) -> None: + uri_i = urllib.parse.urlsplit(value) + with pytest.raises(ValueError): + sanitize_uri(uri_i) diff --git a/tests/providers/mysql/datasets/__init__.py b/tests/providers/mysql/datasets/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/mysql/datasets/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/mysql/datasets/test_mysql.py b/tests/providers/mysql/datasets/test_mysql.py new file mode 100644 index 0000000000000..5f31d72991f27 --- /dev/null +++ b/tests/providers/mysql/datasets/test_mysql.py @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import urllib.parse + +import pytest + +from airflow.providers.mysql.datasets.mysql import sanitize_uri + + +@pytest.mark.parametrize( + "original, normalized", + [ + pytest.param( + "mysql://example.com:1234/database/table", + "mysql://example.com:1234/database/table", + id="normalized", + ), + pytest.param( + "mysql://example.com/database/table", + "mysql://example.com:3306/database/table", + id="default-port", + ), + pytest.param( + "mariadb://example.com/database/table", + "mysql://example.com:3306/database/table", + id="mariadb", + ), + ], +) +def test_sanitize_uri_pass(original: str, normalized: str) -> None: + uri_i = urllib.parse.urlsplit(original) + uri_o = sanitize_uri(uri_i) + assert urllib.parse.urlunsplit(uri_o) == normalized + + +@pytest.mark.parametrize( + "value", + [ + pytest.param("mysql://", id="blank"), + pytest.param("mysql:///database/table", id="no-host"), + pytest.param("mysql://example.com/database", id="missing-component"), + pytest.param("mysql://example.com:abcd/database/table", id="non-port"), + pytest.param("mysql://example.com/database/table/column", id="extra-component"), + ], +) +def test_sanitize_uri_fail(value: str) -> None: + uri_i = urllib.parse.urlsplit(value) + with pytest.raises(ValueError): + sanitize_uri(uri_i) diff --git a/tests/providers/postgres/datasets/__init__.py b/tests/providers/postgres/datasets/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/postgres/datasets/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/postgres/datasets/test_postgres.py b/tests/providers/postgres/datasets/test_postgres.py new file mode 100644 index 0000000000000..40d6bf11d235d --- /dev/null +++ b/tests/providers/postgres/datasets/test_postgres.py @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +import urllib.parse + +import pytest + +from airflow.providers.postgres.datasets.postgres import sanitize_uri + + +@pytest.mark.parametrize( + "original, normalized", + [ + pytest.param( + "postgres://example.com:1234/database/schema/table", + "postgres://example.com:1234/database/schema/table", + id="normalized", + ), + pytest.param( + "postgres://example.com/database/schema/table", + "postgres://example.com:5432/database/schema/table", + id="default-port", + ), + pytest.param( + "postgres://example.com/database//table", + "postgres://example.com:5432/database/default/table", + id="default-schema", + ), + ], +) +def test_sanitize_uri_pass(original: str, normalized: str) -> None: + uri_i = urllib.parse.urlsplit(original) + uri_o = sanitize_uri(uri_i) + assert urllib.parse.urlunsplit(uri_o) == normalized + + +@pytest.mark.parametrize( + "value", + [ + pytest.param("postgres://", id="blank"), + pytest.param("postgres:///database/schema/table", id="no-host"), + pytest.param("postgres://example.com/database/table", id="missing-component"), + pytest.param("postgres://example.com:abcd/database/schema/table", id="non-port"), + pytest.param("postgres://example.com/database/schema/table/column", id="extra-component"), + ], +) +def test_sanitize_uri_fail(value: str) -> None: + uri_i = urllib.parse.urlsplit(value) + with pytest.raises(ValueError): + sanitize_uri(uri_i) diff --git a/tests/providers/trino/datasets/__init__.py b/tests/providers/trino/datasets/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/trino/datasets/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/trino/datasets/test_trino.py b/tests/providers/trino/datasets/test_trino.py new file mode 100644 index 0000000000000..12cacd4eb0cf2 --- /dev/null +++ b/tests/providers/trino/datasets/test_trino.py @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +import urllib.parse + +import pytest + +from airflow.providers.trino.datasets.trino import sanitize_uri + + +@pytest.mark.parametrize( + "original, normalized", + [ + pytest.param( + "trino://example.com:1234/catalog/schema/table", + "trino://example.com:1234/catalog/schema/table", + id="normalized", + ), + pytest.param( + "trino://example.com/catalog/schema/table", + "trino://example.com:8080/catalog/schema/table", + id="default-port", + ), + ], +) +def test_sanitize_uri_pass(original: str, normalized: str) -> None: + uri_i = urllib.parse.urlsplit(original) + uri_o = sanitize_uri(uri_i) + assert urllib.parse.urlunsplit(uri_o) == normalized + + +@pytest.mark.parametrize( + "value", + [ + pytest.param("trino://", id="blank"), + pytest.param("trino:///catalog/schema/table", id="no-host"), + pytest.param("trino://example.com/catalog/table", id="missing-component"), + pytest.param("trino://example.com:abcd/catalog/schema/table", id="non-port"), + pytest.param("trino://example.com/catalog/schema/table/column", id="extra-component"), + ], +) +def test_sanitize_uri_fail(value: str) -> None: + uri_i = urllib.parse.urlsplit(value) + with pytest.raises(ValueError): + sanitize_uri(uri_i) diff --git a/tests/serialization/test_serde.py b/tests/serialization/test_serde.py index 6298de53e6398..b066f4d1b1c6d 100644 --- a/tests/serialization/test_serde.py +++ b/tests/serialization/test_serde.py @@ -323,7 +323,7 @@ def test_backwards_compat(self): """ Verify deserialization of old-style encoded Xcom values including nested ones """ - uri = "s3://does_not_exist" + uri = "s3://does/not/exist" data = { "__type": "airflow.datasets.Dataset", "__source": None,
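Taken together, these changes mean normalization happens when a ``Dataset`` is constructed, because ``_sanitize_uri`` now runs as an ``attrs`` converter on the ``uri`` field. A minimal end-to-end sketch, assuming the Amazon provider is installed so the ``s3`` scheme resolves to the no-op normalizer (the bucket and key are illustrative):

.. code-block:: python

    import warnings

    from airflow.datasets import Dataset

    with warnings.catch_warnings():
        # Auth info in the URI triggers the UserWarning added above.
        warnings.simplefilter("ignore", UserWarning)
        ds = Dataset("s3://user@bucket/key/path/?b=2&a=1")

    # Auth info is dropped, the trailing slash is removed,
    # and query parameters are sorted.
    assert ds.uri == "s3://bucket/key/path?a=1&b=2"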