Skip to content

Commit

Permalink
feat: Add OpenLineage support for File and User Airflow's lineage ent…
Browse files Browse the repository at this point in the history
…ities (apache#37744)
  • Loading branch information
kacpermuda authored and utkarsharma2 committed Apr 22, 2024
1 parent 8c79561 commit 29ca912
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 33 deletions.
76 changes: 69 additions & 7 deletions airflow/providers/openlineage/extractors/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
from airflow.utils.module_loading import import_string

if TYPE_CHECKING:
from openlineage.client.run import Dataset

from airflow.lineage.entities import Table
from airflow.models import Operator


Expand Down Expand Up @@ -178,19 +181,78 @@ def extract_inlets_and_outlets(
task_metadata.outputs.append(d)

@staticmethod
def convert_to_ol_dataset_from_object_storage_uri(uri: str) -> Dataset | None:
    """Convert an object-storage URI to an OpenLineage Dataset.

    ``s3://`` and ``gs``/``gcs`` style URIs are normalized to the canonical
    OpenLineage namespaces; any other parseable URI becomes
    ``Dataset(namespace=scheme, name=netloc+path)``.

    :param uri: URI of the file, e.g. ``s3://bucket/dir/file``.
    :return: the corresponding Dataset, or None when the string cannot be
        parsed or does not look like a URI at all.
    """
    from urllib.parse import urlparse

    from openlineage.client.run import Dataset

    try:
        scheme, netloc, path, _, _, _ = urlparse(uri)
    except Exception:
        return None
    if scheme.startswith("s3"):
        # Covers s3, s3a, s3n - all map to the s3:// namespace.
        return Dataset(namespace=f"s3://{netloc}", name=path.lstrip("/"))
    elif scheme.startswith(("gcs", "gs")):
        return Dataset(namespace=f"gs://{netloc}", name=path.lstrip("/"))
    elif "/" not in uri:
        # A plain token without any slash is not a usable URI.
        return None
    return Dataset(namespace=scheme, name=f"{netloc}{path}")

@staticmethod
def convert_to_ol_dataset_from_table(table: Table) -> Dataset:
    """Build an OpenLineage Dataset from an Airflow ``Table`` lineage entity.

    Columns (if any) are exposed via a schema facet and owners (if any) via
    an ownership facet. The Dataset namespace is the table's cluster and its
    name is ``database.name``.
    """
    from openlineage.client.facet import (
        BaseFacet,
        OwnershipDatasetFacet,
        OwnershipDatasetFacetOwners,
        SchemaDatasetFacet,
        SchemaField,
    )
    from openlineage.client.run import Dataset

    facets: dict[str, BaseFacet] = {}

    if table.columns:
        schema_fields = [
            SchemaField(name=col.name, type=col.data_type, description=col.description)
            for col in table.columns
        ]
        facets["schema"] = SchemaDatasetFacet(fields=schema_fields)

    if table.owners:
        owners = []
        for user in table.owners:
            # e.g. "user:John Doe <jdoe@company.com>" or just "user:<jdoe@company.com>"
            first = f"{user.first_name} " if user.first_name else ""
            last = f"{user.last_name} " if user.last_name else ""
            owners.append(
                OwnershipDatasetFacetOwners(name=f"user:{first}{last}<{user.email}>", type="")
            )
        facets["ownership"] = OwnershipDatasetFacet(owners=owners)

    return Dataset(
        namespace=f"{table.cluster}",
        name=f"{table.database}.{table.name}",
        facets=facets,
    )

@staticmethod
def convert_to_ol_dataset(obj) -> Dataset | None:
    """Convert a supported lineage object to an OpenLineage Dataset.

    Supported inputs: an OpenLineage ``Dataset`` (returned as-is), an Airflow
    ``Table`` entity, or an Airflow ``File`` entity (converted via its URL).
    Any other object yields None.
    """
    from openlineage.client.run import Dataset

    from airflow.lineage.entities import File, Table

    if isinstance(obj, Dataset):
        return obj
    elif isinstance(obj, Table):
        return ExtractorManager.convert_to_ol_dataset_from_table(obj)
    elif isinstance(obj, File):
        return ExtractorManager.convert_to_ol_dataset_from_object_storage_uri(obj.url)
    else:
        return None

Expand Down
62 changes: 37 additions & 25 deletions docs/apache-airflow-providers-openlineage/guides/developer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,7 @@ Airflow allows Operators to track lineage by specifying the input and outputs of
`inlets and outlets <https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/lineage.html#lineage>`_.
OpenLineage will, by default, use inlets and outlets as input/output datasets if it cannot find any successful extraction from the OpenLineage methods or the Extractors.

.. important::

Airflow supports inlets and outlets to be either a Table, Column, File or User entity. However, currently OpenLineage only extracts lineage via Table entity

Airflow supports inlets and outlets to be either a Table, Column, File or User entity and so does OpenLineage.

Example
^^^^^^^
Expand All @@ -379,33 +376,50 @@ An Operator inside the Airflow DAG can be annotated with inlets and outlets like
"""Example DAG demonstrating the usage of the extraction via Inlets and Outlets."""
import pendulum
import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.lineage.entities import Table, File
from airflow.lineage.entities import Table, File, Column, User
t1 = Table(
cluster="c1",
database="d1",
name="t1",
owners=[User(email="jdoe@ok.com", first_name="Joe", last_name="Doe")],
)
t2 = Table(
cluster="c1",
database="d1",
name="t2",
columns=[
Column(name="col1", description="desc1", data_type="type1"),
Column(name="col2", description="desc2", data_type="type2"),
],
owners=[
User(email="mike@company.com", first_name="Mike", last_name="Smith"),
User(email="theo@company.com", first_name="Theo"),
User(email="smith@company.com", last_name="Smith"),
User(email="jane@company.com"),
],
)
t3 = Table(
cluster="c1",
database="d1",
name="t3",
columns=[
Column(name="col3", description="desc3", data_type="type3"),
Column(name="col4", description="desc4", data_type="type4"),
],
)
t4 = Table(cluster="c1", database="d1", name="t4")
f1 = File(url="s3://bucket/dir/file1")
def create_table(cluster, database, name):
return Table(
database=database,
cluster=cluster,
name=name,
)
t1 = create_table("c1", "d1", "t1")
t2 = create_table("c1", "d1", "t2")
t3 = create_table("c1", "d1", "t3")
t4 = create_table("c1", "d1", "t4")
f1 = File(url="http://randomfile")
with DAG(
dag_id="example_operator",
schedule_interval="0 0 * * *",
schedule_interval="@once",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
dagrun_timeout=datetime.timedelta(minutes=60),
params={"example_key": "example_value"},
) as dag:
task1 = BashOperator(
task_id="task_1_with_inlet_outlet",
Expand All @@ -426,8 +440,6 @@ An Operator inside the Airflow DAG can be annotated with inlets and outlets like
if __name__ == "__main__":
dag.cli()
Note that the ``File`` entity defined in the example code is now also captured by the lineage event: its URL is converted into an OpenLineage Dataset.

Conversion from Airflow Table entity to OpenLineage Dataset is made in the following way:
- ``CLUSTER`` of the table entity becomes the namespace of OpenLineage's Dataset
- The name of the dataset is formed by ``{{DATABASE}}.{{NAME}}`` where ``DATABASE`` and ``NAME`` are attributes specified by Airflow's Table entity.
Expand Down
1 change: 0 additions & 1 deletion tests/always/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ def test_providers_modules_should_have_tests(self):
"tests/providers/microsoft/azure/operators/test_adls.py",
"tests/providers/microsoft/azure/transfers/test_azure_blob_to_gcs.py",
"tests/providers/mongo/sensors/test_mongo.py",
"tests/providers/openlineage/extractors/test_manager.py",
"tests/providers/openlineage/plugins/test_adapter.py",
"tests/providers/openlineage/plugins/test_facets.py",
"tests/providers/openlineage/test_sqlparser.py",
Expand Down
159 changes: 159 additions & 0 deletions tests/providers/openlineage/extractors/test_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import pytest
from openlineage.client.facet import (
OwnershipDatasetFacet,
OwnershipDatasetFacetOwners,
SchemaDatasetFacet,
SchemaField,
)
from openlineage.client.run import Dataset

from airflow.lineage.entities import Column, File, Table, User
from airflow.providers.openlineage.extractors.manager import ExtractorManager


@pytest.mark.parametrize(
    ("uri", "dataset"),
    [
        ("s3://bucket1/dir1/file1", Dataset(namespace="s3://bucket1", name="dir1/file1")),
        ("gs://bucket2/dir2/file2", Dataset(namespace="gs://bucket2", name="dir2/file2")),
        ("gcs://bucket3/dir3/file3", Dataset(namespace="gs://bucket3", name="dir3/file3")),
        ("https://test.com", Dataset(namespace="https", name="test.com")),
        ("https://test.com?param1=test1&param2=test2", Dataset(namespace="https", name="test.com")),
        ("not_an_url", None),
    ],
)
def test_convert_to_ol_dataset_from_object_storage_uri(uri, dataset):
    """Object-storage and generic URIs map to the expected Dataset; non-URIs map to None."""
    assert ExtractorManager.convert_to_ol_dataset_from_object_storage_uri(uri) == dataset


@pytest.mark.parametrize(
    ("obj", "dataset"),
    [
        (
            Dataset(namespace="n1", name="f1"),
            Dataset(namespace="n1", name="f1"),
        ),
        (File(url="s3://bucket1/dir1/file1"), Dataset(namespace="s3://bucket1", name="dir1/file1")),
        (File(url="gs://bucket2/dir2/file2"), Dataset(namespace="gs://bucket2", name="dir2/file2")),
        (File(url="https://test.com"), Dataset(namespace="https", name="test.com")),
        (Table(cluster="c1", database="d1", name="t1"), Dataset(namespace="c1", name="d1.t1")),
        ("gs://bucket2/dir2/file2", None),
        ("not_an_url", None),
    ],
)
def test_convert_to_ol_dataset(obj, dataset):
    """Datasets pass through, File/Table entities are converted, anything else yields None."""
    assert ExtractorManager.convert_to_ol_dataset(obj) == dataset


def test_convert_to_ol_dataset_from_table_with_columns_and_owners():
    """Table columns become a schema facet and owners become an ownership facet."""
    table = Table(
        cluster="c1",
        database="d1",
        name="t1",
        columns=[
            Column(name="col1", description="desc1", data_type="type1"),
            Column(name="col2", description="desc2", data_type="type2"),
        ],
        owners=[
            User(email="mike@company.com", first_name="Mike", last_name="Smith"),
            User(email="theo@company.com", first_name="Theo"),
            User(email="smith@company.com", last_name="Smith"),
            User(email="jane@company.com"),
        ],
    )
    expected_schema = SchemaDatasetFacet(
        fields=[
            SchemaField(name="col1", type="type1", description="desc1"),
            SchemaField(name="col2", type="type2", description="desc2"),
        ]
    )
    # Owner names cover all combinations of missing first/last name.
    expected_ownership = OwnershipDatasetFacet(
        owners=[
            OwnershipDatasetFacetOwners(name=owner_name, type="")
            for owner_name in (
                "user:Mike Smith <mike@company.com>",
                "user:Theo <theo@company.com>",
                "user:Smith <smith@company.com>",
                "user:<jane@company.com>",
            )
        ]
    )

    result = ExtractorManager.convert_to_ol_dataset_from_table(table)

    assert result.namespace == "c1"
    assert result.name == "d1.t1"
    assert result.facets == {"schema": expected_schema, "ownership": expected_ownership}


def test_convert_to_ol_dataset_table():
    """End-to-end: convert_to_ol_dataset on a Table carries schema and ownership facets."""
    table = Table(
        cluster="c1",
        database="d1",
        name="t1",
        columns=[
            Column(name="col1", description="desc1", data_type="type1"),
            Column(name="col2", description="desc2", data_type="type2"),
        ],
        owners=[
            User(email="mike@company.com", first_name="Mike", last_name="Smith"),
            User(email="theo@company.com", first_name="Theo"),
            User(email="smith@company.com", last_name="Smith"),
            User(email="jane@company.com"),
        ],
    )
    expected_schema = SchemaDatasetFacet(
        fields=[
            SchemaField(name="col1", type="type1", description="desc1"),
            SchemaField(name="col2", type="type2", description="desc2"),
        ]
    )
    expected_ownership = OwnershipDatasetFacet(
        owners=[
            OwnershipDatasetFacetOwners(name=owner_name, type="")
            for owner_name in (
                "user:Mike Smith <mike@company.com>",
                "user:Theo <theo@company.com>",
                "user:Smith <smith@company.com>",
                "user:<jane@company.com>",
            )
        ]
    )

    result = ExtractorManager.convert_to_ol_dataset(table)

    assert result.namespace == "c1"
    assert result.name == "d1.t1"
    assert result.facets == {"schema": expected_schema, "ownership": expected_ownership}

0 comments on commit 29ca912

Please sign in to comment.