Skip to content

Commit

Permalink
tests: Add OpenLineage test cases for File to Dataset conversion (#37791
Browse files Browse the repository at this point in the history
)
  • Loading branch information
kacpermuda committed Feb 29, 2024
1 parent 4938ac0 commit e358bb2
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
20 changes: 14 additions & 6 deletions airflow/providers/openlineage/extractors/manager.py
Expand Up @@ -186,16 +186,24 @@ def convert_to_ol_dataset_from_object_storage_uri(uri: str) -> Dataset | None:

from openlineage.client.run import Dataset

if "/" not in uri:
return None

try:
scheme, netloc, path, params, _, _ = urlparse(uri)
except Exception:
return None
if scheme.startswith("s3"):
return Dataset(namespace=f"s3://{netloc}", name=path.lstrip("/"))
elif scheme.startswith(("gcs", "gs")):
return Dataset(namespace=f"gs://{netloc}", name=path.lstrip("/"))
elif "/" not in uri:
return None

common_schemas = {
"s3": "s3",
"gs": "gs",
"gcs": "gs",
"hdfs": "hdfs",
"file": "file",
}
for found, final in common_schemas.items():
if scheme.startswith(found):
return Dataset(namespace=f"{final}://{netloc}", name=path.lstrip("/"))
return Dataset(namespace=scheme, name=f"{netloc}{path}")

@staticmethod
Expand Down
13 changes: 13 additions & 0 deletions tests/providers/openlineage/extractors/test_manager.py
Expand Up @@ -36,8 +36,13 @@
("s3://bucket1/dir1/file1", Dataset(namespace="s3://bucket1", name="dir1/file1")),
("gs://bucket2/dir2/file2", Dataset(namespace="gs://bucket2", name="dir2/file2")),
("gcs://bucket3/dir3/file3", Dataset(namespace="gs://bucket3", name="dir3/file3")),
("hdfs://namenodehost:8020/file1", Dataset(namespace="hdfs://namenodehost:8020", name="file1")),
("hdfs://namenodehost/file2", Dataset(namespace="hdfs://namenodehost", name="file2")),
("file://localhost/etc/fstab", Dataset(namespace="file://localhost", name="etc/fstab")),
("file:///etc/fstab", Dataset(namespace="file://", name="etc/fstab")),
("https://test.com", Dataset(namespace="https", name="test.com")),
("https://test.com?param1=test1&param2=test2", Dataset(namespace="https", name="test.com")),
("file:test.csv", None),
("not_an_url", None),
),
)
Expand All @@ -55,6 +60,14 @@ def test_convert_to_ol_dataset_from_object_storage_uri(uri, dataset):
),
(File(url="s3://bucket1/dir1/file1"), Dataset(namespace="s3://bucket1", name="dir1/file1")),
(File(url="gs://bucket2/dir2/file2"), Dataset(namespace="gs://bucket2", name="dir2/file2")),
(File(url="gcs://bucket3/dir3/file3"), Dataset(namespace="gs://bucket3", name="dir3/file3")),
(
File(url="hdfs://namenodehost:8020/file1"),
Dataset(namespace="hdfs://namenodehost:8020", name="file1"),
),
(File(url="hdfs://namenodehost/file2"), Dataset(namespace="hdfs://namenodehost", name="file2")),
(File(url="file://localhost/etc/fstab"), Dataset(namespace="file://localhost", name="etc/fstab")),
(File(url="file:///etc/fstab"), Dataset(namespace="file://", name="etc/fstab")),
(File(url="https://test.com"), Dataset(namespace="https", name="test.com")),
(Table(cluster="c1", database="d1", name="t1"), Dataset(namespace="c1", name="d1.t1")),
("gs://bucket2/dir2/file2", None),
Expand Down

0 comments on commit e358bb2

Please sign in to comment.