From 29ca9122d12c477f9edae4598cc9cdbd68ea044e Mon Sep 17 00:00:00 2001 From: Kacper Muda Date: Thu, 29 Feb 2024 11:40:12 +0100 Subject: [PATCH] feat: Add OpenLineage support for File and User Airflow's lineage entities (#37744) --- .../openlineage/extractors/manager.py | 76 ++++++++- .../guides/developer.rst | 62 ++++--- tests/always/test_project_structure.py | 1 - .../openlineage/extractors/test_manager.py | 159 ++++++++++++++++++ 4 files changed, 265 insertions(+), 33 deletions(-) create mode 100644 tests/providers/openlineage/extractors/test_manager.py diff --git a/airflow/providers/openlineage/extractors/manager.py b/airflow/providers/openlineage/extractors/manager.py index a5654d8bbfb654..cb77c796ba168d 100644 --- a/airflow/providers/openlineage/extractors/manager.py +++ b/airflow/providers/openlineage/extractors/manager.py @@ -34,6 +34,9 @@ from airflow.utils.module_loading import import_string if TYPE_CHECKING: + from openlineage.client.run import Dataset + + from airflow.lineage.entities import Table from airflow.models import Operator @@ -178,19 +181,78 @@ def extract_inlets_and_outlets( task_metadata.outputs.append(d) @staticmethod - def convert_to_ol_dataset(obj): + def convert_to_ol_dataset_from_object_storage_uri(uri: str) -> Dataset | None: + from urllib.parse import urlparse + + from openlineage.client.run import Dataset + + try: + scheme, netloc, path, params, _, _ = urlparse(uri) + except Exception: + return None + if scheme.startswith("s3"): + return Dataset(namespace=f"s3://{netloc}", name=path.lstrip("/")) + elif scheme.startswith(("gcs", "gs")): + return Dataset(namespace=f"gs://{netloc}", name=path.lstrip("/")) + elif "/" not in uri: + return None + return Dataset(namespace=scheme, name=f"{netloc}{path}") + + @staticmethod + def convert_to_ol_dataset_from_table(table: Table) -> Dataset: + from openlineage.client.facet import ( + BaseFacet, + OwnershipDatasetFacet, + OwnershipDatasetFacetOwners, + SchemaDatasetFacet, + SchemaField, + ) + from 
openlineage.client.run import Dataset + + facets: dict[str, BaseFacet] = {} + if table.columns: + facets["schema"] = SchemaDatasetFacet( + fields=[ + SchemaField( + name=column.name, + type=column.data_type, + description=column.description, + ) + for column in table.columns + ] + ) + if table.owners: + facets["ownership"] = OwnershipDatasetFacet( + owners=[ + OwnershipDatasetFacetOwners( + # f.e. "user:John Doe <jdoe@company.com>" or just "user:<jdoe@company.com>" + name=f"user:" + f"{user.first_name + ' ' if user.first_name else ''}" + f"{user.last_name + ' ' if user.last_name else ''}" + f"<{user.email}>", + type="", + ) + for user in table.owners + ] + ) + return Dataset( + namespace=f"{table.cluster}", + name=f"{table.database}.{table.name}", + facets=facets, + ) + + @staticmethod + def convert_to_ol_dataset(obj) -> Dataset | None: from openlineage.client.run import Dataset - from airflow.lineage.entities import Table + from airflow.lineage.entities import File, Table if isinstance(obj, Dataset): return obj elif isinstance(obj, Table): - return Dataset( - namespace=f"{obj.cluster}", - name=f"{obj.database}.{obj.name}", - facets={}, - ) + return ExtractorManager.convert_to_ol_dataset_from_table(obj) + elif isinstance(obj, File): + return ExtractorManager.convert_to_ol_dataset_from_object_storage_uri(obj.url) else: return None diff --git a/docs/apache-airflow-providers-openlineage/guides/developer.rst b/docs/apache-airflow-providers-openlineage/guides/developer.rst index 035774e69af8b5..3c7518d6cb6c60 100644 --- a/docs/apache-airflow-providers-openlineage/guides/developer.rst +++ b/docs/apache-airflow-providers-openlineage/guides/developer.rst @@ -364,10 +364,7 @@ Airflow allows Operators to track lineage by specifying the input and outputs of `inlets and outlets `_. OpenLineage will, by default, use inlets and outlets as input/output datasets if it cannot find any successful extraction from the OpenLineage methods or the Extractors. -.. 
important:: - - Airflow supports inlets and outlets to be either a Table, Column, File or User entity. However, currently OpenLineage only extracts lineage via Table entity - +Airflow supports inlets and outlets to be either a Table, Column, File or User entity and so does OpenLineage. Example ^^^^^^^ @@ -379,33 +376,50 @@ An Operator inside the Airflow DAG can be annotated with inlets and outlets like """Example DAG demonstrating the usage of the extraction via Inlets and Outlets.""" import pendulum - import datetime from airflow import DAG from airflow.operators.bash import BashOperator - from airflow.lineage.entities import Table, File + from airflow.lineage.entities import Table, File, Column, User + + + t1 = Table( + cluster="c1", + database="d1", + name="t1", + owners=[User(email="jdoe@ok.com", first_name="Joe", last_name="Doe")], + ) + t2 = Table( + cluster="c1", + database="d1", + name="t2", + columns=[ + Column(name="col1", description="desc1", data_type="type1"), + Column(name="col2", description="desc2", data_type="type2"), + ], + owners=[ + User(email="mike@company.com", first_name="Mike", last_name="Smith"), + User(email="theo@company.com", first_name="Theo"), + User(email="smith@company.com", last_name="Smith"), + User(email="jane@company.com"), + ], + ) + t3 = Table( + cluster="c1", + database="d1", + name="t3", + columns=[ + Column(name="col3", description="desc3", data_type="type3"), + Column(name="col4", description="desc4", data_type="type4"), + ], + ) + t4 = Table(cluster="c1", database="d1", name="t4") + f1 = File(url="s3://bucket/dir/file1") - def create_table(cluster, database, name): - return Table( - database=database, - cluster=cluster, - name=name, - ) - - - t1 = create_table("c1", "d1", "t1") - t2 = create_table("c1", "d1", "t2") - t3 = create_table("c1", "d1", "t3") - t4 = create_table("c1", "d1", "t4") - f1 = File(url="http://randomfile") - with DAG( dag_id="example_operator", - schedule_interval="0 0 * * *", + 
schedule_interval="@once", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - dagrun_timeout=datetime.timedelta(minutes=60), - params={"example_key": "example_value"}, ) as dag: task1 = BashOperator( task_id="task_1_with_inlet_outlet", @@ -426,8 +440,6 @@ An Operator inside the Airflow DAG can be annotated with inlets and outlets like if __name__ == "__main__": dag.cli() -Note that the ``File`` entity, defined in the example code, is not captured by the lineage event currently as described in the ``important`` box above. - Conversion from Airflow Table entity to OpenLineage Dataset is made in the following way: - ``CLUSTER`` of the table entity becomes the namespace of OpenLineage's Dataset - The name of the dataset is formed by ``{{DATABASE}}.{{NAME}}`` where ``DATABASE`` and ``NAME`` are attributes specified by Airflow's Table entity. diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index bab56abead2a1e..85c2c306555b78 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -159,7 +159,6 @@ def test_providers_modules_should_have_tests(self): "tests/providers/microsoft/azure/operators/test_adls.py", "tests/providers/microsoft/azure/transfers/test_azure_blob_to_gcs.py", "tests/providers/mongo/sensors/test_mongo.py", - "tests/providers/openlineage/extractors/test_manager.py", "tests/providers/openlineage/plugins/test_adapter.py", "tests/providers/openlineage/plugins/test_facets.py", "tests/providers/openlineage/test_sqlparser.py", diff --git a/tests/providers/openlineage/extractors/test_manager.py b/tests/providers/openlineage/extractors/test_manager.py new file mode 100644 index 00000000000000..d1f794b49d68bb --- /dev/null +++ b/tests/providers/openlineage/extractors/test_manager.py @@ -0,0 +1,159 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest +from openlineage.client.facet import ( + OwnershipDatasetFacet, + OwnershipDatasetFacetOwners, + SchemaDatasetFacet, + SchemaField, +) +from openlineage.client.run import Dataset + +from airflow.lineage.entities import Column, File, Table, User +from airflow.providers.openlineage.extractors.manager import ExtractorManager + + +@pytest.mark.parametrize( + ("uri", "dataset"), + ( + ("s3://bucket1/dir1/file1", Dataset(namespace="s3://bucket1", name="dir1/file1")), + ("gs://bucket2/dir2/file2", Dataset(namespace="gs://bucket2", name="dir2/file2")), + ("gcs://bucket3/dir3/file3", Dataset(namespace="gs://bucket3", name="dir3/file3")), + ("https://test.com", Dataset(namespace="https", name="test.com")), + ("https://test.com?param1=test1&param2=test2", Dataset(namespace="https", name="test.com")), + ("not_an_url", None), + ), +) +def test_convert_to_ol_dataset_from_object_storage_uri(uri, dataset): + result = ExtractorManager.convert_to_ol_dataset_from_object_storage_uri(uri) + assert result == dataset + + +@pytest.mark.parametrize( + ("obj", "dataset"), + ( + ( + Dataset(namespace="n1", name="f1"), + Dataset(namespace="n1", name="f1"), + ), + (File(url="s3://bucket1/dir1/file1"), Dataset(namespace="s3://bucket1", name="dir1/file1")), + 
(File(url="gs://bucket2/dir2/file2"), Dataset(namespace="gs://bucket2", name="dir2/file2")), + (File(url="https://test.com"), Dataset(namespace="https", name="test.com")), + (Table(cluster="c1", database="d1", name="t1"), Dataset(namespace="c1", name="d1.t1")), + ("gs://bucket2/dir2/file2", None), + ("not_an_url", None), + ), +) +def test_convert_to_ol_dataset(obj, dataset): + result = ExtractorManager.convert_to_ol_dataset(obj) + assert result == dataset + + +def test_convert_to_ol_dataset_from_table_with_columns_and_owners(): + table = Table( + cluster="c1", + database="d1", + name="t1", + columns=[ + Column(name="col1", description="desc1", data_type="type1"), + Column(name="col2", description="desc2", data_type="type2"), + ], + owners=[ + User(email="mike@company.com", first_name="Mike", last_name="Smith"), + User(email="theo@company.com", first_name="Theo"), + User(email="smith@company.com", last_name="Smith"), + User(email="jane@company.com"), + ], + ) + expected_facets = { + "schema": SchemaDatasetFacet( + fields=[ + SchemaField( + name="col1", + type="type1", + description="desc1", + ), + SchemaField( + name="col2", + type="type2", + description="desc2", + ), + ] + ), + "ownership": OwnershipDatasetFacet( + owners=[ + OwnershipDatasetFacetOwners(name="user:Mike Smith <mike@company.com>", type=""), + OwnershipDatasetFacetOwners(name="user:Theo <theo@company.com>", type=""), + OwnershipDatasetFacetOwners(name="user:Smith <smith@company.com>", type=""), + OwnershipDatasetFacetOwners(name="user:<jane@company.com>", type=""), + ] + ), + } + result = ExtractorManager.convert_to_ol_dataset_from_table(table) + assert result.namespace == "c1" + assert result.name == "d1.t1" + assert result.facets == expected_facets + + +def test_convert_to_ol_dataset_table(): + table = Table( + cluster="c1", + database="d1", + name="t1", + columns=[ + Column(name="col1", description="desc1", data_type="type1"), + Column(name="col2", description="desc2", data_type="type2"), + ], + owners=[ + User(email="mike@company.com", first_name="Mike", 
last_name="Smith"), + User(email="theo@company.com", first_name="Theo"), + User(email="smith@company.com", last_name="Smith"), + User(email="jane@company.com"), + ], + ) + expected_facets = { + "schema": SchemaDatasetFacet( + fields=[ + SchemaField( + name="col1", + type="type1", + description="desc1", + ), + SchemaField( + name="col2", + type="type2", + description="desc2", + ), + ] + ), + "ownership": OwnershipDatasetFacet( + owners=[ + OwnershipDatasetFacetOwners(name="user:Mike Smith <mike@company.com>", type=""), + OwnershipDatasetFacetOwners(name="user:Theo <theo@company.com>", type=""), + OwnershipDatasetFacetOwners(name="user:Smith <smith@company.com>", type=""), + OwnershipDatasetFacetOwners(name="user:<jane@company.com>", type=""), + ] + ), + } + + result = ExtractorManager.convert_to_ol_dataset(table) + assert result.namespace == "c1" + assert result.name == "d1.t1" + assert result.facets == expected_facets