diff --git a/airflow/providers/openlineage/extractors/manager.py b/airflow/providers/openlineage/extractors/manager.py index cb77c796ba168..405db3d8e5a4c 100644 --- a/airflow/providers/openlineage/extractors/manager.py +++ b/airflow/providers/openlineage/extractors/manager.py @@ -186,16 +186,24 @@ def convert_to_ol_dataset_from_object_storage_uri(uri: str) -> Dataset | None: from openlineage.client.run import Dataset + if "/" not in uri: + return None + try: scheme, netloc, path, params, _, _ = urlparse(uri) except Exception: return None - if scheme.startswith("s3"): - return Dataset(namespace=f"s3://{netloc}", name=path.lstrip("/")) - elif scheme.startswith(("gcs", "gs")): - return Dataset(namespace=f"gs://{netloc}", name=path.lstrip("/")) - elif "/" not in uri: - return None + + common_schemas = { + "s3": "s3", + "gs": "gs", + "gcs": "gs", + "hdfs": "hdfs", + "file": "file", + } + for found, final in common_schemas.items(): + if scheme.startswith(found): + return Dataset(namespace=f"{final}://{netloc}", name=path.lstrip("/")) return Dataset(namespace=scheme, name=f"{netloc}{path}") @staticmethod diff --git a/tests/providers/openlineage/extractors/test_manager.py b/tests/providers/openlineage/extractors/test_manager.py index d1f794b49d68b..7a790e8393927 100644 --- a/tests/providers/openlineage/extractors/test_manager.py +++ b/tests/providers/openlineage/extractors/test_manager.py @@ -36,8 +36,13 @@ ("s3://bucket1/dir1/file1", Dataset(namespace="s3://bucket1", name="dir1/file1")), ("gs://bucket2/dir2/file2", Dataset(namespace="gs://bucket2", name="dir2/file2")), ("gcs://bucket3/dir3/file3", Dataset(namespace="gs://bucket3", name="dir3/file3")), + ("hdfs://namenodehost:8020/file1", Dataset(namespace="hdfs://namenodehost:8020", name="file1")), + ("hdfs://namenodehost/file2", Dataset(namespace="hdfs://namenodehost", name="file2")), + ("file://localhost/etc/fstab", Dataset(namespace="file://localhost", name="etc/fstab")), + ("file:///etc/fstab", Dataset(namespace="file://", name="etc/fstab")), ("https://test.com", Dataset(namespace="https", name="test.com")), ("https://test.com?param1=test1¶m2=test2", Dataset(namespace="https", name="test.com")), + ("file:test.csv", None), ("not_an_url", None), ), ) @@ -55,6 +60,14 @@ def test_convert_to_ol_dataset_from_object_storage_uri(uri, dataset): ), (File(url="s3://bucket1/dir1/file1"), Dataset(namespace="s3://bucket1", name="dir1/file1")), (File(url="gs://bucket2/dir2/file2"), Dataset(namespace="gs://bucket2", name="dir2/file2")), + (File(url="gcs://bucket3/dir3/file3"), Dataset(namespace="gs://bucket3", name="dir3/file3")), + ( + File(url="hdfs://namenodehost:8020/file1"), + Dataset(namespace="hdfs://namenodehost:8020", name="file1"), + ), + (File(url="hdfs://namenodehost/file2"), Dataset(namespace="hdfs://namenodehost", name="file2")), + (File(url="file://localhost/etc/fstab"), Dataset(namespace="file://localhost", name="etc/fstab")), + (File(url="file:///etc/fstab"), Dataset(namespace="file://", name="etc/fstab")), (File(url="https://test.com"), Dataset(namespace="https", name="test.com")), (Table(cluster="c1", database="d1", name="t1"), Dataset(namespace="c1", name="d1.t1")), ("gs://bucket2/dir2/file2", None),