# PyIceberg Demo: Clickstream Analytics

This notebook demonstrates Iceberg internals (Metadata, Snapshots, Manifests) using a local filesystem catalog.

In [24]:
import os
import shutil
from datetime import datetime
import pandas as pd
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import TimestampType, LongType, StringType, NestedField
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

# Clean up previous run
if os.path.exists("tmp/warehouse"):
    shutil.rmtree("tmp/warehouse")
if os.path.exists("tmp/catalog.db"):
    os.remove("tmp/catalog.db")

os.makedirs("tmp/warehouse", exist_ok=True)

# Initialize Catalog
catalog = load_catalog("local", **{
    "type": "sql",
    "uri": "sqlite:///tmp/catalog.db",
    "warehouse": "tmp/warehouse",
})

print("Catalog initialized")

Catalog initialized


## 1. Create Table with Partitioning

We define a schema for clickstream events and partition by **Day** of `event_time`.

In [25]:
schema = Schema(
    NestedField(1, "event_time", TimestampType(), required=False),
    NestedField(2, "user_id", LongType(), required=False),
    NestedField(3, "event_name", StringType(), required=False),
    NestedField(4, "event_properties", StringType(), required=False),
)

partition_spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="event_time_day")
)

table_name = "default.events"
try:
    catalog.drop_table(table_name)
except:
    pass

try:
    catalog.drop_namespace("default")
except:
    pass

catalog.create_namespace("default")

table = catalog.create_table(
    table_name,
    schema=schema,
    partition_spec=partition_spec,
)

print(f"Table {table_name} created")

Table default.events created


In [26]:
# Inspect metadata
print("\nTable structure:")
!tree tmp/warehouse/default/events/


Table structure:
[01;34mtmp/warehouse/default/events/[0m
└── [01;34mmetadata[0m
    └── 00000-18a99734-edf1-4161-b86e-4b532224fba0.metadata.json

2 directories, 1 file


## 2. Day 1: First Ingestion

We generate data for `2025-12-01` and append it.

In [27]:
df_day1 = pd.DataFrame({
    'event_time': [datetime(2025, 12, 1, 10, 0, 0), datetime(2025, 12, 1, 11, 30, 0)],
    'user_id': [1, 2],
    'event_name': ['login', 'view_item'],
    'event_properties': ['{"device": "mobile"}', '{"item_id": 123}']
})
df_day1['event_time'] = df_day1['event_time'].astype('datetime64[us]')

table.append(pa.Table.from_pandas(df_day1))
print("Day 1 data appended")

Day 1 data appended


In [28]:
# Inspect data and metadata
print("\nTable structure:")
!tree tmp/warehouse/default/events/


Table structure:
[01;34mtmp/warehouse/default/events/[0m
├── [01;34mdata[0m
│   └── [01;34mevent_time_day=2025-12-01[0m
│       └── 00000-0-cad83280-0b8f-4a8d-90d7-a7c39ca60430.parquet
└── [01;34mmetadata[0m
    ├── 00000-18a99734-edf1-4161-b86e-4b532224fba0.metadata.json
    ├── 00001-bf263d9e-8932-4bbc-98bc-f853e8483c9d.metadata.json
    ├── cad83280-0b8f-4a8d-90d7-a7c39ca60430-m0.avro
    └── snap-7847261690880971748-0-cad83280-0b8f-4a8d-90d7-a7c39ca60430.avro

4 directories, 5 files


## 3. Day 2: New Partition

We generate data for `2025-12-02`. This should create a new partition folder.

In [29]:
df_day2 = pd.DataFrame({
    'event_time': [datetime(2025, 12, 2, 9, 15, 0), datetime(2025, 12, 2, 14, 20, 0)],
    'user_id': [1, 3],
    'event_name': ['login', 'checkout'],
    'event_properties': ['{"device": "web"}', '{"amount": 99.99}']
})
df_day2['event_time'] = df_day2['event_time'].astype('datetime64[us]')

table.append(pa.Table.from_pandas(df_day2))
print("Day 2 data appended")

Day 2 data appended


In [30]:
# Verify new partition
print("\nTable structure:")
!tree tmp/warehouse/default/events/


Table structure:
[01;34mtmp/warehouse/default/events/[0m
├── [01;34mdata[0m
│   ├── [01;34mevent_time_day=2025-12-01[0m
│   │   └── 00000-0-cad83280-0b8f-4a8d-90d7-a7c39ca60430.parquet
│   └── [01;34mevent_time_day=2025-12-02[0m
│       └── 00000-0-2cf274a9-e710-4398-a2b8-882ca00b166b.parquet
└── [01;34mmetadata[0m
    ├── 00000-18a99734-edf1-4161-b86e-4b532224fba0.metadata.json
    ├── 00001-bf263d9e-8932-4bbc-98bc-f853e8483c9d.metadata.json
    ├── 00002-7548f7af-8ee7-48eb-94fc-c30051c7129e.metadata.json
    ├── 2cf274a9-e710-4398-a2b8-882ca00b166b-m0.avro
    ├── cad83280-0b8f-4a8d-90d7-a7c39ca60430-m0.avro
    ├── snap-1977146308163284239-0-2cf274a9-e710-4398-a2b8-882ca00b166b.avro
    └── snap-7847261690880971748-0-cad83280-0b8f-4a8d-90d7-a7c39ca60430.avro

5 directories, 9 files


## 4. Time Travel

We can query the table at different points in time using snapshot IDs.

In [31]:
# Current state
print("Current data:")
print(table.scan().to_pandas())

# History
print("\nSnapshot History:")
history = table.history()
for snapshot in history:
    print(f"ID: {snapshot.snapshot_id}, Timestamp: {datetime.fromtimestamp(snapshot.timestamp_ms/1000)}")

# Time Travel to first snapshot
first_snapshot_id = history[0].snapshot_id
print(f"\nData at first snapshot ({first_snapshot_id}):")
print(table.scan(snapshot_id=first_snapshot_id).to_pandas())

Current data:
           event_time  user_id event_name      event_properties
0 2025-12-02 09:15:00        1      login     {"device": "web"}
1 2025-12-02 14:20:00        3   checkout     {"amount": 99.99}
2 2025-12-01 10:00:00        1      login  {"device": "mobile"}
3 2025-12-01 11:30:00        2  view_item      {"item_id": 123}

Snapshot History:
ID: 7847261690880971748, Timestamp: 2025-12-03 12:34:26.684000
ID: 1977146308163284239, Timestamp: 2025-12-03 12:34:31.150000

Data at first snapshot (7847261690880971748):
           event_time  user_id event_name      event_properties
0 2025-12-01 10:00:00        1      login  {"device": "mobile"}
1 2025-12-01 11:30:00        2  view_item      {"item_id": 123}


## 5. Partition Pruning

Querying with a filter on the partition column allows Iceberg to skip unrelated files.

In [32]:
# Query filtering for Day 2
scan = table.scan(row_filter="event_time >= '2025-12-02T00:00:00'")

print("Files selected by the scan:")
for task in scan.plan_files():
    print(task.file.file_path)

print("\nResult:")
print(scan.to_pandas())

Files selected by the scan:
tmp/warehouse/default/events/data/event_time_day=2025-12-02/00000-0-2cf274a9-e710-4398-a2b8-882ca00b166b.parquet

Result:
           event_time  user_id event_name   event_properties
0 2025-12-02 09:15:00        1      login  {"device": "web"}
1 2025-12-02 14:20:00        3   checkout  {"amount": 99.99}
