Add lightweight serialization for deltalake tables #35462

Merged
merged 4 commits into from Nov 5, 2023

Contributor

@bolkedebruin bolkedebruin commented Nov 5, 2023

This adds support for deltalake table serialization.

It allows you to do the following:

import polars as pl

from airflow.decorators import task
from airflow.io.store.path import ObjectStoragePath
from deltalake import write_deltalake
from deltalake.table import DeltaTable

path = ObjectStoragePath("s3://delta-lake/test_table", conn_id="minio")

# `frame` (the data to write) and `delta_storage_options` (a dict of
# storage settings/credentials) are assumed to be defined elsewhere.


@task
def a_task() -> DeltaTable:
  dt = DeltaTable(str(path), storage_options=delta_storage_options)
  write_deltalake(dt, frame, storage_options=delta_storage_options, mode="overwrite")

  return dt

@task
def b_task(dt: DeltaTable) -> pl.DataFrame:
  # DeltaTable.version() is a method, so call it to get the version number.
  df = pl.scan_delta(dt.table_uri, storage_options=dt._storage_options, version=dt.version())
  return df.collect()

cc: @hussein-awala


airflow/io/store/path.py (outdated, resolved)


def serialize(o: object) -> tuple[U, str, int, bool]:
    from deltalake.table import DeltaTable
Member

How do we handle versioning of the underlying third-party libraries like deltalake here and iceberg somewhere else?

We should add or update some docs too on what serializers are available natively

Contributor Author

You mean if the API changes? We mostly assume the API remains relatively stable for this kind of work (as we do with pandas and others), and at the moment we do not tie users to a particular version of Iceberg or DeltaLake, as they are neither core to Airflow nor part of one of the providers. If we do move some serializers to providers, which is probably the right thing to do, then it makes more sense to improve this.

Docs yes, but I think having that with the rework into providers is more logical.

Member

> If we do move some serializers to providers, which is probably the right thing to do, then it makes more sense to improve this.

Yeah, we will need to handle that sooner. deltalake, for example, is still on 0.x (https://pypi.org/project/deltalake/#history), so in theory any release can break the API if they follow SemVer.

> Docs yes, but I think having that with the rework into providers is more logical.

Are you planning to do that soon, before we release the next minor Airflow version?
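
To make the 0.x concern concrete: under SemVer, a major version of 0 marks initial development, where the public API may change in any release. The sketch below is not part of this PR. It is a hypothetical guard (the helper names `is_pre_1_0` and `installed_is_pre_1_0` are invented here) that only handles plain dotted version strings such as "0.13.0" and relies on the standard library's `importlib.metadata`.

```python
from __future__ import annotations

from importlib import metadata


def is_pre_1_0(version: str) -> bool:
    """Return True when the major component of a dotted version string is 0.

    Assumes a plain dotted release string such as "0.13.0"; epochs and
    other exotic version forms are not handled.
    """
    major = version.split(".", 1)[0]
    return int(major) == 0


def installed_is_pre_1_0(dist_name: str) -> bool | None:
    """Check the installed distribution; None means it is not installed."""
    try:
        return is_pre_1_0(metadata.version(dist_name))
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Hypothetical usage: warn when a serializer depends on a 0.x library.
    if installed_is_pre_1_0("deltalake"):
        print("deltalake is pre-1.0; its API may change between releases")
```

A check like this only flags the risk. It does not pin a version or adapt to API changes.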

Contributor Author

I agree. On the other hand, we rely only very lightly on the API: just enough to be able to re-instantiate the object, which typically means getting the right data to pass to __init__.

Not sure if I can get it in for the next minor release. Provider rework isn't much fun :-). And this serialization, while it works, is not the final form. I was thinking about doing something like airflow.catalog, but I am not sure about that yet. In addition, I am thinking about Arrow; maybe that should be our default format for this. Again, I'm not sure, and I need to play with it a bit more. Hence, for now, the lightweight part.
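
The "just enough to re-instantiate it" idea from this comment can be sketched as a serialize/deserialize pair that stores only constructor arguments, never the table data. This is a minimal illustration, not the code merged in this PR. `FakeTable` is a hypothetical stand-in for a table handle such as DeltaTable, and the return shape `(data, classname, version, is_serialized)` is assumed from the `serialize` signature shown in the diff above.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeTable:
    """Hypothetical stand-in for a table handle; holds no table data."""

    table_uri: str
    version: int | None = None
    storage_options: dict[str, str] = field(default_factory=dict)


SERIALIZER_VERSION = 1  # bump whenever the stored layout changes


def serialize(o: object) -> tuple[dict[str, Any], str, int, bool]:
    """Return (data, classname, version, is_serialized)."""
    if not isinstance(o, FakeTable):
        # Signal "not handled" so another serializer can try.
        return {}, "", 0, False
    data = {
        "table_uri": o.table_uri,
        "version": o.version,
        "storage_options": dict(o.storage_options),
    }
    classname = f"{type(o).__module__}.{type(o).__qualname__}"
    return data, classname, SERIALIZER_VERSION, True


def deserialize(classname: str, version: int, data: dict[str, Any]) -> FakeTable:
    """Rebuild the handle by passing the stored fields back to __init__."""
    if version > SERIALIZER_VERSION:
        raise TypeError(f"serialized version {version} is newer than supported")
    return FakeTable(**data)
```

Because only the URI, version, and storage options cross the task boundary, the payload stays small regardless of table size. A real implementation would also need to decide how to protect any credentials carried in the storage options.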

Member

@hussein-awala hussein-awala left a comment

👍

@bolkedebruin bolkedebruin merged commit 7d0afbd into apache:main Nov 5, 2023
70 checks passed
@bolkedebruin bolkedebruin deleted the delta branch November 5, 2023 22:36
romsharon98 pushed a commit to romsharon98/airflow that referenced this pull request Nov 10, 2023
@ephraimbuddy ephraimbuddy added the type:new-feature Changelog: New Features label Nov 20, 2023
@ephraimbuddy ephraimbuddy added AIP-58 changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) and removed type:new-feature Changelog: New Features labels Nov 27, 2023
Labels
AIP-58 area:serialization changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..)

4 participants