In [1]:
from pyspark.sql import SparkSession

# Absolute paths to the Iceberg warehouse directory and the Iceberg Spark
# runtime JAR. These are machine-specific: adjust them before running elsewhere.
iceberg_tables_path = "/Users/france.cama/code/iceberg-practice/iceberg_tables"
iceberg_jars_path = "/Users/france.cama/code/iceberg-practice/jars/iceberg-spark-runtime-3.5_2.12-1.5.1.jar"

# Fully qualified name of the demo table, used by every statement below so the
# cell does not depend on which namespace happens to be the current one.
table_name = "default.titanic"

# Build (or reuse) a Spark session with the Iceberg SQL extensions enabled and
# `spark_catalog` configured as an Iceberg SparkSessionCatalog of type "hadoop"
# whose warehouse is `iceberg_tables_path`. The Derby system home is pointed at
# the same directory through the driver's JVM options.
spark = (
    SparkSession.builder
    .appName("Iceberg schema evolution feature")
    .config("spark.driver.extraJavaOptions", "-Dderby.system.home=" + iceberg_tables_path)
    .config("spark.jars", iceberg_jars_path)
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", iceberg_tables_path)
    .getOrCreate()
)

# Reset state left by a previous run of this cell. The drop is guarded so the
# cell also works the first time, when `new_col` does not exist yet and an
# unconditional DROP COLUMN would fail.
if "new_col" in spark.table(table_name).columns:
    spark.sql(f"ALTER TABLE {table_name} DROP COLUMN new_col")

# Add a new column to the schema, then insert one row that populates it
# (the last positional value goes into `new_col`).
spark.sql(f"ALTER TABLE {table_name} ADD COLUMN new_col string")
spark.sql(
    f"INSERT INTO {table_name} VALUES "
    "(77, 'a', 'b', 77, 'ccc', 1, 2, 3, 'M', 4, 77, 'a', 'b', 4, 'lorem', 'lorem')"
)

# Show the snapshot history, most recent entry first.
spark.sql(f"SELECT * FROM {table_name}.history ORDER BY made_current_at DESC").show()

# Snapshot produced by the INSERT above (most recent history entry).
current_snapshot_id = spark.sql(
    f"SELECT snapshot_id FROM {table_name}.history ORDER BY made_current_at DESC LIMIT 1"
).first()[0]
# NOTE(review): despite its name, OFFSET 3 selects the fourth most recent
# history entry. The value is only printed below - confirm the intended offset.
second_snapshot_id = spark.sql(
    f"SELECT snapshot_id FROM {table_name}.history ORDER BY made_current_at DESC LIMIT 1 OFFSET 3"
).first()[0]

print(current_snapshot_id, second_snapshot_id)

# Read the table with the new schema
df = spark.sql(f"SELECT * FROM {table_name} VERSION AS OF {current_snapshot_id}")
print("Data with new schema:")
df.show(5)

# Read the table with the old schema.
# This snapshot id is hard-coded from this particular table's history; replace
# it with an id from your own history output. Presumably it predates the
# addition of `new_col` - verify against the history table printed above.
old_schema_snapshot_id = 1184812391625634863
df_old = spark.sql(f"SELECT * FROM {table_name} VERSION AS OF {old_schema_snapshot_id}")
print("Data with old schema:")
df_old.show(5)

# To try dropping a column instead, run for example:
#spark.sql("ALTER TABLE titanic DROP COLUMN choose_a_column;")

24/07/22 12:22:51 WARN Utils: Your hostname, MBA-Francesco.local resolves to a loopback address: 127.0.0.1; using 192.168.1.62 instead (on interface en0)
24/07/22 12:22:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/07/22 12:22:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/22 12:22:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2024-07-22 12:22:...|2309543119036033141|6609739537041086161|               true|
|2024-07-22 11:47:...|6609739537041086161|1184812391625634863|               true|
|2024-07-22 11:11:...|1759033321787871743| 813399347516847428|              false|
|2024-07-22 11:11:...| 813399347516847428|6609739537041086161|              false|
|2024-07-22 11:11:...|6609739537041086161|1184812391625634863|               true|
|2024-07-22 11:10:...|1184812391625634863|4619981039604578990|               true|
|2024-07-22 11:09:...|4619981039604578990|               NULL|               true|
+--------------------+-------------------+-------------------+-------------------+

2309543119036033141 813399347516847428
Data with new schema:
+----+-----+--------+----

It's possible to alter the schema of a table, and the schema used for a read is determined by the snapshot you access. This lets you time travel to an earlier snapshot and look at the data using the table schema that was in effect at that time.
In the previous cell I only showed the add-column case, but Iceberg supports the following schema changes:
- <strong>Add</strong> -- add a new column to the table or to a nested struct
- <strong>Drop</strong> -- remove an existing column from the table or a nested struct
- <strong>Rename</strong> -- rename an existing column or field in a nested struct
- <strong>Update</strong> -- widen the type of a column, struct field, map key, map value, or list element
- <strong>Reorder</strong> -- change the order of columns or fields in a nested struct

Iceberg guarantees that schema evolution changes are independent and free of side-effects, without rewriting files. Iceberg uses unique IDs to track each column in a table. When you add a column, it is assigned a new ID, so existing data is never used by mistake.