In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# %%


# spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1 \
#     --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
#     --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
#     --conf spark.sql.catalog.spark_catalog.type=hive \
#     --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
#     --conf spark.sql.catalog.local.type=hadoop \
#     --conf spark.sql.catalog.local.warehouse=$PWD/warehouse

# spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1


# Session settings, mirroring the `--packages` / `--conf` flags of the CLI
# invocations quoted in the comments above.
#   * `spark.jars.packages` names the Iceberg Spark runtime artifact to resolve.
#   * `spark.sql.extensions` enables Iceberg's Spark session extensions.
#   * `spark_catalog` is Iceberg's SparkSessionCatalog, typed `hive`.
#   * `local` is an Iceberg SparkCatalog of type `hadoop` whose warehouse is
#     the relative directory ./warehouse.
spark_configs = dict(
    [
        ("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1"),
        ("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"),
        ("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog"),
        ("spark.sql.catalog.spark_catalog.type", "hive"),
        ("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog"),
        ("spark.sql.catalog.local.warehouse", "./warehouse"),
        ("spark.sql.catalog.local.type", "hadoop"),
    ]
)

# Build the session used by every later cell, applying all settings at once.
spark = (
    SparkSession.builder.appName("Python Spark SQL basic example")
    .config(map=spark_configs)
    .getOrCreate()
)

25/01/20 15:37:16 WARN Utils: Your hostname, codespaces-7465f4 resolves to a loopback address: 127.0.0.1; using 10.0.1.210 instead (on interface eth0)
25/01/20 15:37:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/codespace/.ivy2/cache
The jars for the packages stored in: /home/codespace/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3b019e13-5f39-4a38-94a1-08d3b7d8288d;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.7.1 in central
:: resolution report :: resolve 179ms :: artifacts dl 4ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.7.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apa

In [3]:
# Smoke test: display a ten-row `id` range (0 through 9) to confirm the
# session can run a job.
spark.range(0, 10).show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [4]:
# (Re)create the Iceberg table `local.db.table` with columns (id, data),
# then display its contents.
spark.sql(
    "CREATE OR REPLACE TABLE local.db.table (id bigint, data string) USING iceberg;"
)
spark.table("local.db.table").show()

+---+----+
| id|data|
+---+----+
+---+----+



In [5]:
# Insert three literal rows through SQL, then display the table.
spark.sql(
    "INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');"
)
spark.table("local.db.table").show()

                                                                                

+---+----+
| id|data|
+---+----+
|  1|   a|
|  2|   b|
|  3|   c|
+---+----+



In [6]:
# Append ten rows (id 0..9, data = "mydata") through the DataFrameWriterV2
# API, then display the table.
(
    spark.range(10)
    .select("id", F.lit("mydata").alias("data"))
    .writeTo("local.db.table")
    .append()
)
spark.table("local.db.table").show()


+---+------+
| id|  data|
+---+------+
|  1|     a|
|  2|     b|
|  3|     c|
|  0|mydata|
|  1|mydata|
|  2|mydata|
|  3|mydata|
|  4|mydata|
|  5|mydata|
|  6|mydata|
|  7|mydata|
|  8|mydata|
|  9|mydata|
+---+------+



