Skip to content

Commit

Permalink
Add support for openlineage to AFS and common.io (#36410)
Browse files Browse the repository at this point in the history
This adds low level support for open lineage to ObjectStorage
and integrates it into common.io.
  • Loading branch information
bolkedebruin committed Jan 4, 2024
1 parent 6e93549 commit 33996a4
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 21 deletions.
11 changes: 10 additions & 1 deletion airflow/io/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ def __init__(
conn_id: str | None = None,
**kwargs: typing.Any,
) -> None:
# warning: we are not calling super().__init__ here
# as it will try to create a new fs from a different
# set if registered filesystems
if parsed_url and parsed_url.scheme:
self._store = attach(parsed_url.scheme, conn_id)
else:
Expand Down Expand Up @@ -173,10 +176,16 @@ def bucket(self) -> str:
@property
def key(self) -> str:
if self._url:
return self._url.path
# per convention, we strip the leading slashes to ensure a relative key is returned
# we keep the trailing slash to allow for directory-like semantics
return self._url.path.lstrip(self.sep)
else:
return ""

@property
def namespace(self) -> str:
return f"{self.protocol}://{self.bucket}" if self.bucket else self.protocol

def stat(self) -> stat_result: # type: ignore[override]
"""Call ``stat`` and return the result."""
return stat_result(
Expand Down
37 changes: 25 additions & 12 deletions airflow/providers/common/io/operators/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from airflow.models import BaseOperator

if TYPE_CHECKING:
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.context import Context


Expand Down Expand Up @@ -64,21 +65,33 @@ def __init__(
self.overwrite = overwrite

def execute(self, context: Context) -> None:
src: ObjectStoragePath
dst: ObjectStoragePath

if isinstance(self.src, str):
src = ObjectStoragePath(self.src, conn_id=self.source_conn_id)
else:
src = self.src

if isinstance(self.dst, str):
dst = ObjectStoragePath(self.dst, conn_id=self.dst_conn_id)
else:
dst = self.dst
src: ObjectStoragePath = self._get_path(self.src, self.source_conn_id)
dst: ObjectStoragePath = self._get_path(self.dst, self.dst_conn_id)

if not self.overwrite:
if dst.exists() and dst.is_file():
raise ValueError(f"Destination {dst} already exists")

src.copy(dst)

def get_openlineage_facets_on_start(self) -> OperatorLineage:
from openlineage.client.run import Dataset

from airflow.providers.openlineage.extractors import OperatorLineage

src: ObjectStoragePath = self._get_path(self.src, self.source_conn_id)
dst: ObjectStoragePath = self._get_path(self.dst, self.dst_conn_id)

input_dataset = Dataset(namespace=src.namespace, name=src.key)
output_dataset = Dataset(namespace=dst.namespace, name=dst.key)

return OperatorLineage(
inputs=[input_dataset],
outputs=[output_dataset],
)

@staticmethod
def _get_path(path: str | ObjectStoragePath, conn_id: str | None) -> ObjectStoragePath:
if isinstance(path, str):
return ObjectStoragePath(path, conn_id=conn_id)
return path
4 changes: 2 additions & 2 deletions dev/breeze/tests/test_selective_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
"tests/providers/common/io/operators/test_file_transfer.py",
),
{
"affected-providers-list-as-string": "common.io",
"affected-providers-list-as-string": "common.io openlineage",
"all-python-versions": "['3.8']",
"all-python-versions-list-as-string": "3.8",
"python-versions": "['3.8']",
Expand All @@ -538,7 +538,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
"docs-build": "false",
"run-kubernetes-tests": "false",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": "Always Providers[common.io]",
"parallel-test-types-list-as-string": "Always Providers[common.io,openlineage]",
},
id="Only Always and Common.IO tests should run when only common.io and tests/always changed",
),
Expand Down
8 changes: 7 additions & 1 deletion docs/apache-airflow/core-concepts/objectstorage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ object you want to interact with. For example, to point to a bucket in s3, you w
base = ObjectStoragePath("s3://aws_default@my-bucket/")
The username part of the URI is optional. It can alternatively be passed in as a separate keyword argument:
The username part of the URI represents the Airflow connection id and is optional. It can alternatively be passed
in as a separate keyword argument:

.. code-block:: python
Expand Down Expand Up @@ -242,6 +243,11 @@ key

Returns the object key.

namespace
^^^^^^^^^

Returns the namespace of the object. Typically this is the protocol, like ``s3://`` with the
bucket name.

path
^^^^
Expand Down
4 changes: 3 additions & 1 deletion generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,9 @@
"deps": [
"apache-airflow>=2.8.0"
],
"cross-providers-deps": [],
"cross-providers-deps": [
"openlineage"
],
"excluded-python-versions": [],
"state": "ready"
},
Expand Down
11 changes: 7 additions & 4 deletions tests/io/test_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,21 @@ def test_alias(self):
def test_init_objectstoragepath(self):
path = ObjectStoragePath("file://bucket/key/part1/part2")
assert path.bucket == "bucket"
assert path.key == "/key/part1/part2"
assert path.key == "key/part1/part2"
assert path.protocol == "file"
assert path.path == "bucket/key/part1/part2"

path2 = ObjectStoragePath(path / "part3")
assert path2.bucket == "bucket"
assert path2.key == "/key/part1/part2/part3"
assert path2.key == "key/part1/part2/part3"
assert path2.protocol == "file"
assert path2.path == "bucket/key/part1/part2/part3"

path3 = ObjectStoragePath(path2 / "2023")
assert path3.bucket == "bucket"
assert path3.key == "/key/part1/part2/part3/2023"
assert path3.key == "key/part1/part2/part3/2023"
assert path3.protocol == "file"
assert path3.path == "bucket/key/part1/part2/part3/2023"

def test_read_write(self):
o = ObjectStoragePath(f"file:///tmp/{str(uuid.uuid4())}")
Expand Down Expand Up @@ -171,7 +174,7 @@ def test_bucket_key_protocol(self):
o = ObjectStoragePath(f"{protocol}://{bucket}/{key}")
assert o.bucket == bucket
assert o.container == bucket
assert o.key == f"/{key}"
assert o.key == f"{key}"
assert o.protocol == protocol

def test_cwd_home(self):
Expand Down
24 changes: 24 additions & 0 deletions tests/providers/common/io/operators/test_file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

from unittest import mock

from openlineage.client.run import Dataset

from airflow.providers.common.io.operators.file_transfer import FileTransferOperator


Expand All @@ -45,3 +47,25 @@ def test_file_transfer_copy():
)
source_path.copy.assert_called_once_with(target_path)
target_path.copy.assert_not_called()


def test_get_openlineage_facets_on_start():
src_bucket = "src-bucket"
src_key = "src-key"
dst_bucket = "dst-bucket"
dst_key = "dst-key"

expected_input = Dataset(namespace=f"s3://{src_bucket}", name=src_key)
expected_output = Dataset(namespace=f"s3://{dst_bucket}", name=dst_key)

op = FileTransferOperator(
task_id="test",
src=f"s3://{src_bucket}/{src_key}",
dst=f"s3://{dst_bucket}/{dst_key}",
)

lineage = op.get_openlineage_facets_on_start()
assert len(lineage.inputs) == 1
assert len(lineage.outputs) == 1
assert lineage.inputs[0] == expected_input
assert lineage.outputs[0] == expected_output

0 comments on commit 33996a4

Please sign in to comment.