Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions airflow/models/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations

from urllib.parse import urlparse
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

import sqlalchemy_jsonfield
from sqlalchemy import (
Expand All @@ -35,6 +35,7 @@

from airflow.datasets import Dataset
from airflow.models.base import ID_LEN, Base, StringID
from airflow.models.connection import Connection
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.sqlalchemy import UtcDateTime
Expand Down Expand Up @@ -83,9 +84,6 @@ def __init__(self, uri: str, **kwargs):
uri.encode('ascii')
except UnicodeEncodeError:
raise ValueError('URI must be ascii')
parsed = urlparse(uri)
if parsed.scheme and parsed.scheme.lower() == 'airflow':
raise ValueError("Scheme `airflow` is reserved.")
super().__init__(uri=uri, **kwargs)

def __eq__(self, other):
Expand All @@ -100,6 +98,64 @@ def __hash__(self):
def __repr__(self):
return f"{self.__class__.__name__}(uri={self.uri!r}, extra={self.extra!r})"

@property
def canonical_uri(self):
    """
    Resolve the canonical uri for a dataset.

    If the uri does not use the ``airflow`` scheme, it is returned as-is.

    Otherwise the connection id is taken from the username portion of the
    userinfo, and the canonical uri is built by combining that connection's
    uri with the dataset's uri:

    * the scheme comes from the connection, unless an override is provided
      in the dataset scheme (e.g. ``airflow+override://``)
    * the hostname and port from the dataset take precedence over the
      connection's
    * the paths are concatenated, connection path first
    * the query args are merged, with dataset values winning on conflict

    Accepted dataset uri shapes::

        airflow://conn_id/...
        airflow+override://conn_id/...
        airflow://conn_id/some_extra_path?query

    NOTE(review): merging the connection's query string into the result may
    expose sensitive values stored there (e.g. legacy-style credentials kept
    in query params) -- confirm this is acceptable before relying on it.
    """
    parsed = urlparse(self.uri)

    # Only `airflow` (optionally with a `+override` suffix) is the reserved
    # scheme; a bare startswith() check would also match e.g. `airflowx`.
    split_scheme = parsed.scheme.split("+")
    if split_scheme[0] != "airflow":
        return self.uri

    # The conn id rides in the userinfo of the dataset uri.
    conn_id = parsed.username
    conn = urlparse(Connection.get_connection_from_secrets(conn_id).get_uri())

    # Take the scheme from the connection, unless it is overridden in the
    # dataset via `airflow+<scheme>://`.
    scheme = split_scheme[1] if len(split_scheme) == 2 else conn.scheme

    # Strip userinfo from the uri; allow the dataset to override the
    # connection's hostname/port.
    hostname = parsed.hostname or conn.hostname
    port = parsed.port or conn.port
    netloc = f"{hostname}:{port}" if port else hostname

    # Combine the paths (connection followed by dataset); collapse the
    # `//` produced when both ends contribute only a slash.
    path = conn.path
    if parsed.path:
        path = f"{path}{parsed.path}"
    if path == "//":
        path = "/"

    # Merge the query args; the dataset's values take precedence.
    query = parse_qs(conn.query)
    if parsed.query:
        query.update(parse_qs(parsed.query))

    merged_conn = (scheme, netloc, path, "", urlencode(query, doseq=True), conn.fragment)
    return urlunparse(merged_conn)


class DatasetDagRef(Base):
"""References from a DAG to an upstream dataset."""
Expand Down
42 changes: 36 additions & 6 deletions tests/models/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
# specific language governing permissions and limitations
# under the License.

from unittest import mock

import pytest

from airflow.datasets import Dataset
from airflow.models.dataset import DatasetModel
from airflow.operators.empty import EmptyOperator


Expand All @@ -32,14 +35,41 @@ def test_uri_with_scheme(self, dag_maker, session):
with dag_maker(dag_id="example_dataset"):
EmptyOperator(task_id="task1", outlets=[dataset])

def test_uri_with_airflow_scheme_restricted(self, dag_maker, session):
    # NOTE(review): the diff in this PR removes the "Scheme `airflow` is
    # reserved" ValueError from DatasetModel.__init__, so this test appears
    # to belong to the deleted side of the diff -- confirm it is removed
    # together with that check.
    dataset = Dataset(uri="airflow://example_dataset")
    with pytest.raises(ValueError, match='Scheme `airflow` is reserved'):
        with dag_maker(dag_id="example_dataset"):
            EmptyOperator(task_id="task1", outlets=[dataset])

def test_uri_with_invalid_characters(self, dag_maker, session):
    # A dataset uri containing non-ascii characters is expected to raise
    # ValueError('URI must be ascii') -- presumably from the ascii-encode
    # check in DatasetModel.__init__ when the dataset is attached as an
    # outlet; verify against the model code.
    dataset = Dataset(uri="èxample_datašet")
    with pytest.raises(ValueError, match='URI must be ascii'):
        with dag_maker(dag_id="example_dataset"):
            EmptyOperator(task_id="task1", outlets=[dataset])


class TestDatasetModel:
    """Tests for DatasetModel.canonical_uri resolution against a mocked connection."""

    # NOTE(review): the cases that merge the connection's query string into
    # the canonical uri may expose sensitive values stored in connection
    # query params -- flagged during review, confirm before relying on it.
    @pytest.mark.parametrize(
        "conn_uri, dataset_uri, expected_canonical_uri",
        [
            # Scheme, host, port and path come from the connection by default.
            ("postgres://somehost/", "airflow://testconn@/", "postgres://somehost/"),
            ("postgres://somehost:111/base", "airflow://testconn@", "postgres://somehost:111/base"),
            # `airflow+foo` overrides the connection scheme with `foo`.
            ("postgres://somehost:111/base", "airflow+foo://testconn@", "foo://somehost:111/base"),
            # Dataset hostname/port take precedence over the connection's.
            ("postgres://somehost:111", "airflow://testconn@foo:222", "postgres://foo:222"),
            # Paths are concatenated, connection path first.
            (
                "postgres://somehost:111/base",
                "airflow://testconn@/extra",
                "postgres://somehost:111/base/extra",
            ),
            # Query strings are merged; on conflict the dataset value wins.
            ("postgres://somehost:111", "airflow://testconn@/?foo=bar", "postgres://somehost:111/?foo=bar"),
            (
                "postgres://somehost?biz=baz",
                "airflow://testconn@/?foo=bar",
                "postgres://somehost/?biz=baz&foo=bar",
            ),
            (
                "postgres://somehost?foo=baz",
                "airflow://testconn@/?foo=bar",
                "postgres://somehost/?foo=bar",
            ),
            # Userinfo from the connection is stripped from the result.
            ("postgres://user:pass@somehost", "airflow://testconn@", "postgres://somehost"),
        ],
    )
    def test_canonical_uri(self, conn_uri, dataset_uri, expected_canonical_uri):
        # The `testconn` connection is supplied via the environment-variable
        # secrets backend (AIRFLOW_CONN_TESTCONN).
        with mock.patch.dict('os.environ', AIRFLOW_CONN_TESTCONN=conn_uri):
            dataset = DatasetModel(uri=dataset_uri)
            assert dataset.canonical_uri == expected_canonical_uri