# Exploring Apache Iceberg on Cloudflare R2

This notebook demonstrates querying federal regulations data through the **R2 Data Catalog** (Apache Iceberg REST catalog).

**What's different from raw Parquet?**
- Tables are managed: schema evolution, ACID transactions, time travel
- Multi-engine access: DuckDB, PyIceberg, Spark, and Snowflake can all read through the same catalog
- Incremental updates: append or modify rows without rewriting the whole table
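Much of the "managed" behavior listed above rests on one idea from the Iceberg spec. The catalog stores a pointer to each table's current metadata file. A commit atomically swaps that pointer, but only if it still points where the writer expected. The toy model below is a hypothetical illustration of that optimistic-concurrency rule. `ToyCatalog`, `current`, and `commit` are invented for this sketch and are not the R2 Data Catalog or PyIceberg API.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ToyCatalog:
    """Toy model: each table name points at its current metadata file."""

    pointers: Dict[str, str] = field(default_factory=dict)
    history: Dict[str, List[str]] = field(default_factory=dict)

    def current(self, table: str) -> Optional[str]:
        return self.pointers.get(table)

    def commit(self, table: str, expected: Optional[str], new: str) -> bool:
        """Compare-and-swap: succeed only if nobody committed since `expected` was read."""
        if self.pointers.get(table) != expected:
            return False  # stale base; a real writer would re-read and retry
        self.pointers[table] = new
        # Earlier versions are kept rather than overwritten, which is what enables time travel
        self.history.setdefault(table, []).append(new)
        return True


cat = ToyCatalog()
base = cat.current("dockets")  # both writers start from the same (empty) state
won = cat.commit("dockets", base, "v1-a.metadata.json")   # first writer succeeds
lost = cat.commit("dockets", base, "v1-b.metadata.json")  # second writer's base is now stale
print(won, lost, cat.current("dockets"))
```

Readers never observe a half-applied write. The pointer moves from one complete version to the next, and a losing writer finds out immediately instead of silently overwriting the other writer's change.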

Data source: [regulations.gov](https://www.regulations.gov/) via [Mirrulations](https://github.com/MoravianUniversity/mirrulations)

In [None]:
# Install dependencies (run once)
# %pip install "duckdb>=1.4" pandas python-dotenv "pyiceberg[pyarrow]"

## 1. Connect via DuckDB

DuckDB 1.4+ can attach directly to an Iceberg REST catalog. Once attached, tables are queryable with standard SQL.

In [None]:
import os
import duckdb
from dotenv import load_dotenv

load_dotenv()

# Iceberg catalog config
CATALOG_URI = "https://catalog.cloudflarestorage.com/a18589c7a7a0fc4febecadfc9c71b105/spicy-regs"
WAREHOUSE = "a18589c7a7a0fc4febecadfc9c71b105_spicy-regs"
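TOKEN = os.getenv("R2_API_TOKEN")
if not TOKEN:
    raise RuntimeError("R2_API_TOKEN is not set; add it to your environment or a .env file")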

# Initialize DuckDB with Iceberg support
conn = duckdb.connect()
conn.execute("INSTALL iceberg; LOAD iceberg;")
conn.execute("INSTALL httpfs; LOAD httpfs;")

# Authenticate
conn.execute(f"""
    CREATE SECRET r2_secret (
        TYPE ICEBERG,
        TOKEN '{TOKEN}'
    );
""")

# Attach the catalog
conn.execute(f"""
    ATTACH '{WAREHOUSE}' AS spicy_regs (
        TYPE ICEBERG,
        ENDPOINT '{CATALOG_URI}'
    );
""")

print("✓ Connected to Iceberg catalog")
print(f"DuckDB {duckdb.__version__}")
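The catalog URI and warehouse name in the cell above both appear to be built from the account ID and the bucket name. The sketch below captures that pattern, which is inferred from these two constants rather than taken from Cloudflare's documentation. It also renders the `CREATE SECRET` and `ATTACH` statements with single quotes escaped, since the cell splices the token straight into SQL. `r2_catalog_config`, `sql_literal`, and `attach_statements` are hypothetical helpers.

```python
from typing import Dict, List


def r2_catalog_config(account_id: str, bucket: str) -> Dict[str, str]:
    """Derive catalog URI and warehouse name (pattern assumed from the constants above)."""
    return {
        "uri": f"https://catalog.cloudflarestorage.com/{account_id}/{bucket}",
        "warehouse": f"{account_id}_{bucket}",
    }


def sql_literal(value: str) -> str:
    """Quote a value as a SQL string literal, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def attach_statements(cfg: Dict[str, str], token: str, alias: str = "spicy_regs") -> List[str]:
    """Render the secret and ATTACH statements used in the connection cell."""
    return [
        f"CREATE SECRET r2_secret (TYPE ICEBERG, TOKEN {sql_literal(token)})",
        f"ATTACH {sql_literal(cfg['warehouse'])} AS {alias} "
        f"(TYPE ICEBERG, ENDPOINT {sql_literal(cfg['uri'])})",
    ]


cfg = r2_catalog_config("a18589c7a7a0fc4febecadfc9c71b105", "spicy-regs")
for stmt in attach_statements(cfg, token="<your token>"):
    print(stmt)
```

In the notebook, each rendered statement could be passed to `conn.execute(...)` in place of the f-strings.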

## 2. Discover the Catalog

Browse schemas (namespaces) and tables — just like a traditional database.

In [None]:
# List all tables in the catalog
conn.execute("SHOW ALL TABLES").fetchdf()

In [None]:
# Set the active schema for shorter queries
conn.execute("USE spicy_regs.regulations")
print("✓ Active schema: spicy_regs.regulations")

In [None]:
from IPython.display import display  # built into Jupyter; explicit import for other runners

# Inspect the schema of each table
for table in ["dockets", "documents", "comments"]:
    print(f"\n── {table} ──")
    display(conn.execute(f"DESCRIBE {table}").fetchdf())

## 3. Dataset Overview

Row counts and distinct agency counts per table, using the same queries as the Parquet notebook but now reading from managed Iceberg tables.

In [None]:
# Row counts for all tables
for table in ["dockets", "documents", "comments"]:
    count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    agencies = conn.execute(f"SELECT COUNT(DISTINCT agency_code) FROM {table}").fetchone()[0]
    print(f"{table}: {count:,} rows, {agencies} agencies")
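The loop above sends two separate queries per table. The sketch below collects both numbers for every table in a single `UNION ALL` statement, so each table's counts come out of one query. `table_stats_sql` is a hypothetical helper. To keep the sketch runnable offline, Python's built-in `sqlite3` stands in for the Iceberg tables. The generated SQL is plain enough that it should also run unchanged through `conn.execute(...)` in DuckDB.

```python
import sqlite3
from typing import Iterable


def table_stats_sql(tables: Iterable[str]) -> str:
    """Build one query returning (table_name, row_count, agency_count) per table.

    Table names are interpolated directly, so only pass trusted names.
    """
    parts = [
        f"SELECT '{t}' AS table_name, COUNT(*) AS row_count, "
        f"COUNT(DISTINCT agency_code) AS agency_count FROM {t}"
        for t in tables
    ]
    return "\nUNION ALL\n".join(parts)


# Offline demo: tiny in-memory tables with made-up agency codes
db = sqlite3.connect(":memory:")
for t in ["dockets", "documents", "comments"]:
    db.execute(f"CREATE TABLE {t} (agency_code TEXT)")
db.executemany("INSERT INTO dockets VALUES (?)", [("EPA",), ("EPA",), ("FDA",)])
db.executemany("INSERT INTO documents VALUES (?)", [("EPA",)])
# comments is left empty on purpose

for name, rows, agencies in db.execute(table_stats_sql(["dockets", "documents", "comments"])):
    print(f"{name}: {rows:,} rows, {agencies} agencies")
```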

## 4. Query Examples

### Top Agencies by Docket Count

In [None]:
conn.execute("""
    SELECT agency_code, COUNT(*) as docket_count
    FROM dockets
    GROUP BY agency_code
    ORDER BY docket_count DESC
    LIMIT 15
""").fetchdf()

### Recent EPA Dockets

In [None]:
conn.execute("""
    SELECT docket_id, title, docket_type, modify_date
    FROM dockets
    WHERE agency_code = 'EPA'
    ORDER BY modify_date DESC
    LIMIT 10
""").fetchdf()

### Documents with Open Comment Periods

In [None]:
conn.execute("""
    SELECT document_id, agency_code, title, 
           comment_start_date, comment_end_date
    FROM documents
    WHERE TRY_CAST(comment_end_date AS DATE) > CURRENT_DATE
    ORDER BY TRY_CAST(comment_end_date AS DATE) ASC
    LIMIT 10
""").fetchdf()
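The filter above relies on `TRY_CAST`, which yields NULL instead of raising an error when it cannot parse a value. Because `NULL > CURRENT_DATE` is never true, malformed or missing dates simply drop out. The sketch below expresses the same logic in plain Python. It is only a rough analogue: `date.fromisoformat` accepts a narrower set of formats than DuckDB's cast. The helper names and sample IDs are made up, and `today` is passed in explicitly so the result is reproducible.

```python
from datetime import date
from typing import Iterable, List, Optional, Tuple


def try_parse_date(value: Optional[str]) -> Optional[date]:
    """Return a date, or None if the value is missing or unparseable (like TRY_CAST)."""
    if value is None:
        return None
    try:
        return date.fromisoformat(value)
    except ValueError:
        return None


def open_for_comment(
    docs: Iterable[Tuple[str, Optional[str]]], today: date
) -> List[Tuple[str, date]]:
    """(document_id, end_date) pairs whose comment period ends after `today`, soonest first."""
    parsed = [(doc_id, try_parse_date(end)) for doc_id, end in docs]
    still_open = [(doc_id, end) for doc_id, end in parsed if end is not None and end > today]
    return sorted(still_open, key=lambda pair: pair[1])


sample = [("EPA-1", "2025-07-01"), ("EPA-2", "not a date"), ("FDA-1", None),
          ("FDA-2", "2025-06-15"), ("DOT-1", "2024-01-01")]
print(open_for_comment(sample, today=date(2025, 6, 1)))
```

Sorting on the parsed date, not the raw string, keeps the order correct even if some values are not zero-padded ISO strings.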

### Most Commented Dockets (Cross-Table Join)

Because all three tables live in one catalog, they can be joined by name like tables in a traditional database, with no file paths to keep track of.

In [None]:
conn.execute("""
    SELECT 
        d.docket_id,
        d.agency_code,
        d.title,
        COUNT(c.comment_id) as comment_count
    FROM dockets d
    LEFT JOIN comments c ON d.docket_id = c.docket_id
    WHERE d.agency_code = 'EPA'
    GROUP BY d.docket_id, d.agency_code, d.title
    ORDER BY comment_count DESC
    LIMIT 10
""").fetchdf()
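The query above counts `c.comment_id` rather than `*`, and that choice matters. A `LEFT JOIN` keeps dockets with no comments by padding the comment columns with NULL. `COUNT(column)` skips NULLs, while `COUNT(*)` counts the padded row. The minimal demonstration below uses made-up IDs and Python's built-in `sqlite3` as an offline stand-in for DuckDB. These COUNT semantics are standard SQL.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE dockets (docket_id TEXT, agency_code TEXT);
    CREATE TABLE comments (comment_id TEXT, docket_id TEXT);
    INSERT INTO dockets VALUES ('EPA-A', 'EPA'), ('EPA-B', 'EPA');
    INSERT INTO comments VALUES ('c1', 'EPA-A'), ('c2', 'EPA-A');
""")

rows = db.execute("""
    SELECT d.docket_id,
           COUNT(c.comment_id) AS counted_ids,  -- NULL-padded rows are skipped
           COUNT(*)            AS counted_rows  -- every joined row is counted
    FROM dockets d
    LEFT JOIN comments c ON d.docket_id = c.docket_id
    GROUP BY d.docket_id
    ORDER BY d.docket_id
""").fetchall()
print(rows)  # [('EPA-A', 2, 2), ('EPA-B', 0, 1)]
```

With `COUNT(*)`, a docket that received no comments would wrongly report one.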

### Comment Volume by Month

In [None]:
conn.execute("""
    SELECT 
        EXTRACT(YEAR FROM TRY_CAST(posted_date AS DATE))::INT as year,
        EXTRACT(MONTH FROM TRY_CAST(posted_date AS DATE))::INT as month,
        COUNT(*) as comment_count
    FROM comments
    WHERE TRY_CAST(posted_date AS DATE) >= DATE '2024-01-01'
    GROUP BY 1, 2
    ORDER BY 1, 2
""").fetchdf()

## 5. Compare: Iceberg vs Raw Parquet

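The same SQL works against both; only the `FROM` source changes (a catalog table name vs. a `read_parquet()` URL). Each timing below comes from a single run, so it includes cold-start costs and is only a rough indication. The results can also differ if the Parquet export and the Iceberg table were refreshed at different times.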

In [None]:
import time

R2_PUBLIC_URL = "https://pub-5fc11ad134984edf8d9af452dd1849d6.r2.dev"

# Secondary sort on agency_code makes the top 5 deterministic when counts tie
query = "SELECT agency_code, COUNT(*) AS cnt FROM {source} GROUP BY 1 ORDER BY 2 DESC, 1 LIMIT 5"

# Iceberg table (resolved through the attached catalog)
t0 = time.perf_counter()
iceberg_result = conn.execute(query.format(source="dockets")).fetchdf()
iceberg_time = time.perf_counter() - t0

# Raw Parquet (standalone file on the public bucket)
t0 = time.perf_counter()
parquet_result = conn.execute(
    query.format(source=f"read_parquet('{R2_PUBLIC_URL}/dockets.parquet')")
).fetchdf()
parquet_time = time.perf_counter() - t0

print(f"Iceberg: {iceberg_time:.2f}s")
print(f"Parquet: {parquet_time:.2f}s")
print(f"\nResults match: {iceberg_result.equals(parquet_result)}")
display(iceberg_result)
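One timed run per source mixes in one-off costs such as catalog metadata lookups and cold HTTP connections. A somewhat fairer comparison discards a warm-up call and reports the median of several runs. The sketch below assumes nothing beyond the standard library, and `median_runtime` is a hypothetical helper. Any caching DuckDB does after the warm-up will also shape the numbers, so treat them as indicative.

```python
import statistics
import time
from typing import Callable


def median_runtime(fn: Callable[[], object], runs: int = 5, warmup: int = 1) -> float:
    """Median wall-clock seconds for fn(), after `warmup` untimed calls."""
    for _ in range(warmup):
        fn()  # absorb one-off startup costs
    samples = []
    for _ in range(runs):
        t0 = time.perf_counter()
        fn()
        samples.append(time.perf_counter() - t0)
    return statistics.median(samples)


# In the notebook, for example:
# median_runtime(lambda: conn.execute(query.format(source="dockets")).fetchdf())
print(f"{median_runtime(lambda: sum(range(100_000))):.4f}s")
```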

## 6. PyIceberg Access

PyIceberg provides a Python-native way to interact with the catalog — useful for schema inspection, metadata access, and programmatic table management.

In [None]:
from pyiceberg.catalog.rest import RestCatalog

catalog = RestCatalog(
    name="spicy_regs",
    warehouse=WAREHOUSE,
    uri=CATALOG_URI,
    token=TOKEN,
)

# List namespaces and tables
print("Namespaces:", catalog.list_namespaces())
print("\nTables:")
for table in catalog.list_tables("regulations"):
    print(f"  {'.'.join(table)}")

In [None]:
# Inspect table metadata
table = catalog.load_table(("regulations", "dockets"))

print(f"Table: {table.name()}")
print(f"Location: {table.location()}")
print(f"Snapshots: {len(table.metadata.snapshots)}")
print("\nSchema:")
for field in table.schema().fields:
    print(f"  {field.name}: {field.field_type}")

In [None]:
# Read data via PyIceberg → Arrow → Pandas
arrow_table = table.scan(
    selected_fields=("docket_id", "agency_code", "title"),
    limit=5
).to_arrow()

arrow_table.to_pandas()

In [None]:
# Filter with a row predicate: PyIceberg uses it to skip data files, then filters the remaining rows
from pyiceberg.expressions import EqualTo

epa_dockets = catalog.load_table(("regulations", "dockets")).scan(
    row_filter=EqualTo("agency_code", "EPA"),
    selected_fields=("docket_id", "title", "docket_type"),
    limit=10
).to_arrow()

print(f"EPA dockets returned: {len(epa_dockets)} rows (capped by limit=10)")
epa_dockets.to_pandas()
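In PyIceberg, `row_filter` does two jobs. First, the scan planner checks the predicate against partition values and per-file column statistics (lower/upper bounds) recorded in the manifests, and skips data files that cannot contain a match. Second, the files that survive are filtered row by row. How much gets skipped depends on data layout: if every file spans many agencies, few files can be pruned. The toy sketch below isolates the bounds-based pruning step. `DataFileStats`, `files_to_scan`, and the file paths are hypothetical, not PyIceberg internals.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class DataFileStats:
    """Per-file column bounds, similar in spirit to what Iceberg manifests record."""
    path: str
    lower: str  # smallest agency_code in the file
    upper: str  # largest agency_code in the file


def files_to_scan(files: List[DataFileStats], value: str) -> List[str]:
    """Keep only files whose [lower, upper] range could contain agency_code == value."""
    return [f.path for f in files if f.lower <= value <= f.upper]


manifest = [
    DataFileStats("data/00000.parquet", lower="CFPB", upper="DOT"),
    DataFileStats("data/00001.parquet", lower="EPA", upper="FDA"),
    DataFileStats("data/00002.parquet", lower="HHS", upper="USDA"),
]
print(files_to_scan(manifest, "EPA"))  # ['data/00001.parquet']
```

Pruning is conservative. A file that passes the bounds check may still hold no matching rows, which is why the row-level filter is still needed afterwards.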

## 7. Snapshot History (Time Travel)

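Every write to an Iceberg table creates a snapshot. The cell below lists each table's snapshot history. PyIceberg can also read an earlier snapshot by passing `snapshot_id` to `Table.scan()`, as long as that snapshot has not been expired.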

In [None]:
from datetime import datetime, timezone

for tbl_name in ["dockets", "documents", "comments"]:
    tbl = catalog.load_table(("regulations", tbl_name))
    print(f"\n── {tbl_name} ──")
    print(f"  Snapshots: {len(tbl.metadata.snapshots)}")
    for snap in tbl.metadata.snapshots:
        # Snapshot timestamps are epoch milliseconds; show them in UTC
        ts = datetime.fromtimestamp(snap.timestamp_ms / 1000, tz=timezone.utc)
        summary = snap.summary or {}
        # `or "?"` also covers keys that come back as None rather than missing
        rows = summary.get("total-records") or "?"
        files = summary.get("total-data-files") or "?"
        print(f"  Snapshot {snap.snapshot_id}: {ts} | {rows} records, {files} data files")
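To time-travel, you first need to know which snapshot was current at a given moment. That means finding the latest snapshot committed at or before a timestamp. The sketch below is hypothetical: `snapshot_as_of` is not a PyIceberg function, and it takes plain `(snapshot_id, timestamp_ms)` pairs such as `[(s.snapshot_id, s.timestamp_ms) for s in tbl.metadata.snapshots]`. Iceberg itself resolves this against the table's snapshot log. That log can differ from the raw snapshot list when branches or rollbacks are involved.

```python
from bisect import bisect_right
from typing import List, Optional, Tuple


def snapshot_as_of(snapshots: List[Tuple[int, int]], ts_ms: int) -> Optional[int]:
    """Id of the latest snapshot with timestamp_ms <= ts_ms, or None if none qualifies."""
    ordered = sorted(snapshots, key=lambda s: s[1])  # oldest first
    times = [t for _, t in ordered]
    idx = bisect_right(times, ts_ms)  # number of snapshots at or before ts_ms
    return ordered[idx - 1][0] if idx else None


history = [(101, 1_000), (102, 2_000), (103, 3_000)]  # made-up ids and timestamps
print(snapshot_as_of(history, 2_500))  # 102
```

The returned id can then be passed to `tbl.scan(snapshot_id=...)` to read the table as it was at that point, provided the snapshot and its data files have not been expired.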

## Summary

| Method | Best For |
|--------|----------|
| **DuckDB SQL** | Ad-hoc queries, joins, aggregations — most natural for analytics |
| **PyIceberg** | Schema inspection, metadata access, programmatic table management |
| **Raw Parquet** | Quick reads when you don't need catalog features |

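All three read the same regulations dataset on Cloudflare R2. DuckDB SQL and PyIceberg go through the Iceberg catalog, while raw Parquet reads standalone export files from the public bucket.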