# 03 — Time Travel

Every write to an Iceberg table creates an immutable **snapshot**. This means you can:
1. Query data as it existed at any point in time
2. List all snapshots and their metadata
3. Roll back to a previous version

This helps with debugging a bad write, auditing what a table contained at a given time, and recovering from mistakes.

In [None]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("IcebergDemo")
    .master("local[*]")
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1")
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.demo.type", "hadoop")
    .config("spark.sql.catalog.demo.warehouse", "../warehouse")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)
print("Spark + Iceberg ready.")

## 1. List All Snapshots

Iceberg exposes metadata tables that you can query with SQL.

In [None]:
snapshots_df = spark.sql("""
    SELECT snapshot_id, committed_at, operation, summary
    FROM demo.ecommerce.orders.snapshots
    ORDER BY committed_at
""")

snapshots_df.show(truncate=False)
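The `summary` column is a map of strings, so individual entries can be pulled out as columns. The sketch below only builds the SQL text; `snapshot_summary_query` is a hypothetical helper, and it assumes the snapshots carry the usual `added-records`, `deleted-records`, and `total-records` summary keys. A key that a snapshot did not record would typically come back as NULL.

```python
def snapshot_summary_query(table: str) -> str:
    """Build a query that flattens a few entries of the `summary` map into columns.

    `table` is the fully qualified table name, e.g. "demo.ecommerce.orders".
    The summary keys used here are assumed to be present; not every operation
    records all of them.
    """
    return f"""
        SELECT
            snapshot_id,
            committed_at,
            operation,
            summary['added-records']   AS added_records,
            summary['deleted-records'] AS deleted_records,
            summary['total-records']   AS total_records
        FROM {table}.snapshots
        ORDER BY committed_at
    """


query = snapshot_summary_query("demo.ecommerce.orders")
print(query)
```

In this notebook's session, the result can be shown with `spark.sql(query).show(truncate=False)`.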

In [None]:
# Save the first snapshot ID for time-travel queries below
snapshot_ids = [row.snapshot_id for row in snapshots_df.collect()]
first_snapshot_id = snapshot_ids[0]
print(f"First snapshot ID: {first_snapshot_id}")
print(f"Total snapshots: {len(snapshot_ids)}")

## 2. Query a Previous Snapshot

Use `VERSION AS OF <snapshot_id>` to read data as it was at a specific snapshot.

In [None]:
print(f"Data at the FIRST snapshot (snapshot_id = {first_snapshot_id}):")
print("This was right after the initial INSERT in notebook 01.")
print()

spark.sql(f"""
    SELECT * FROM demo.ecommerce.orders
    VERSION AS OF {first_snapshot_id}
    ORDER BY order_id
""").show()

In [None]:
print("Data at the CURRENT (latest) snapshot:")
print()

spark.sql("SELECT * FROM demo.ecommerce.orders ORDER BY order_id").show()
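Besides a snapshot ID, Spark SQL on Iceberg also accepts `TIMESTAMP AS OF '<timestamp>'`, which reads the snapshot that was current at that time. The helper below is a hypothetical convenience, not part of any library: it picks the clause based on the argument type and returns the SQL text. It assumes the timestamp string is interpreted in the Spark session time zone and that the timestamp is not earlier than the table's first snapshot, since Iceberg has nothing to read before that.

```python
from datetime import datetime
from typing import Union


def time_travel_query(table: str, as_of: Union[int, datetime]) -> str:
    """Return a SELECT that reads `table` at a snapshot ID or at a point in time."""
    if isinstance(as_of, datetime):
        # Format to second precision for the TIMESTAMP AS OF literal.
        ts = as_of.strftime("%Y-%m-%d %H:%M:%S")
        return f"SELECT * FROM {table} TIMESTAMP AS OF '{ts}'"
    # Anything else is treated as a snapshot ID.
    return f"SELECT * FROM {table} VERSION AS OF {as_of}"


# The snapshot ID and timestamp below are made-up values for illustration.
by_id = time_travel_query("demo.ecommerce.orders", 1234567890)
by_time = time_travel_query("demo.ecommerce.orders", datetime(2024, 1, 2, 3, 4, 5))
print(by_id)
print(by_time)
```

Either string can be passed to `spark.sql(...)` in this notebook, with a real snapshot ID or commit time from the `snapshots` table in place of the made-up values.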

## 3. View the History Table

The `history` metadata table shows which snapshot was current at each point in time.
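Once the `made_current_at` and `snapshot_id` columns of `history` are collected into Python, finding the snapshot that was current at a given moment is a lookup in a sorted list. The sketch below uses hand-written rows rather than real table output; `snapshot_current_at` and the sample values are hypothetical, and the third row assumes that a rollback shows up in `history` as a new entry for the older snapshot.

```python
from bisect import bisect_right
from datetime import datetime
from typing import List, Optional, Tuple

HistoryRow = Tuple[datetime, int]  # (made_current_at, snapshot_id)


def snapshot_current_at(history: List[HistoryRow], ts: datetime) -> Optional[int]:
    """Return the snapshot that was current at `ts`, or None if `ts` predates the table.

    `history` must be sorted by made_current_at in ascending order.
    """
    times = [made_current_at for made_current_at, _ in history]
    # Index of the last entry whose made_current_at is <= ts, plus one.
    idx = bisect_right(times, ts)
    return history[idx - 1][1] if idx > 0 else None


# Hypothetical rows: snapshot 111 is written, then 222, then a rollback
# makes 111 current again.
history = [
    (datetime(2024, 1, 1, 9, 0), 111),
    (datetime(2024, 1, 1, 10, 0), 222),
    (datetime(2024, 1, 1, 11, 0), 111),
]

print(snapshot_current_at(history, datetime(2024, 1, 1, 10, 30)))
print(snapshot_current_at(history, datetime(2024, 1, 1, 12, 0)))
```

With real data, the rows could come from `spark.sql("SELECT made_current_at, snapshot_id FROM demo.ecommerce.orders.history ORDER BY made_current_at").collect()`.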

In [None]:
spark.sql("""
    SELECT * FROM demo.ecommerce.orders.history
""").show(truncate=False)

## 4. Rollback to a Previous Snapshot

Made a mistake? Roll back the table to any previous snapshot.

This is a metadata-only operation: Iceberg repoints the table at the older snapshot, and no data files are rewritten.

In [None]:
print("Before rollback:")
spark.sql("SELECT COUNT(*) AS row_count FROM demo.ecommerce.orders").show()

# Roll back to the first snapshot
spark.sql(f"""
    CALL demo.system.rollback_to_snapshot('ecommerce.orders', {first_snapshot_id})
""")

print(f"After rollback to snapshot {first_snapshot_id}:")
spark.sql("SELECT * FROM demo.ecommerce.orders ORDER BY order_id").show()
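To see why a rollback touches only metadata, it can help to model a table as a set of immutable snapshots plus a pointer to the current one. The toy model below is a deliberately simplified illustration, not how Iceberg is implemented: `Snapshot`, `ToyTable`, and their methods are hypothetical names, and real snapshots reference manifest files rather than holding file names directly.

```python
from dataclasses import dataclass, field
from typing import Dict, FrozenSet, List, Optional


@dataclass(frozen=True)
class Snapshot:
    """An immutable view of the table: the set of data files it contains."""
    snapshot_id: int
    data_files: FrozenSet[str]


@dataclass
class ToyTable:
    snapshots: Dict[int, Snapshot] = field(default_factory=dict)
    current_id: Optional[int] = None

    def scan(self) -> FrozenSet[str]:
        """Return the data files visible through the current snapshot."""
        if self.current_id is None:
            return frozenset()
        return self.snapshots[self.current_id].data_files

    def append(self, snapshot_id: int, new_files: List[str]) -> None:
        """A write adds a new snapshot; earlier snapshots are left untouched."""
        files = self.scan() | frozenset(new_files)
        self.snapshots[snapshot_id] = Snapshot(snapshot_id, files)
        self.current_id = snapshot_id

    def set_current(self, snapshot_id: int) -> None:
        """A rollback only moves the pointer; no files are read or written."""
        if snapshot_id not in self.snapshots:
            raise ValueError(f"unknown snapshot {snapshot_id}")
        self.current_id = snapshot_id


table = ToyTable()
table.append(1, ["a.parquet"])
table.append(2, ["b.parquet"])
table.set_current(1)  # roll back to the first snapshot

print(sorted(table.scan()))
print(sorted(table.snapshots))
```

After the rollback, a scan sees only the files of snapshot 1, yet snapshot 2 and its file are still recorded, which is why the table can later be pointed back at it.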

## 5. Restore the Latest State

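Let's move the table forward to the snapshot that was current before the rollback, so the next notebooks have the full data set to work with. That snapshot is no longer an ancestor of the current one, so `rollback_to_snapshot` would reject it; `set_current_snapshot` has no such restriction.

In [None]:
# Point the table back at the snapshot that was current before the rollback.
# snapshot_ids was collected before the rollback, so its last entry is that snapshot.
last_snapshot_id = snapshot_ids[-1]

# rollback_to_snapshot only accepts ancestors of the current snapshot,
# so use set_current_snapshot to move forward again.
spark.sql(f"""
    CALL demo.system.set_current_snapshot('ecommerce.orders', {last_snapshot_id})
""")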

print("Restored to latest snapshot.")
spark.sql("SELECT * FROM demo.ecommerce.orders ORDER BY order_id").show()

## Key Takeaway

| Feature              | How it works                                    |
|----------------------|-------------------------------------------------|
| Snapshot history     | Every write creates an immutable snapshot        |
| Time-travel queries  | `VERSION AS OF <snapshot_id>`                    |
| Metadata tables      | `.snapshots`, `.history`, `.files`, etc.         |
| Rollback             | Metadata-only: no data files are rewritten       |

**Next up:** Schema evolution in notebook 04!