In [None]:
# Libraries
import os
import tempfile
from pathlib import Path

import duckdb
import pyarrow as pa
import pyarrow.compute as pc
from pyiceberg.catalog.sql import SqlCatalog

# Create temporary folders for the warehouse and catalog.
# NOTE(review): the temporary directory is never removed by this notebook.
warehouse_path = tempfile.mkdtemp(prefix='iceberg_warehouse_')
catalog_path = os.path.join(warehouse_path, 'catalog.db')
print('Temporary warehouse:', warehouse_path)
print('Temporary catalog:', catalog_path)

# Create a temporary SQL catalog backed by a SQLite file inside the warehouse.
# SqlCatalog receives its settings (uri, warehouse) as keyword arguments; every
# extra keyword becomes a catalog property, so no extra `properties` argument is passed.
# Path.as_uri() builds a well-formed file:// URI on every OS (a hand-built
# f'file://{path}' breaks on Windows paths with backslashes and drive letters).
catalog = SqlCatalog(
    name='tmp_sql_catalog',
    uri=f'sqlite:///{catalog_path}',
    warehouse=Path(warehouse_path).as_uri(),
)

# Create the namespace that the tables below ('default.*') live in
catalog.create_namespace('default')

## First snapshot
We build the initial dataset and append it to a new Iceberg table; this first append produces the table's first snapshot.

In [None]:
# Initial batch: five sample trips, one list of values per column
data1 = dict(
    vendor_id=[1, 2, 1, 2, 1],
    trip_distance=[1.5, 2.3, 0.8, 5.2, 3.1],
    fare_amount=[10.0, 15.5, 6.0, 22.0, 18.0],
    tip_amount=[2.0, 3.0, 1.0, 4.5, 3.5],
    passenger_count=[1, 2, 1, 3, 2],
)
df1 = pa.table(data1)

# The Iceberg table's schema is taken from the Arrow batch's schema;
# appending the batch commits the table's first snapshot.
table = catalog.create_table('default.sample_trips', schema=df1.schema)
table.append(df1)

first_snapshot_rows = table.scan().to_arrow().num_rows
print('First snapshot rows:', first_snapshot_rows)

## Second snapshot
We add new data to the same table, creating a second snapshot.

In [None]:
# Second batch: two more trips with the same columns as the first batch
data2 = dict(
    vendor_id=[3, 1],
    trip_distance=[2.0, 1.0],
    fare_amount=[12.0, 8.0],
    tip_amount=[1.5, 2.0],
    passenger_count=[1, 1],
)
df2 = pa.table(data2)

# Appending again commits another snapshot; re-scan the table to count the
# rows across both appends.
table.append(df2)

second_snapshot_rows = table.scan().to_arrow().num_rows
print('Second snapshot total rows:', second_snapshot_rows)

## Compare snapshots using DuckDB
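We read both snapshots (Iceberg time travel) into Arrow tables, register them with DuckDB as views, and use set-difference (`EXCEPT`) queries to find the rows added and removed between them.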

In [None]:
# Get the IDs of the first and the most recent snapshot.
# Assumes table.snapshots() lists snapshots in commit order (oldest first).
snapshots = table.snapshots()
first_id = snapshots[0].snapshot_id
second_id = snapshots[-1].snapshot_id
print('Snapshot IDs:', first_id, second_id)

# Time travel: read each snapshot into its own PyArrow table
arrow_first = table.scan(snapshot_id=first_id).to_arrow()
arrow_second = table.scan(snapshot_id=second_id).to_arrow()

# Connect to DuckDB and expose both Arrow tables under SQL names
con = duckdb.connect()
con.register('first_snap', arrow_first)
con.register('second_snap', arrow_second)

# Rows added in the second snapshot.
# EXCEPT ALL uses multiset (bag) semantics: a row that occurs more often in the
# second snapshot than in the first is reported once per extra occurrence.
# Plain EXCEPT would deduplicate and hide re-inserted duplicate rows.
# The outer ORDER BY ALL makes the printed output deterministic.
added_rows = con.execute('''
SELECT * FROM (
    SELECT * FROM second_snap
    EXCEPT ALL
    SELECT * FROM first_snap
) ORDER BY ALL
''').fetchall()

# Rows present in the first snapshot that are missing from the second
# (expected to be empty here, since the second snapshot only appended rows)
removed_rows = con.execute('''
SELECT * FROM (
    SELECT * FROM first_snap
    EXCEPT ALL
    SELECT * FROM second_snap
) ORDER BY ALL
''').fetchall()

print('=== ADDED ROWS ===')
for r in added_rows:
    print(r)

print('\n=== REMOVED ROWS ===')
for r in removed_rows:
    print(r)

## Filters and aggregations on the second snapshot
We add a computed `tip_per_mile` column with PyArrow, then filter and aggregate the latest snapshot with DuckDB.

In [None]:
# Add computed column 'tip_per_mile' = tip_amount / trip_distance.
# Any existing 'tip_per_mile' column is dropped first, so re-running this cell
# replaces the column instead of appending a duplicate one.
base_columns = [name for name in arrow_second.column_names if name != 'tip_per_mile']
arrow_base = arrow_second.select(base_columns)
distance = arrow_base['trip_distance']
quotient = pc.divide(arrow_base['tip_amount'], distance)
# Floating-point division by a zero distance yields inf (or nan for 0/0), and inf
# would pass the "> 1.0" filter below; trips without a positive distance get null.
tip_per_mile = pc.if_else(
    pc.greater(distance, 0),
    quotient,
    pa.scalar(None, type=quotient.type),
)
arrow_second = arrow_base.append_column('tip_per_mile', tip_per_mile)
# Re-register so the DuckDB name 'second_snap' exposes the new column
con.register('second_snap', arrow_second)

# Filter rows with tip_per_mile > 1.0 (rows with a null tip_per_mile never match)
filtered_df = con.execute('SELECT * FROM second_snap WHERE tip_per_mile > 1.0').fetchdf()
print('Filtered rows (tip_per_mile > 1.0):')
print(filtered_df)

# Aggregate total fare by vendor; ORDER BY makes the output order deterministic
agg_df = con.execute(
    'SELECT vendor_id, SUM(fare_amount) AS total_fare FROM second_snap '
    'GROUP BY vendor_id ORDER BY vendor_id'
).fetchdf()
print('Total fare per vendor:')
print(agg_df)