# Delta Lake, Apache Iceberg & DuckLake

## Format-Überblick (kurz)
**Delta Lake**
- Offenes Format auf Parquet-Basis mit Transaktionslog (`_delta_log`).
- Stärken: Spark-Ökosystem, Time‑Travel, MERGE/UPSERT, breites Tooling.
- Schwächen: Spark-Abhängigkeit für viele Features; Log-Vacuum beachten.

**Apache Iceberg**
- Tabellenformat mit **Snapshots**, versteckter Partitionierung und flexibler Schema‑Evolution.
- Stärken: Engine‑neutral (Spark, Flink, Trino, DuckDB), performantes Metadata‑Layout.
- Schwächen: Katalogverwaltung (REST/Nessie/Glue/etc.) & Setup‑Varianten können komplex sein.

**DuckLake**
- Neues offenes Lakehouse‑Format (Metadaten in relationaler DB, z. B. SQLite; Daten als Parquet).
- Stärken: einfache lokale Kataloge, schnelle Demos, Snapshot‑APIs; Engine‑agnostisch via DuckDB.
- Schwächen: junges Ökosystem; weniger Integrationen als Iceberg/Delta (Stand: 2025).

## Gewählte Datenquelle
- NYC Green Taxi (01/2019, Parquet) → `https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2019-01.parquet`
- Für die Demos wird eine **Teilmenge** benutzt (Limit ~5k Zeilen), um Rechenzeit in Colab kurz zu halten.

In [1]:
DATA_URL = 'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2019-01.parquet'
print('Datenquelle:', DATA_URL)

Datenquelle: https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2019-01.parquet


## Delta Lake


In [2]:
!apt-get install -qq openjdk-11-jdk-headless > /dev/null
!pip install -q pyspark==3.5.1 delta-spark==3.2.0

In [3]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder
    .appName("DeltaSchemaEvolutionDemo")
    .config(
        "spark.sql.extensions",
        "io.delta.sql.DeltaSparkSessionExtension"
    )
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog"
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

print("Spark:", spark.version)
print("Extensions:", spark.conf.get("spark.sql.extensions"))

Spark: 3.5.1
Extensions: io.delta.sql.DeltaSparkSessionExtension


In [4]:
base_path = "/content/delta/events_demo"

events_v1 = spark.createDataFrame(
    [
        (1, "signup"),
        (2, "purchase"),
        (3, "signup"),
    ],
    ["user_id", "event_type"]
)

(
    events_v1.write
        .format("delta")
        .mode("overwrite")
        .save(base_path)
)

df = spark.read.format("delta").load(base_path)
df.printSchema()
df.show()

root
 |-- user_id: long (nullable = true)
 |-- event_type: string (nullable = true)

+-------+----------+
|user_id|event_type|
+-------+----------+
|      2|  purchase|
|      3|    signup|
|      1|    signup|
+-------+----------+



In [5]:
# Schema ändern
events_v2 = spark.createDataFrame(
    [
        (4, "signup",   "mobile"),
        (5, "purchase", "web"),
    ],
    ["user_id", "event_type", "device_type"]
)

(
    events_v2.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")  # Schema-Evolution
        .save(base_path)
)

df2 = spark.read.format("delta").load(base_path)
df2.printSchema()
df2.orderBy("user_id").show()

root
 |-- user_id: long (nullable = true)
 |-- event_type: string (nullable = true)
 |-- device_type: string (nullable = true)

+-------+----------+-----------+
|user_id|event_type|device_type|
+-------+----------+-----------+
|      1|    signup|       NULL|
|      2|  purchase|       NULL|
|      3|    signup|       NULL|
|      4|    signup|     mobile|
|      5|  purchase|        web|
+-------+----------+-----------+



In [6]:
# Zugriff mittels SQL
base_path = "/content/delta/events_demo"  # wie beim Schreiben

spark.sql(f"""
  CREATE TABLE IF NOT EXISTS events_demo
  USING DELTA
  LOCATION '{base_path}'
""")

spark.sql("SELECT * FROM events_demo ORDER BY user_id").show()

+-------+----------+-----------+
|user_id|event_type|device_type|
+-------+----------+-----------+
|      1|    signup|       NULL|
|      2|  purchase|       NULL|
|      3|    signup|       NULL|
|      4|    signup|     mobile|
|      5|  purchase|        web|
+-------+----------+-----------+



In [7]:
# Alternativ DDL erweitern
spark.sql("""
  ALTER TABLE events_demo
  ADD COLUMNS (event_date DATE)
""")

spark.sql("DESCRIBE TABLE events_demo").show(truncate=False)
spark.sql("SELECT * FROM events_demo ORDER BY user_id").show()

+-----------+---------+-------+
|col_name   |data_type|comment|
+-----------+---------+-------+
|user_id    |bigint   |NULL   |
|event_type |string   |NULL   |
|device_type|string   |NULL   |
|event_date |date     |NULL   |
+-----------+---------+-------+