In [7]:
# Inspect the `snapshots` metadata table: one row per committed snapshot.
spark.table("local.db.table.snapshots").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-01-20 15:37:...|8492778904122201325|               NULL|   append|warehouse/db/tabl...|{spark.app.id -> ...|
|2025-01-20 15:37:...|6031520081634296825|8492778904122201325|   append|warehouse/db/tabl...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [9]:
# Time Travel
#
# Read the table as of the snapshot that precedes the most recent commit.
# The snapshot ID is looked up from the `snapshots` metadata table rather
# than pasted in as a literal: a pasted ID only matches the warehouse it was
# copied from, so the cell would fail against a freshly built table.
latest_snapshot = (
    spark.read.table("local.db.table.snapshots")
    .orderBy(F.col("committed_at").desc())
    .first()
)
if latest_snapshot is None or latest_snapshot["parent_id"] is None:
    raise ValueError(
        "local.db.table needs at least two snapshots to time travel to the previous one"
    )
previous_snapshot_id = latest_snapshot["parent_id"]

(spark.read
    .option("snapshot-id", previous_snapshot_id)
    .table("local.db.table")).show()

+---+----+
| id|data|
+---+----+
|  1|   a|
|  2|   b|
|  3|   c|
+---+----+



In [10]:
# Extended description: columns, metadata columns and detailed table
# information. The statement keeps its leading and trailing newline.
sql = "\nDESCRIBE EXTENDED local.db.table\n"
spark.sql(sql).show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|                  id|              bigint|   NULL|
|                data|              string|   NULL|
|                    |                    |       |
|  # Metadata Columns|                    |       |
|            _spec_id|                 int|       |
|          _partition|            struct<>|       |
|               _file|              string|       |
|                _pos|              bigint|       |
|            _deleted|             boolean|       |
|                    |                    |       |
|# Detailed Table ...|                    |       |
|                Name|      local.db.table|       |
|                Type|             MANAGED|       |
|            Location|./warehouse/db/table|       |
|            Provider|             iceberg|       |
|               Owner|           codespace|       |
|    Table P

In [11]:
# Plain description: column names, types and comments only.
# The statement keeps its leading and trailing newline.
sql = "\nDESCRIBE local.db.table\n"
spark.sql(sql).show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|      id|   bigint|   NULL|
|    data|   string|   NULL|
+--------+---------+-------+



In [12]:
# List the table's properties as key/value rows.
spark.sql(
    "SHOW TBLPROPERTIES local.db.table"
).show()

+--------------------+-------------------+
|                 key|              value|
+--------------------+-------------------+
| current-snapshot-id|6031520081634296825|
|              format|    iceberg/parquet|
|      format-version|                  2|
|write.parquet.com...|               zstd|
+--------------------+-------------------+



In [13]:
# Read the same table by filesystem path instead of by catalog name.
spark.read.load("./warehouse/db/table", format="iceberg").show()

+---+------+
| id|  data|
+---+------+
|  1|     a|
|  2|     b|
|  3|     c|
|  0|mydata|
|  1|mydata|
|  2|mydata|
|  3|mydata|
|  4|mydata|
|  5|mydata|
|  6|mydata|
|  7|mydata|
|  8|mydata|
|  9|mydata|
+---+------+



In [15]:
# Re-display the `snapshots` metadata table to see the available snapshot IDs.
spark.table("local.db.table.snapshots").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-01-20 15:37:...|8492778904122201325|               NULL|   append|warehouse/db/tabl...|{spark.app.id -> ...|
|2025-01-20 15:37:...|6031520081634296825|8492778904122201325|   append|warehouse/db/tabl...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [16]:
# Incremental read: show only the rows added between the previous snapshot
# and the most recent one. Both IDs come from the `snapshots` metadata table
# instead of pasted literals, which only match the warehouse they were
# copied from.
latest_snapshot = (
    spark.read.table("local.db.table.snapshots")
    .orderBy(F.col("committed_at").desc())
    .first()
)
if latest_snapshot is None or latest_snapshot["parent_id"] is None:
    raise ValueError(
        "local.db.table needs at least two snapshots for an incremental read"
    )

(spark.read
  .format("iceberg")
  .option("start-snapshot-id", str(latest_snapshot["parent_id"]))
  .option("end-snapshot-id", str(latest_snapshot["snapshot_id"]))
  .load("./warehouse/db/table")).show()

+---+------+
| id|  data|
+---+------+
|  0|mydata|
|  1|mydata|
|  2|mydata|
|  3|mydata|
|  4|mydata|
|  5|mydata|
|  6|mydata|
|  7|mydata|
|  8|mydata|
|  9|mydata|
+---+------+



In [17]:
# `snapshots` metadata table once more, for comparison with `history` below.
spark.table("local.db.table.snapshots").show()


+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-01-20 15:37:...|8492778904122201325|               NULL|   append|warehouse/db/tabl...|{spark.app.id -> ...|
|2025-01-20 15:37:...|6031520081634296825|8492778904122201325|   append|warehouse/db/tabl...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [18]:
# `history` metadata table: when each snapshot became current and whether it
# is an ancestor of the current snapshot.
spark.table("local.db.table.history").show()

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2025-01-20 15:37:...|8492778904122201325|               NULL|               true|
|2025-01-20 15:37:...|6031520081634296825|8492778904122201325|               true|
+--------------------+-------------------+-------------------+-------------------+



In [19]:
# `entries` metadata table: per-data-file entries with status, owning
# snapshot and sequence numbers.
spark.table("local.db.table.entries").show()


+------+-------------------+---------------+--------------------+--------------------+--------------------+
|status|        snapshot_id|sequence_number|file_sequence_number|           data_file|    readable_metrics|
+------+-------------------+---------------+--------------------+--------------------+--------------------+
|     1|6031520081634296825|              2|                   2|{0, warehouse/db/...|{{72, 5, 0, NULL,...|
|     1|6031520081634296825|              2|                   2|{0, warehouse/db/...|{{72, 5, 0, NULL,...|
|     1|8492778904122201325|              1|                   1|{0, warehouse/db/...|{{37, 1, 0, NULL,...|
|     1|8492778904122201325|              1|                   1|{0, warehouse/db/...|{{42, 2, 0, NULL,...|
+------+-------------------+---------------+--------------------+--------------------+--------------------+



In [20]:
# `files` metadata table, untruncated so file paths and metrics are fully
# visible.
spark.table("local.db.table.files").show(truncate=False)


+-------+------------------------------------------------------------------------------------+-----------+-------+------------+------------------+------------------+----------------+-----------------+----------------+----------------------------------------------------------+----------------------------------------------------------+------------+-------------+------------+-------------+----------------------------------------------------------+
|content|file_path                                                                           |file_format|spec_id|record_count|file_size_in_bytes|column_sizes      |value_counts    |null_value_counts|nan_value_counts|lower_bounds                                              |upper_bounds                                              |key_metadata|split_offsets|equality_ids|sort_order_id|readable_metrics                                          |
+-------+------------------------------------------------------------------------------------+--------

In [21]:
# `partitions` metadata table, untruncated: record/file counts and sizes.
spark.table("local.db.table.partitions").show(truncate=False)

+------------+----------+-----------------------------+----------------------------+--------------------------+----------------------------+--------------------------+-----------------------+------------------------+
|record_count|file_count|total_data_file_size_in_bytes|position_delete_record_count|position_delete_file_count|equality_delete_record_count|equality_delete_file_count|last_updated_at        |last_updated_snapshot_id|
+------------+----------+-----------------------------+----------------------------+--------------------------+----------------------------+--------------------------+-----------------------+------------------------+
|13          |4         |2700                         |0                           |0                         |0                           |0                         |2025-01-20 15:37:27.878|6031520081634296825     |
+------------+----------+-----------------------------+----------------------------+--------------------------+---------------------

In [22]:
# `refs` metadata table, untruncated: named references and the snapshot each
# one points to.
spark.table("local.db.table.refs").show(truncate=False)


+----+------+-------------------+-----------------------+---------------------+----------------------+
|name|type  |snapshot_id        |max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+----+------+-------------------+-----------------------+---------------------+----------------------+
|main|BRANCH|6031520081634296825|NULL                   |NULL                 |NULL                  |
+----+------+-------------------+-----------------------+---------------------+----------------------+



In [23]:
# Current table contents before the UPDATE below, without truncation.
spark.table("local.db.table").show(truncate=False)


+---+------+
|id |data  |
+---+------+
|0  |mydata|
|1  |mydata|
|2  |mydata|
|3  |mydata|
|4  |mydata|
|5  |mydata|
|6  |mydata|
|7  |mydata|
|8  |mydata|
|9  |mydata|
|1  |a     |
|2  |b     |
|3  |c     |
+---+------+



In [24]:
# Row-level UPDATE: set data to 'hello' where id is 0.
# The statement text (including its leading and trailing newline) is unchanged.
sql = "\nUPDATE local.db.table\nSET data = 'hello'\nWHERE id=0\n"
spark.sql(sql)

DataFrame[]

In [25]:
# Table contents after the UPDATE, without truncation.
spark.table("local.db.table").show(truncate=False)

+---+------+
|id |data  |
+---+------+
|1  |a     |
|2  |b     |
|3  |c     |
|5  |mydata|
|6  |mydata|
|7  |mydata|
|8  |mydata|
|9  |mydata|
|0  |hello |
|1  |mydata|
|2  |mydata|
|3  |mydata|
|4  |mydata|
+---+------+



In [26]:
# Create (or replace) a second table, `local.db.table2`, directly from a
# ten-row `id` range.
(
    spark.range(0, 10)
    .writeTo("local.db.table2")
    .createOrReplace()
)

25/01/20 15:39:38 WARN HadoopTableOperations: Error reading version hint file warehouse/db/table2/metadata/version-hint.text
java.io.FileNotFoundException: File warehouse/db/table2/metadata/version-hint.text does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
	at org.apache.iceberg.hadoop.HadoopTableOperations.findVersion(HadoopTableOperations.java:317)
	at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTable

In [27]:
# Display the contents of the newly written `local.db.table2`.
spark.table("local.db.table2").show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [29]:
# `snapshots` metadata table without truncation, so the manifest list path
# and the summary map are fully visible.
spark.table("local.db.table.snapshots").show(truncate=False)

+-----------------------+-------------------+-------------------+---------+------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                   |summary                                                                                                                    

In [31]:
# Expire the oldest snapshot in the current snapshot's lineage. The ID is
# read from the `history` metadata table instead of being pasted in, because
# a pasted ID only matches the warehouse it was copied from.
# NOTE: this cell is not idempotent - each run expires whichever snapshot is
# then the oldest ancestor.
oldest_ancestor_id = (
    spark.read.table("local.db.table.history")
    .where(F.col("is_current_ancestor"))
    .orderBy("made_current_at")
    .first()["snapshot_id"]
)

# Left as the last expression so the procedure's result DataFrame is the
# cell output.
spark.sql(
    "CALL local.system.expire_snapshots("
    f"table => 'db.table', snapshot_ids => ARRAY({oldest_ancestor_id}))"
)

                                                                                

DataFrame[deleted_data_files_count: bigint, deleted_position_delete_files_count: bigint, deleted_equality_delete_files_count: bigint, deleted_manifest_files_count: bigint, deleted_manifest_lists_count: bigint, deleted_statistics_files_count: bigint]

In [32]:
# Confirm which snapshots remain after the expire_snapshots call.
spark.table("local.db.table.snapshots").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-01-20 15:37:...|6031520081634296825|8492778904122201325|   append|warehouse/db/tabl...|{spark.app.id -> ...|
|2025-01-20 15:39:...|7201136700195168713|6031520081634296825|overwrite|warehouse/db/tabl...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [33]:
# Two separate appends of two rows each (data "hi", then "hi2"), so that each
# one is committed by its own write.
for label in ("hi", "hi2"):
    (
        spark.range(2)
        .select("id", F.lit(label).alias("data"))
        .writeTo("local.db.table")
        .append()
    )

In [34]:
# Table contents after the two extra appends.
spark.table("local.db.table").show()

+---+------+
| id|  data|
+---+------+
|  1|     a|
|  2|     b|
|  3|     c|
|  0|    hi|
|  1|    hi|
|  5|mydata|
|  6|mydata|
|  7|mydata|
|  8|mydata|
|  9|mydata|
|  0|   hi2|
|  1|   hi2|
|  0| hello|
|  1|mydata|
|  2|mydata|
|  3|mydata|
|  4|mydata|
+---+------+



In [35]:
# Snapshot list after the two extra appends.
spark.table("local.db.table.snapshots").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-01-20 15:37:...|6031520081634296825|8492778904122201325|   append|warehouse/db/tabl...|{spark.app.id -> ...|
|2025-01-20 15:39:...|7201136700195168713|6031520081634296825|overwrite|warehouse/db/tabl...|{spark.app.id -> ...|
|2025-01-20 15:40:...| 749386769863266795|7201136700195168713|   append|warehouse/db/tabl...|{spark.app.id -> ...|
|2025-01-20 15:40:...|6938968765761808873| 749386769863266795|   append|warehouse/db/tabl...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [37]:
# Roll the table back to the snapshot committed by the UPDATE statement,
# discarding the later appends from the current table state. The UPDATE is
# the commit whose operation is `overwrite`; its ID is looked up from the
# `snapshots` metadata table instead of being pasted in, because a pasted ID
# only matches the warehouse it was copied from.
update_snapshot = (
    spark.read.table("local.db.table.snapshots")
    .where(F.col("operation") == "overwrite")
    .orderBy(F.col("committed_at").desc())
    .first()
)
if update_snapshot is None:
    raise ValueError("no 'overwrite' snapshot found in local.db.table to roll back to")
rollback_target_id = update_snapshot["snapshot_id"]

spark.sql(
    f"CALL local.system.rollback_to_snapshot('db.table', {rollback_target_id})"
).show()

+--------------------+-------------------+
|previous_snapshot_id|current_snapshot_id|
+--------------------+-------------------+
| 6938968765761808873|7201136700195168713|
+--------------------+-------------------+



In [38]:
# Table contents after the rollback.
spark.table("local.db.table").show()

+---+------+
| id|  data|
+---+------+
|  1|     a|
|  5|mydata|
|  6|mydata|
|  7|mydata|
|  8|mydata|
|  9|mydata|
|  2|     b|
|  3|     c|
|  0| hello|
|  1|mydata|
|  2|mydata|
|  3|mydata|
|  4|mydata|
+---+------+

