# Kedro Delta Lake demo

![Kedro](./static/kedro-horizontal-color-on-light.png)

Example inspired by the Databricks notebook archived at https://web.archive.org/web/20230202153818/https://www.databricks.com/notebooks/delta-lake-cdf.html (later adapted at https://gist.github.com/astrojuanlu/41add9bb28f11a220496f9ead1943deb).

## Why Delta Lake?

Delta Lake (https://delta.io/) is an open-source table format that offers:

- ACID transactions (atomicity, consistency, isolation, durability)
- Time travel (versioning)
- Audit log (change data feed)
- Schema evolution/enforcement
- Merge/UPSERT (update + insert)
- And more!

![Delta Lake structure](./static/delta-lake-structure.png)

## First steps

In [None]:
import polars as pl

In [None]:
eps_march = pl.read_csv("data/eps_bronze_03-march.csv")
eps_march.head()

You can write a DataFrame in Delta format to the local filesystem:

In [None]:
eps_march.write_delta("data/_delta_test/eps_march")

In [None]:
!tree data/_delta_test/eps_march/
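A Delta table on disk is a set of Parquet data files plus a `_delta_log` directory of JSON commit files. As a rough sketch of what those commits contain, the helper below parses each newline-delimited JSON commit with the standard library and lists the action types it records (for example `protocol`, `metaData`, `add`, `commitInfo`). `read_commit_actions` is a hypothetical name, not part of any library, and the path assumes the table written in the previous cell.

```python
import json
from pathlib import Path


def read_commit_actions(table_path):
    """Map each commit file name in _delta_log to the action types it contains."""
    log_dir = Path(table_path) / "_delta_log"
    actions_by_commit = {}
    for commit_file in sorted(log_dir.glob("*.json")):
        actions = []
        for line in commit_file.read_text().splitlines():
            if line.strip():
                # Each line is a JSON object whose top-level key names the action.
                actions.extend(json.loads(line).keys())
        actions_by_commit[commit_file.name] = actions
    return actions_by_commit


# Assumes the local table written above; prints nothing if it does not exist yet.
for name, actions in read_commit_actions("data/_delta_test/eps_march").items():
    print(name, actions)
```

Every later write adds another numbered JSON file to the log, which is what makes versioning possible.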

From here on, let's use an object store instead, in this case a local MinIO server exposing an S3-compatible API:

In [None]:
minio_credentials = {
    "AWS_ENDPOINT_URL": "http://127.0.0.1:9010",
    "AWS_ACCESS_KEY_ID": "minioadmin",
    "AWS_SECRET_ACCESS_KEY": "minioadmin",
}
minio_storage_options = (
    {
        "AWS_ALLOW_HTTP": "true",
        "AWS_S3_ALLOW_UNSAFE_RENAME": "true",  # Required for Delta writing
        "AWS_REGION": "us-east-1",  # Boilerplate, see https://github.com/delta-io/delta-rs/issues/2377
    }
    | minio_credentials
)
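The cell above hardcodes the default MinIO credentials, which is fine for a local demo. A possible alternative, sketched here under the assumption that the same keys may be exported as environment variables, is to read them at runtime and fall back to the demo values. `build_storage_options` is a hypothetical helper name. The `|` operator merges two dictionaries (Python 3.9+), with the right-hand side winning on duplicate keys.

```python
import os


def build_storage_options(env=os.environ):
    """Merge fixed delta-rs S3 settings with credentials taken from the environment."""
    credentials = {
        # Fall back to the values used in this demo when a variable is not set.
        "AWS_ENDPOINT_URL": env.get("AWS_ENDPOINT_URL", "http://127.0.0.1:9010"),
        "AWS_ACCESS_KEY_ID": env.get("AWS_ACCESS_KEY_ID", "minioadmin"),
        "AWS_SECRET_ACCESS_KEY": env.get("AWS_SECRET_ACCESS_KEY", "minioadmin"),
    }
    settings = {
        "AWS_ALLOW_HTTP": "true",
        "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
        "AWS_REGION": "us-east-1",
    }
    return settings | credentials


minio_storage_options = build_storage_options()
```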

In [None]:
eps_march.write_delta(
    "s3://data/bronze_eps",
    storage_options=minio_storage_options,
    delta_write_options={
        "configuration": {
            # We will use this later
            "delta.enableChangeDataFeed": "true"
        }
    },
)

You can browse the resulting files in the MinIO console at http://localhost:9011/browser/data/bronze_eps%2F.

## Time travel (versioning)

Now let's try to overwrite the data with a new version:

In [None]:
eps_april = pl.read_csv("data/eps_bronze_04-april.csv")
eps_april

If you try to write it to the same table, you will get an error, because `write_delta` defaults to `mode="error"` and refuses to modify an existing table:

In [None]:
eps_april.write_delta(
    "s3://data/bronze_eps",
    storage_options=minio_storage_options,
)

To replace the data, pass `mode="overwrite"` explicitly:

In [None]:
eps_april.write_delta(
    "s3://data/bronze_eps",
    mode="overwrite",  # <---- This!
    storage_options=minio_storage_options,
)

Here is the interesting thing: the overwrite did not erase the March data. It was recorded as a new version of the table, and you can now see the table's history!

In [None]:
from deltalake import DeltaTable

dt = DeltaTable("s3://data/bronze_eps", storage_options=minio_storage_options)
dt

In [None]:
dt.history()

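## Extra: Audit logs (change data feed)

Because the table was created with `delta.enableChangeDataFeed` set to `"true"`, Delta records the row-level changes made by each version, and you can load them as a DataFrame: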

In [None]:
# load_cdf() returns a record batch reader; read_all() materializes it as an Arrow table
df_changes = pl.from_arrow(dt.load_cdf().read_all())
df_changes