+-------+----------+-----------+----------+
|user_id|event_type|device_type|event_date|
+-------+----------+-----------+----------+
|      1|    signup|       NULL|      NULL|
|      2|  purchase|       NULL|      NULL|
|      3|    signup|       NULL|      NULL|
|      4|    signup|     mobile|      NULL|
|      5|  purchase|        web|      NULL|
+-------+----------+-----------+----------+



In [None]:
# Delta ohne Spark: mit delta-rs (Write/History) + DuckDB (Read/Analytics)
# -> nutzt dieselbe Datenquelle wie Iceberg/DuckLake
%pip -q install deltalake duckdb==1.4.2 pyarrow pandas

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m20.5/20.5 MB[0m [31m40.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.6/56.6 MB[0m [31m8.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.8/2.8 MB[0m [31m92.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import os, pandas as pd, duckdb
from deltalake import DeltaTable, write_deltalake

DATA_URL = globals().get(
    "DATA_URL",
    "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2019-01.parquet"
)

In [None]:
# Quelle laden (pandas/pyarrow), auf ~5k Zeilen begrenzen
pdf = pd.read_parquet(DATA_URL, engine="pyarrow", columns=[
    "lpep_pickup_datetime","lpep_dropoff_datetime",
    "passenger_count","trip_distance","total_amount"
]).head(5000)

delta_path = "/content/delta_trips"  # lokaler Delta-Tabellenpfad
os.makedirs(delta_path, exist_ok=True)

In [None]:
print(pdf)

     lpep_pickup_datetime lpep_dropoff_datetime  passenger_count  \
0     2018-12-21 15:17:29   2018-12-21 15:18:57              5.0   
1     2019-01-01 00:10:16   2019-01-01 00:16:32              2.0   
2     2019-01-01 00:27:11   2019-01-01 00:31:38              2.0   
3     2019-01-01 00:46:20   2019-01-01 01:04:54              2.0   
4     2019-01-01 00:19:06   2019-01-01 00:39:43              1.0   
...                   ...                   ...              ...   
4995  2019-01-01 04:40:03   2019-01-01 04:43:51              2.0   
4996  2019-01-01 04:00:25   2019-01-01 04:12:11              1.0   
4997  2019-01-01 04:26:41   2019-01-01 04:45:38              1.0   
4998  2019-01-01 04:36:17   2019-01-01 04:41:04              1.0   
4999  2019-01-01 04:08:05   2019-01-01 04:23:24              1.0   

      trip_distance  total_amount  
0              0.00          4.30  
1              0.86          7.30  
2              0.66          5.80  
3              2.68         19.71  
4  

In [None]:
# Als Delta schreiben
# Erste Version schreiben (delta-rs)
write_deltalake(delta_path, pdf, mode="overwrite")  # v0
dt = DeltaTable(delta_path)
print("v0 -> rows:", len(dt.to_pandas()), "| version:", dt.version())

v0 -> rows: 5000 | version: 0


In [None]:
# Änderung erzeugen (total_amount bei kurze Fahrten erhöhen) und neue Version schreiben
pdf2 = pdf.copy()
mask = pdf2["trip_distance"] < 0.5

# wählt alle Zeilen, mit mask == True, und nur die Spalte total_amount wird erhöht
pdf2.loc[mask, "total_amount"] = pdf2.loc[mask, "total_amount"] + 0.20

write_deltalake(delta_path, pdf2, mode="overwrite")  # v1
dt = DeltaTable(delta_path)
print("v1 -> rows:", len(dt.to_pandas()), "| version:", dt.version())

v1 -> rows: 5000 | version: 2


In [None]:
# History/Time-Travel (delta-rs)
hist = dt.history()  # Liste mit Commit-Metadaten
print("History entries:", len(hist))
print("Latest op:", hist[0].get("operation"), "| version:", dt.version())

History entries: 3
Latest op: WRITE | version: 2


In [None]:
# Delta mit DuckDB lesen
con = duckdb.connect()
con.execute("INSTALL delta; LOAD delta;")  # autoload funktioniert idR auch ohne diese Zeile
res = con.execute(f"SELECT count(*) AS n FROM delta_scan('file://{delta_path}')").fetchdf()
print("DuckDB delta_scan() count:", int(res['n'][0]))

DuckDB delta_scan() count: 5000


In [None]:
# Optional: ein Vergleich alter/neuer Werte direkt in pandas (delta-rs)
v0 = DeltaTable(delta_path, version=0).to_pandas()
v1 = DeltaTable(delta_path).to_pandas()
before = v0.loc[v0["trip_distance"] < 0.5, ["trip_distance","total_amount"]].head(3)
after  = v1.loc[v1["trip_distance"] < 0.5, ["trip_distance","total_amount"]].head(3)
print("Before (v0):\n", before)
print("After  (v1):\n",  after)

Before (v0):
     trip_distance  total_amount
0            0.00          4.30
12           0.49         16.80
18           0.43          6.36
After  (v1):
     trip_distance  total_amount
0            0.00          4.50
12           0.49         17.00
18           0.43          6.56


In [None]:
# Schema Evolution
import numpy as np
import pandas as pd
import duckdb
from deltalake import DeltaTable, write_deltalake

# delta_path sollte aus der vorherigen Delta+DuckDB-Zelle kommen
delta_path = globals().get("delta_path", "/content/delta_trips")

# Aktuelle Tabelle lesen (pandas)
dt = DeltaTable(delta_path)
pdf = dt.to_pandas()

# Schema via DuckDB inspizieren
con = duckdb.connect()
preview = con.execute(f"SELECT * FROM delta_scan('file://{delta_path}') LIMIT 1").fetchdf()
print("Spalten vor Schema-Evolution:", list(preview.columns))
print(preview)

# Neue Spalte berechnen (Division durch 0 abfangen)
new_col = "amount_per_km"
if new_col not in pdf.columns:
    pdf[new_col] = np.where(
        (pdf["trip_distance"].astype(float) > 0),
        pdf["total_amount"].astype(float) / pdf["trip_distance"].astype(float),
        np.nan
    )

# Schema-Evolution durchführen:
#    WICHTIG: statt overwrite -> append und schema_mode="merge"
write_deltalake(
    delta_path,
    pdf,
    mode="append",          # Daten anhängen
    schema_mode="merge"     # Schema mit neuer Spalte zusammenführen (Delta-RS)
)

# Version & Schema prüfen
dt2 = DeltaTable(delta_path)
print("Neue Version:", dt2.version(), "| Zeilen (aktueller Snapshot):", len(dt2.to_pandas()))

# Schema via DuckDB inspizieren
con = duckdb.connect()
preview = con.execute(f"SELECT * FROM delta_scan('file://{delta_path}') where amount_per_km is not null LIMIT 1").fetchdf()
print("Spalten nach Schema-Evolution:", list(preview.columns))
print(preview)

Spalten vor Schema-Evolution: ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'total_amount', 'amount_per_km']
  lpep_pickup_datetime lpep_dropoff_datetime  passenger_count  trip_distance  \
0  2018-12-21 15:17:29   2018-12-21 15:18:57              5.0            0.0   

   total_amount  amount_per_km  
0           4.5            NaN  
Neue Version: 5 | Zeilen (aktueller Snapshot): 40000
Spalten nach Schema-Evolution: ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'total_amount', 'amount_per_km']
  lpep_pickup_datetime lpep_dropoff_datetime  passenger_count  trip_distance  \
0  2019-01-01 00:10:16   2019-01-01 00:16:32              2.0           0.86   

   total_amount  amount_per_km  
0           7.3       8.488372  


# Apache Iceberg Copy on write vs Merge on read

In [None]:
#!apt-get install -qq openjdk-11-jdk-headless > /dev/null
#!pip install -q pyspark==3.5.1

# Sitzung neu starten!

In [1]:
DATA_URL = 'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2019-01.parquet'
print('Datenquelle:', DATA_URL)

Datenquelle: https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2019-01.parquet


In [18]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
  .appName("IcebergLocalDevelopment") \
  .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2') \
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
  .config("spark.sql.catalog.local.type", "hadoop") \
  .config("spark.sql.catalog.local.warehouse", "spark-warehouse/iceberg") \
  .getOrCreate()
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [3]:
import pandas as pd

pdf = pd.read_parquet(
    DATA_URL,
    engine="pyarrow",
    columns=[
        "lpep_pickup_datetime",
        "lpep_dropoff_datetime",
        "passenger_count",
        "trip_distance",
        "total_amount",
    ],
).head(5000)

# pandas → Spark-DataFrame
trips_df = spark.createDataFrame(pdf)

In [4]:
spark.sql("CREATE NAMESPACE IF NOT EXISTS local.schema")

DataFrame[]

In [5]:
(
    trips_df.writeTo("local.schema.trips_cow")
          .tableProperty("format-version", "2")
          .tableProperty("write.update.mode", "copy-on-write")
          .createOrReplace()
)

In [6]:
(
    trips_df.writeTo("local.schema.trips_mor")
          .tableProperty("format-version", "2")
          .tableProperty("write.update.mode", "merge-on-read")
          .createOrReplace()
)

In [7]:
spark.sql("select * from local.schema.trips_mor").show()

+--------------------+---------------------+---------------+-------------+------------+
|lpep_pickup_datetime|lpep_dropoff_datetime|passenger_count|trip_distance|total_amount|
+--------------------+---------------------+---------------+-------------+------------+
| 2018-12-21 15:17:29|  2018-12-21 15:18:57|            5.0|          0.0|         4.3|
| 2019-01-01 00:10:16|  2019-01-01 00:16:32|            2.0|         0.86|         7.3|
| 2019-01-01 00:27:11|  2019-01-01 00:31:38|            2.0|         0.66|         5.8|
| 2019-01-01 00:46:20|  2019-01-01 01:04:54|            2.0|         2.68|       19.71|
| 2019-01-01 00:19:06|  2019-01-01 00:39:43|            1.0|         4.53|        19.3|
| 2019-01-01 00:12:35|  2019-01-01 00:19:09|            1.0|         1.05|         7.8|
| 2019-01-01 00:47:55|  2019-01-01 01:00:01|            1.0|         3.77|        14.8|
| 2019-01-01 00:12:47|  2019-01-01 00:30:50|            1.0|          4.1|        17.3|
| 2019-01-01 00:16:23|  2019-01-

Bei COW-Table wird bei einem UPDATE die betroffene Datei komplett neu geschrieben.
Bei MOR-Table wird nichts „überschrieben“, sondern es werden neue Daten + Delete-Datei angelegt und die alten Dateien bleiben erhalten.

In [10]:
#spark.sql("SELECT * FROM local.schema.trips_cow").show()

# Welche Dateien gehören zu welchen Snapshots?
spark.sql("""
  SELECT
    content,         -- 0 = Data, 1 = Position-Delete, 2 = Equality-Delete
    file_format,
    file_path,
    record_count
  FROM local.schema.trips_cow.files
  ORDER BY content, file_path
""").show(truncate=False)


+-------+-----------+----------------------------------------------------------------------------------------------------------+------------+
|content|file_format|file_path                                                                                                 |record_count|
+-------+-----------+----------------------------------------------------------------------------------------------------------+------------+
|0      |PARQUET    |spark-warehouse/iceberg/schema/trips_cow/data/00000-6-331fcf20-6257-4349-aaa2-717ca7bc9ea8-0-00001.parquet|5000        |
+-------+-----------+----------------------------------------------------------------------------------------------------------+------------+



In [11]:
#spark.sql("SELECT * FROM local.schema.trips_mor").show()

spark.sql("""
  SELECT
    content,         -- 0 = Data, 1 = Position-Delete, 2 = Equality-Delete
    file_format,
    file_path,
    record_count
  FROM local.schema.trips_mor.files
  ORDER BY content, file_path
""").show(truncate=False)

+-------+-----------+----------------------------------------------------------------------------------------------------------------+------------+
|content|file_format|file_path                                                                                                       |record_count|
+-------+-----------+----------------------------------------------------------------------------------------------------------------+------------+
|0      |PARQUET    |spark-warehouse/iceberg/schema/trips_mor/data/00000-2-a482cd66-84f1-4518-a1a2-ed513134e92e-0-00001.parquet      |2048        |
|0      |PARQUET    |spark-warehouse/iceberg/schema/trips_mor/data/00000-8-68c66947-6d3f-48ab-8cc4-ef11f1bd4d45-00001.parquet        |5000        |
|0      |PARQUET    |spark-warehouse/iceberg/schema/trips_mor/data/00001-3-a482cd66-84f1-4518-a1a2-ed513134e92e-0-00001.parquet      |2952        |
|1      |PARQUET    |spark-warehouse/iceberg/schema/trips_mor/data/00000-8-68c66947-6d3f-48ab-8cc4-ef11f1bd4d45-

Die beiden Dateien mit
2048 und 2952 Zeilen (zusammen 2048 + 2952 = 5000) sind die ursprünglichen Datenfiles vor dem UPDATE.

Das File mit record_count = 5000 und content = 0 ist die neue Daten-Datei mit den geänderten passenger_count-Werten.

Das deletes.parquet mit content = 1 und record_count = 5000 ist eine Position-Delete-Datei:

sie enthält 5000 Einträge, die auf alle ursprünglichen 5000 Zeilen in den zwei alten Dateien verweisen

damit werden diese alten Zeilen logisch gelöscht.

In [20]:
spark.sql("UPDATE local.schema.trips_mor SET passenger_count = passenger_count + 1 WHERE total_amount > 240")

DataFrame[]

In [21]:
spark.sql("""
  SELECT
    content,         -- 0 = Data, 1 = Position-Delete, 2 = Equality-Delete
    file_format,
    file_path,
    record_count
  FROM local.schema.trips_mor.files
  ORDER BY content, file_path
""").show(truncate=False)

+-------+-----------+----------------------------------------------------------------------------------------------------------------+------------+
|content|file_format|file_path                                                                                                       |record_count|
+-------+-----------+----------------------------------------------------------------------------------------------------------------+------------+
|0      |PARQUET    |spark-warehouse/iceberg/schema/trips_mor/data/00000-2-a482cd66-84f1-4518-a1a2-ed513134e92e-0-00001.parquet      |2048        |
|0      |PARQUET    |spark-warehouse/iceberg/schema/trips_mor/data/00000-4-f16123ec-edf0-4130-acf4-08bea9e2c625-00001.parquet        |1           |
|0      |PARQUET    |spark-warehouse/iceberg/schema/trips_mor/data/00000-8-68c66947-6d3f-48ab-8cc4-ef11f1bd4d45-00001.parquet        |5000        |
|0      |PARQUET    |spark-warehouse/iceberg/schema/trips_mor/data/00001-3-a482cd66-84f1-4518-a1a2-ed513134e92e-

Update 2: nur 1 Zeile

Iceberg packt die neue Version dieser einen Zeile in ein Mini-Datenfile

und schreibt ein Delete-File mit 1 Delete-Eintrag, der die alte Version dieser Zeile löscht


Aufräumen:

* expire_snapshots → alte Snapshots + deren nicht benötigte Dateien
aufräumen
* remove_orphan_files → Dateien löschen, die in keinem Snapshot mehr vorkommen
* rewrite_data_files → viele kleine Datafiles zu größeren zusammenfassen (Compaction)
* (optional) rewrite_position_deletes / rewrite_manifests für Feintuning

## Apache Iceberg (via DuckDB Iceberg‑Extension)


In [None]:
%pip install duckdb==1.4.2 pandas "pyiceberg[pyarrow,sql]" sqlalchemy pyarrow

Collecting duckdb==1.4.2
  Downloading duckdb-1.4.2-cp312-cp312-manylinux_2_26_x86_64.manylinux_2_28_x86_64.whl.metadata (4.3 kB)
Collecting pyiceberg[pyarrow,sql]
  Downloading pyiceberg-0.10.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (4.9 kB)
Collecting mmh3<6.0.0,>=4.0.0 (from pyiceberg[pyarrow,sql])
  Downloading mmh3-5.2.0-cp312-cp312-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl.metadata (14 kB)
Collecting pyiceberg-core<0.7.0,>=0.5.1 (from pyiceberg[pyarrow,sql])
  Downloading pyiceberg_core-0.6.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting pyroaring<2.0.0,>=1.0.0 (from pyiceberg[pyarrow,sql])
  Downloading pyroaring-1.0.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (10 kB)
Collecting strictyaml<2.0.0,>=1.7.0 (from pyiceberg[pyarrow,sql])
  Downloading strictyaml-1.7.3-py3-none-any.whl.metadata (11 kB)
Downloading duckdb-1.4.2-cp312-cp312-manylinu

In [None]:
import duckdb
import pandas as pd

print("DuckDB: ", duckdb.__version__)
print("Pandas: ", pd.__version__)

DuckDB:  1.4.2
Pandas:  2.2.2


In [None]:
con = duckdb.connect()
con.execute("INSTALL httpfs; LOAD httpfs;")
con.execute("INSTALL iceberg; LOAD iceberg;")

<_duckdb.DuckDBPyConnection at 0x780d3e73b6f0>

In [None]:
import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, TimestampType, IntegerType, DoubleType

# Lokalen Iceberg-Katalog (SQLite) + Warehouse
warehouse = "file:///content/warehouse"                 # Daten + Metadata landen hier
catalog   = SqlCatalog(
    name="local",
    uri="sqlite:////content/iceberg_catalog.db",        # Katalog-DB-Datei
    warehouse=warehouse,
)

# Namespace + Tabellenbezeichner
ns = ("nyc",)
try:
    catalog.create_namespace(ns)
except NamespaceAlreadyExistsError:
    pass

table_name = "green_2019_01"
identifier = (*ns, table_name)    # -> ('nyc','green_2019_01')

# (Optional) vorhandene Tabelle löschen, um Schema-Konflikte zu vermeiden
if catalog.table_exists(identifier):
    catalog.drop_table(identifier)

# Iceberg-Schema (numerische Felder optional, um Nulls zu erlauben)
schema = Schema(
    NestedField(1, "lpep_pickup_datetime",  TimestampType(), required=False),
    NestedField(2, "lpep_dropoff_datetime", TimestampType(), required=False),
    NestedField(3, "passenger_count",       IntegerType(),   required=False),
    NestedField(4, "trip_distance",         DoubleType(),    required=False),
    NestedField(5, "total_amount",          DoubleType(),    required=False),
)

# Tabelle anlegen
tbl = catalog.create_table(identifier=identifier, schema=schema)

# Daten aus HTTP-Parquet lesen und passend casten
arrow_tbl = con.sql("""
    SELECT
      lpep_pickup_datetime::TIMESTAMP  AS lpep_pickup_datetime,
      lpep_dropoff_datetime::TIMESTAMP AS lpep_dropoff_datetime,
      CAST(passenger_count AS INTEGER) AS passenger_count,
      CAST(trip_distance   AS DOUBLE)  AS trip_distance,
      CAST(total_amount    AS DOUBLE)  AS total_amount
    FROM read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2019-01.parquet')
""").arrow().read_all()

# Schreiben (erste Snapshot-Version)
tbl.overwrite(arrow_tbl)
print("OK – Iceberg-Tabelle angelegt und befüllt:", identifier, "Warehouse:", warehouse)



OK – Iceberg-Tabelle angelegt und befüllt: ('nyc', 'green_2019_01') Warehouse: file:///content/warehouse


In [None]:
# Optional: mit DuckDB prüfen (ohne Katalog, direkt via metadata.json)
# Pfad aus file:// URL extrahieren
from urllib.parse import urlparse, unquote
import os

# Pfade aus der file:// URL
latest_meta_url = tbl.metadata_location                                   # file:///.../nyc/green_2019_01/metadata/00001-....metadata.json
meta_local = unquote(urlparse(latest_meta_url).path)                      # /.../nyc/green_2019_01/metadata/00001-....metadata.json
table_root = os.path.dirname(os.path.dirname(meta_local))                 # /.../nyc/green_2019_01
print(meta_local)
print(table_root)

# Version = Dateiname ohne Endung
version_name = os.path.splitext(os.path.basename(meta_local))[0]          # "00001-....metadata" -> erst Endung ".json" ab
if version_name.endswith(".metadata"):                                     # sicherheitshalber zweite Endung entfernen
    version_name = version_name[:-len(".metadata")]

con.sql(f"""
    SELECT *
    FROM iceberg_snapshots(
        '{meta_local}',
        version = '1'
     )
""").df()


con.sql(f"""
    SELECT COUNT(*) AS n
    FROM iceberg_scan('{meta_local}')
""").df()

# DuckDB-Scan: Root + Version + Namensformat
#    Namensformat zeigt auf "metadata/<version>.metadata.json"
con.sql("SET unsafe_enable_version_guessing = true")
con.sql(f"""
    SELECT COUNT(*) AS n
    FROM iceberg_scan('{table_root}',
                      allow_moved_paths=true)
""").df()

/content/warehouse/nyc/green_2019_01/metadata/00001-f39875e4-95e7-4b1f-bbe4-1455bb006901.metadata.json
/content/warehouse/nyc/green_2019_01


Unnamed: 0,n
0,672105


In [None]:
# Time Travel
import pyarrow as pa
import pandas as pd

# Aktuellen Stand als Pandas holen (alternativ .to_arrow())
pdf = tbl.scan().to_pandas()  # PyIceberg scan -> Pandas

# Änderung: +0.20 auf sehr kurze Fahrten
pdf2 = pdf.copy()
mask = pdf2["trip_distance"].astype(float) < 0.5
pdf2.loc[mask, "total_amount"] = pdf2.loc[mask, "total_amount"].astype(float) + 0.20
pdf2["passenger_count"] = (
    pd.to_numeric(pdf2["passenger_count"], errors="coerce")  # floats -> numeric, ungültige -> NaN
      .round()                                              # falls 1.0, 2.0 etc.
      .astype("Int32")                                      # pandas nullable Int32 (maps to Arrow int32)
)

# (Optional, robust) Timestamps normalisieren
pdf2["lpep_pickup_datetime"]  = pd.to_datetime(pdf2["lpep_pickup_datetime"],  errors="coerce")
pdf2["lpep_dropoff_datetime"] = pd.to_datetime(pdf2["lpep_dropoff_datetime"], errors="coerce")

# Neue Snapshot-Version schreiben (Full-table Overwrite für Demo)
tbl.overwrite(pa.Table.from_pandas(pdf2))

# Snapshot-Infos
cur = tbl.current_snapshot()
snapshots = list(tbl.snapshots())  # ältester -> neuester
print("Rows (current):", len(pdf2),
      "| current snapshot_id:", cur.snapshot_id if cur else None,
      "| #snapshots:", len(snapshots))

# History/Time-Travel
#    v0/v1 analog zu Delta: wir nehmen die ersten beiden Snapshot-IDs
if len(snapshots) >= 2:
    snap_v0 = snapshots[-2].snapshot_id  # vorletzter = "v0" in deiner Demo
    snap_v1 = snapshots[-1].snapshot_id  # letzter   = "v1"
else:
    # Falls nur ein Snapshot existiert (Erstschreibvorgang)
    snap_v0 = snapshots[-1].snapshot_id
    snap_v1 = snapshots[-1].snapshot_id

print("History entries:", len(snapshots),
      "| latest operation:", snapshots[-1].summary.operation if snapshots else None)

# DuckDB: konsistente Zählung über iceberg_scan
con.sql("SET unsafe_enable_version_guessing = true") # Ensure this is set for root scans
res = con.execute(f"""
    SELECT COUNT(*) AS n
    FROM iceberg_scan('{table_root}', allow_moved_paths=true)
""").fetchdf()
print("DuckDB iceberg_scan() count:", int(res["n"][0]))

# Optionaler Vergleich alter/neuer Werte (Time-Travel via snapshot_id)
v0 = tbl.scan(snapshot_id=snap_v0).to_pandas()
v1 = tbl.scan(snapshot_id=snap_v1).to_pandas()
before = v0.loc[v0["trip_distance"].astype(float) < 0.5, ["trip_distance","total_amount"]].head(3)
after  = v1.loc[v1["trip_distance"].astype(float) < 0.5, ["trip_distance","total_amount"]].head(3)
print("Before (v0):\n", before)
print("After  (v1):\n",  after)


Rows (current): 672105 | current snapshot_id: 6662130445376781400 | #snapshots: 3
History entries: 3 | latest operation: Operation.APPEND
DuckDB iceberg_scan() count: 672105
Before (v0):
 Empty DataFrame
Columns: [trip_distance, total_amount]
Index: []
After  (v1):
     trip_distance  total_amount
0            0.00          4.50
12           0.49         17.00
18           0.43          6.56


In [None]:
# Schema Evolution
import numpy as np
import pandas as pd
import pyarrow as pa
from pyiceberg.types import DoubleType

# Aktuelle Daten
pdf = tbl.scan().to_pandas()

# Neue Spalte berechnen
new_col = "amount_per_km"
if new_col not in pdf.columns:
    td = pd.to_numeric(pdf["trip_distance"], errors="coerce")
    ta = pd.to_numeric(pdf["total_amount"],  errors="coerce")
    pdf[new_col] = np.where(td > 0, ta / td, np.nan)

# Schema-Evolution in Iceberg (Spalte hinzufügen)
with tbl.update_schema() as update:
    if new_col not in [f.name for f in tbl.schema().columns]:
        update.add_column(new_col, DoubleType(), doc="total_amount per km")

# Datentyp-Korrekturen für bestehende Spalten (wichtig!)
pdf["passenger_count"] = (
    pd.to_numeric(pdf["passenger_count"], errors="coerce")
      .round()
      .astype("Int32")   # pandas nullable Int32 -> Arrow int32
)
# (optional, robust – Timestamps normalisieren)
pdf["lpep_pickup_datetime"]  = pd.to_datetime(pdf["lpep_pickup_datetime"],  errors="coerce")
pdf["lpep_dropoff_datetime"] = pd.to_datetime(pdf["lpep_dropoff_datetime"], errors="coerce")

# Anhängen
tbl.append(pa.Table.from_pandas(pdf))

cur = tbl.current_snapshot()
print("Neue snapshot_id:", cur.snapshot_id if cur else None, "| Zeilen:", len(pdf))


Neue snapshot_id: 176762555907235440 | Zeilen: 672105


## DuckLake

In [1]:
%pip install jupysql duckdb==1.4.2 duckdb-engine
%load_ext sql
%sql duckdb://



In [3]:
%%sql
INSTALL ducklake;
LOAD ducklake;


Success


In [4]:
%%sql
ATTACH 'ducklake:metadata.ducklake' AS my_ducklake;
USE my_ducklake;

Success


In [5]:
%%sql
FROM ducklake_snapshots('my_ducklake');

snapshot_id,snapshot_time,schema_version,changes,author,commit_message,commit_extra_info
0,2025-11-17 18:11:10.626261+00:00,0,{'schemas_created': ['main']},,,


In [6]:
%%sql
CREATE SCHEMA IF NOT EXISTS sales;
USE sales;

Success


In [7]:
%%sql
CREATE TABLE IF NOT EXISTS customer (
    customer_id INTEGER,  # PRIMARY KEY nicht implementiert
    last_name VARCHAR(100) NOT NULL
);
COMMIT;

Success


In [8]:
%%sql
INSERT INTO customer (customer_id, last_name) VALUES
(1, 'Maier'),
(2, 'Schmitt'),
(3, 'Albrecht');
COMMIT;

Success


In [9]:
%sql SELECT * FROM customer;

customer_id,last_name
1,Maier
2,Schmitt
3,Albrecht


In [10]:
%%sql
FROM ducklake_snapshots('my_ducklake');

snapshot_id,snapshot_time,schema_version,changes,author,commit_message,commit_extra_info
0,2025-11-17 18:11:10.626261+00:00,0,{'schemas_created': ['main']},,,
1,2025-11-17 18:11:14.464743+00:00,1,{'schemas_created': ['sales']},,,
2,2025-11-17 18:11:24.485910+00:00,2,{'tables_created': ['sales.customer']},,,
3,2025-11-17 18:11:31.687585+00:00,2,{'tables_inserted_into': ['2']},,,


In [11]:
%%sql
INSERT INTO customer (customer_id, last_name) VALUES
(4, 'Berger');
COMMIT;

Success


In [12]:
%%sql
DELETE FROM customer WHERE customer_id = 2;
COMMIT;

Success


In [13]:
%%sql
FROM ducklake_snapshots('my_ducklake');

snapshot_id,snapshot_time,schema_version,changes,author,commit_message,commit_extra_info
0,2025-11-17 18:11:10.626261+00:00,0,{'schemas_created': ['main']},,,
1,2025-11-17 18:11:14.464743+00:00,1,{'schemas_created': ['sales']},,,
2,2025-11-17 18:11:24.485910+00:00,2,{'tables_created': ['sales.customer']},,,
3,2025-11-17 18:11:31.687585+00:00,2,{'tables_inserted_into': ['2']},,,
4,2025-11-17 18:11:38.734073+00:00,2,{'tables_inserted_into': ['2']},,,
5,2025-11-17 18:11:54.859229+00:00,2,{'tables_deleted_from': ['2']},,,


In [14]:
import duckdb

delete_file = duckdb.sql(f"""
    SELECT file FROM glob('/content/metadata.ducklake.files/sales/customer/*delete.parquet') LIMIT 1;
    """).fetchone()[0]

# Was enthält delete file?
duckdb.sql(f"""
    SELECT * FROM read_parquet('{delete_file}');
    """)


┌──────────────────────────────────────────────────────────────────────────────────────────────┬───────┐
│                                          file_path                                           │  pos  │
│                                           varchar                                            │ int64 │
├──────────────────────────────────────────────────────────────────────────────────────────────┼───────┤
│ metadata.ducklake.files/sales/customer/ducklake-019a9303-cf02-7c77-9cf3-afa3f1ff8ae9.parquet │     1 │
└──────────────────────────────────────────────────────────────────────────────────────────────┴───────┘

Time Travel

In [15]:
%%sql
SELECT MAX(snapshot_id) FROM ducklake_snapshots('my_ducklake');

max(snapshot_id)
5


In [16]:
%%sql
-- vor dem löschen
SELECT * FROM customer AT (VERSION => 4)

customer_id,last_name
1,Maier
2,Schmitt
3,Albrecht
4,Berger


## Zusammenfassung: Formate im Vergleich
Die folgende Tabelle fasst **Features**, **Vorteile/Nachteile** und **Empfehlungen** zusammen.

| Format | Zentrale Features | Vorteile | Nachteile | Empfehlung |
|---|---|---|---|---|
| Delta Lake | ACID‑Transaktionen, Time‑Travel, MERGE/UPSERT, Schema‑Evolution | Reifes Spark‑Ökosystem, breite Community | Häufig Spark‑zentrisch, Log‑Pflege (VACUUM) nötig | Wenn du ohnehin Spark einsetzt und schnelle Upserts/MERGEs brauchst |
| Apache Iceberg | Snapshot‑basierte Metadaten, verborgene Partitionierung, Schema‑Evolution | Engine‑agnostisch (Spark/Trino/Flink/DuckDB), skalierbare Metadaten | Katalog/Deployment kann komplex sein | Für heterogene Engines & großes Scale-out, wenn Abfrage‑Engine frei wählbar bleiben soll |
| DuckLake | Metadaten in SQL‑DB (z. B. SQLite), Daten als Parquet, Snapshots/Changes | Sehr einfacher lokaler Katalog, schnelle Demos/POCs | Jüngeres Ökosystem, weniger Integrationen | Für einfache, portable Setups, lokale Analytics & Lehr-/Demo‑Szenarien |

In [None]:
# Versionsübersicht

from importlib.metadata import version, PackageNotFoundError

def v(pkg):
    try:
        return version(pkg)
    except PackageNotFoundError:
        return None

rows = []

# DuckDB (Python-Package)
import duckdb
rows.append(("duckdb (py)", duckdb.__version__))

# DuckDB-Extensions (falls abrufbar)
try:
    con_chk = duckdb.connect()
    con_chk.execute("INSTALL pragma;")  # no-op, aber falls nicht vorhanden ignorieren
    exts = con_chk.execute("SELECT name, loaded, installed FROM duckdb_extensions() ORDER BY name").fetchall()
    rows.append(("duckdb extensions", ", ".join([f"{n}{'[*]' if l else ''}" for n,l,_ in exts]) or "none"))
except Exception:
    rows.append(("duckdb extensions", "n/a"))

# Delta-RS (Python-Package: 'deltalake')
dl_rs = v("deltalake")
rows.append(("deltalake (delta-rs)", dl_rs or "not installed"))

# Delta-Spark (Python-Package: 'delta-spark' / Modul: 'delta')
dl_spark = v("delta-spark") or v("delta_core") or v("delta")
rows.append(("delta-spark (Spark)", dl_spark or "not installed"))

# PySpark
ps = v("pyspark")
rows.append(("pyspark", ps or "not installed"))

# PyIceberg
pi = v("pyiceberg")
rows.append(("pyiceberg", pi or "not installed"))

# PyArrow & Pandas
pa = v("pyarrow")
pd = v("pandas")
rows.append(("pyarrow", pa or "not installed"))
rows.append(("pandas", pd or "not installed"))

# Ausgabe hübsch formatiert
w1 = max(len(k) for k,_ in rows) + 2
for k,val in rows:
    print(f"{k:<{w1}} {val}")


duckdb (py)            1.4.2
duckdb extensions      n/a
deltalake (delta-rs)   1.2.1
delta-spark (Spark)    not installed
pyspark                3.5.1
pyiceberg              0.10.0
pyarrow                18.1.0
pandas                 2.2.2
