In [2]:
from pyspark.sql import SparkSession

# # Define Iceberg version for clarity, ensure it's compatible with Spark 3.3
iceberg_version = "1.4.2" # Example: Use a recent, compatible version

spark = SparkSession.builder \
    .appName("Iceberg Local Test") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.jars.packages", f"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:{iceberg_version},org.apache.iceberg:iceberg-aws-bundle:{iceberg_version}") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "s3a://iceberg-warehouse/") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.3_2.12 added as a dependency
org.apache.iceberg#iceberg-aws-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b5196c1a-4ea9-4be5-a9b0-5b9b53cae37e;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.4.2 in central
	found org.apache.iceberg#iceberg-aws-bundle;1.4.2 in central
:: resolution report :: resolve 304ms :: artifacts dl 14ms
	:: modules in use:
	org.apache.iceberg#iceberg-aws-bundle;1.4.2 from central in [default]
	org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.4.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	--------------------------------------------------------------

25/06/15 12:39:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
spark.sql("""
CREATE TABLE local.default.customers (
    id BIGINT,
    name STRING,
    city STRING
)
USING iceberg
""")


AnalysisException: Table default.customers already exists

In [43]:
spark.sql("""
INSERT INTO local.default.customers VALUES
    (1, 'Ali', 'Sydney'),
    (2, 'Maryam', 'Melbourne')
""")


DataFrame[]

In [44]:
spark.sql("SELECT * FROM local.default.customers").show()


+---+------+---------+
| id|  name|     city|
+---+------+---------+
|  1|   Ali|   Sydney|
|  2|Maryam|Melbourne|
+---+------+---------+



In [45]:
# List snapshots
spark.sql("SELECT * FROM local.default.customers.snapshots").show()



+--------------------+-------------------+---------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+---------+---------+--------------------+--------------------+
|2025-06-15 12:00:...|5182714854818747790|     null|   append|s3a://iceberg-war...|{spark.app.id -> ...|
+--------------------+-------------------+---------+---------+--------------------+--------------------+



In [47]:
# Time travel to any snapshot
spark.sql("CALL local.system.rollback_to_snapshot('default.customers', 5182714854818747790)")

DataFrame[previous_snapshot_id: bigint, current_snapshot_id: bigint]

In [48]:
# Make modifications to the table schema
spark.sql("""
ALTER TABLE local.default.customers
ADD COLUMNS (
    email STRING
)
""")


25/06/15 12:00:48 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up


DataFrame[]

In [49]:
# Insert new data without the new column:
spark.sql("""
INSERT INTO local.default.customers VALUES
(3, 'Sana', 'Melbourne', NULL),
(4, 'Ali', 'Sydney', NULL)
""")

DataFrame[]

In [50]:
# Insert new data with the new column:
spark.sql("""
INSERT INTO local.default.customers VALUES
(5, 'Zara', 'Perth', 'zara@example.com')
""")

DataFrame[]

In [51]:
#View table content
spark.sql("SELECT * FROM local.default.customers").show()


+---+------+---------+----------------+
| id|  name|     city|           email|
+---+------+---------+----------------+
|  5|  Zara|    Perth|zara@example.com|
|  1|   Ali|   Sydney|            null|
|  2|Maryam|Melbourne|            null|
|  3|  Sana|Melbourne|            null|
|  4|   Ali|   Sydney|            null|
+---+------+---------+----------------+



In [52]:
# View snapshots

spark.sql("SELECT * FROM local.default.customers.snapshots").show()


+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-06-15 12:00:...|5182714854818747790|               null|   append|s3a://iceberg-war...|{spark.app.id -> ...|
|2025-06-15 12:00:...|1539835591936628885|5182714854818747790|   append|s3a://iceberg-war...|{spark.app.id -> ...|
|2025-06-15 12:00:...|8876385980521966540|1539835591936628885|   append|s3a://iceberg-war...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [54]:
# Rollback to previous snapshot


spark.sql("CALL local.system.rollback_to_snapshot('default.customers', 5182714854818747790)")

25/06/15 12:01:20 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up


DataFrame[previous_snapshot_id: bigint, current_snapshot_id: bigint]

In [55]:
#View table content
spark.sql("SELECT * FROM local.default.customers").show()


+---+------+---------+-----+
| id|  name|     city|email|
+---+------+---------+-----+
|  1|   Ali|   Sydney| null|
|  2|Maryam|Melbourne| null|
+---+------+---------+-----+



In [56]:
# Perform a delete

spark.sql("""
DELETE FROM local.default.customers
WHERE name = 'Ali'
""")


DataFrame[]

In [57]:
#View table content
spark.sql("SELECT * FROM local.default.customers").show()


+---+------+---------+-----+
| id|  name|     city|email|
+---+------+---------+-----+
|  2|Maryam|Melbourne| null|
+---+------+---------+-----+



In [58]:
# Perform an update

spark.sql("""
MERGE INTO local.default.customers AS target
USING (SELECT 5 AS id, 'Zara' AS name, 'Brisbane' AS city, 'zara@newmail.com' AS email) AS source
ON target.id = source.id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
""")


DataFrame[]

In [59]:
#View table content
spark.sql("SELECT * FROM local.default.customers").show()


+---+------+---------+----------------+
| id|  name|     city|           email|
+---+------+---------+----------------+
|  5|  Zara| Brisbane|zara@newmail.com|
|  2|Maryam|Melbourne|            null|
+---+------+---------+----------------+



In [67]:
spark.sql("SELECT * FROM local.default.customers.snapshots").show()


+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-06-15 12:00:...|5182714854818747790|               null|   append|s3a://iceberg-war...|{spark.app.id -> ...|
|2025-06-15 12:00:...|1539835591936628885|5182714854818747790|   append|s3a://iceberg-war...|{spark.app.id -> ...|
|2025-06-15 12:00:...|8876385980521966540|1539835591936628885|   append|s3a://iceberg-war...|{spark.app.id -> ...|
|2025-06-15 12:01:...|5025805177383032076|5182714854818747790|   delete|s3a://iceberg-war...|{spark.app.id -> ...|
|2025-06-15 12:01:...|2749155953363769792|5025805177383032076|overwrite|s3a://iceberg-war...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------

In [74]:
spark.sql("SHOW TABLE EXTENDED IN default LIKE 'customers'").show(truncate=False)


+---------+---------+-----------+-----------+
|namespace|tableName|isTemporary|information|
+---------+---------+-----------+-----------+
+---------+---------+-----------+-----------+

