In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

class IcebergUtility:
    """Static helpers for writing, reading and inspecting Apache Iceberg tables via Spark.

    Table names are passed as Spark-resolvable identifiers such as
    ``my_catalog.db.fruits_price``; the catalog part must be configured as an
    Iceberg catalog in the Spark session.
    """

    # Maximum number of rows printed by show_table_contents.
    MAX_ROW_DISPLAY = 100

    @staticmethod
    def list_snapshot_ids(table_name: str, spark: SparkSession) -> list[int]:
        """Print every snapshot of an Iceberg table and return their IDs.

        Snapshot information is read from Iceberg's ``<table>.snapshots``
        metadata table through Spark SQL, so the table is resolved through the
        same Spark catalog (and warehouse) as every other method here, rather
        than through a hard-coded Hadoop warehouse path and table identifier.

        Args:
            table_name: Table identifier, e.g. ``catalog.namespace.table``.
            spark: Active Spark session with the Iceberg catalog configured.

        Returns:
            Snapshot IDs, ordered by commit time (oldest first).
        """
        snapshots = spark.sql(
            "SELECT snapshot_id, "
            "unix_millis(committed_at) AS timestamp_millis, "  # epoch millis, as before
            "operation, summary "
            f"FROM {table_name}.snapshots "
            "ORDER BY committed_at"
        ).collect()

        for snapshot in snapshots:
            print(f"Snapshot ID: {snapshot.snapshot_id}")
            print(f"Timestamp: {snapshot.timestamp_millis}")
            print(f"Operation: {snapshot.operation}")
            print(f"Summary: {snapshot.summary}")
            print("--------")

        print(f"Total snapshots = {len(snapshots)}")
        return [snapshot.snapshot_id for snapshot in snapshots]

    @staticmethod
    def show_table_contents(table_name: str, spark: SparkSession, snapshot_id: int = None):
        """Show up to MAX_ROW_DISPLAY rows of an Iceberg table.

        Args:
            table_name: Table identifier to load.
            spark: Active Spark session.
            snapshot_id: If given, read the table as of this snapshot (time
                travel via the ``snapshot-id`` read option); otherwise read the
                current state.
        """
        reader = spark.read.format("iceberg")
        # Compare against None explicitly so any provided ID (even a falsy one)
        # triggers time travel.
        if snapshot_id is not None:
            reader = reader.option("snapshot-id", snapshot_id)
        reader.load(table_name).show(IcebergUtility.MAX_ROW_DISPLAY)

    @staticmethod
    def write_as_table(df, table_name: str):
        """Create (or replace) ``table_name`` as an Iceberg table holding ``df``.

        Sets the table property ``write.format.default`` to ``orc`` so data
        files are written as ORC.
        """
        df.writeTo(table_name).using("iceberg").tableProperty("write.format.default", "orc").createOrReplace()

    @staticmethod
    def write_as_partitioned_table(df, table_name: str, partition_col: str):
        """Create (or replace) ``table_name`` as an Iceberg table partitioned by ``partition_col``.

        Unlike write_as_table, no ``write.format.default`` property is set, so
        the table uses Iceberg's default data file format.
        """
        df.writeTo(table_name).partitionedBy(col(partition_col)).using("iceberg").createOrReplace()

    @staticmethod
    def append_to_table(df, table_name: str):
        """Append ``df`` to the existing table ``table_name``, writing ORC data files.

        ``write-format`` is the Iceberg per-write option for the file format;
        ``write.format.default`` is a *table property* and is ignored when
        passed as a write option.
        """
        df.writeTo(table_name).option("write-format", "orc").append()

    @staticmethod
    def count_rows(table_name: str, spark: SparkSession) -> int:
        """Return the number of rows currently in the Iceberg table."""
        df = spark.read.format("iceberg").load(table_name)
        return df.count()

    @staticmethod
    def describe_table(table_name: str, spark: SparkSession):
        """Print the output of ``DESCRIBE TABLE`` without truncating columns."""
        spark.sql(f"DESCRIBE TABLE {table_name}").show(truncate=False)

    @staticmethod
    def list_tables(catalog: str, namespace: str, spark: SparkSession):
        """Print the number of tables in ``catalog.namespace`` followed by one line per table."""
        tables = spark.catalog.listTables(f"{catalog}.{namespace}")
        print(f" Number of tables : {len(tables)}\n")
        for t in tables:
            print(f"[ {t.catalog} | {t.namespace} | {t.name} ]")

    @staticmethod
    def delete_table(table_name: str, spark: SparkSession):
        """Drop ``table_name`` if it exists; ``PURGE`` requests removal of its data files too."""
        spark.sql(f"DROP TABLE IF EXISTS {table_name} PURGE")


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, round as pyspark_round

class Utility:
    """Generic Spark helpers for the demo: session setup, JSON loading and column transforms."""

    @staticmethod
    def get_spark_session() -> SparkSession:
        """Return a local-mode Spark session, creating it if necessary.

        Registers an Iceberg ``SparkCatalog`` named ``my_catalog`` of type
        ``hadoop`` whose warehouse is the relative directory ``warehouse``.
        """
        catalog_settings = {
            "spark.sql.catalog.my_catalog": "org.apache.iceberg.spark.SparkCatalog",
            "spark.sql.catalog.my_catalog.type": "hadoop",
            "spark.sql.catalog.my_catalog.warehouse": "warehouse",
        }
        builder = SparkSession.builder.appName("Iceberg demo app").master("local[*]")  # use local mode
        for key, value in catalog_settings.items():
            builder = builder.config(key, value)
        return builder.getOrCreate()

    @staticmethod
    def read_file(file: str, spark: SparkSession):
        """Load a JSON file (multi-line records allowed) into a DataFrame."""
        return spark.read.option("multiline", "true").json(file)

    @staticmethod
    def drop_column(df, col_name: str):
        """Return ``df`` without the column ``col_name``."""
        result = df.drop(col_name)
        return result

    @staticmethod
    def add_column(df, col_name: str, value):
        """Return ``df`` with a column ``col_name`` set to the literal ``value`` on every row."""
        constant = lit(value)
        return df.withColumn(col_name, constant)

    @staticmethod
    def apply_discount(df):
        """Add ``final_price`` = price * (1 - discount / 100), rounded to 3 decimals."""
        discount_fraction = col("discount") / 100.0
        discounted_price = col("price") * (lit(1) - discount_fraction)
        return df.withColumn("final_price", pyspark_round(discounted_price, 3))


In [3]:
# Source data file and the Iceberg table (catalog.namespace.table) the demo writes to.
input_file = "input/fruits.json"
output_table = "my_catalog.db.fruits_price"

# Start (or reuse) the local Spark session configured with the Iceberg catalog.
spark = Utility.get_spark_session()

# Start from a clean slate: drop the table left over from a previous run, if any.
IcebergUtility.delete_table(table_name=output_table, spark=spark)

In [4]:
# Read and transform the data - Apply 5% Discount
discount_pct = 5.0
df = Utility.read_file(input_file, spark)
df = Utility.drop_column(df, "discount")               # drop any discount column from the source
df = Utility.add_column(df, "discount", discount_pct)  # flat rate for every product
df = Utility.apply_discount(df)                        # derive final_price

# Write the DataFrame as an Iceberg table (create, or replace the existing one)
IcebergUtility.write_as_table(df, table_name=output_table)

# Display the table contents
IcebergUtility.show_table_contents(output_table, spark=spark)


+-----+----------+------------+--------+-----------+
|price|product_id|product_name|discount|final_price|
+-----+----------+------------+--------+-----------+
|  1.2|      F001|       Apple|     5.0|       1.14|
|  0.5|      F002|      Banana|     5.0|      0.475|
|  0.8|      F003|      Orange|     5.0|       0.76|
|  2.0|      F004|      Grapes|     5.0|        1.9|
|  1.5|      F005|       Mango|     5.0|      1.425|
|  3.0|      F006|   Pineapple|     5.0|       2.85|
|  2.5|      F007|  Strawberry|     5.0|      2.375|
|  3.2|      F008|   Blueberry|     5.0|       3.04|
|  4.0|      F009|  Watermelon|     5.0|        3.8|
|  1.8|      F010|       Peach|     5.0|       1.71|
+-----+----------+------------+--------+-----------+



In [5]:
# List the tables in the namespace holding output_table; derive catalog and
# namespace from output_table so the two cannot drift apart.
catalog_name, namespace_name = output_table.split(".")[:2]
IcebergUtility.list_tables(catalog_name, namespace_name, spark)

 Number of tables : 1

[ my_catalog | ['db'] | fruits_price ]


In [6]:
# Read and transform the data - Apply 10% Discount
discount_pct = 10.0
df = Utility.read_file(input_file, spark)
df = Utility.drop_column(df, "discount")               # drop any discount column from the source
df = Utility.add_column(df, "discount", discount_pct)  # flat rate for every product
df = Utility.apply_discount(df)                        # derive final_price

# Write the DataFrame as an Iceberg table (create, or replace the existing one)
IcebergUtility.write_as_table(df, table_name=output_table)

# Display the table contents
IcebergUtility.show_table_contents(output_table, spark=spark)


+-----+----------+------------+--------+-----------+
|price|product_id|product_name|discount|final_price|
+-----+----------+------------+--------+-----------+
|  1.2|      F001|       Apple|    10.0|       1.08|
|  0.5|      F002|      Banana|    10.0|       0.45|
|  0.8|      F003|      Orange|    10.0|       0.72|
|  2.0|      F004|      Grapes|    10.0|        1.8|
|  1.5|      F005|       Mango|    10.0|       1.35|
|  3.0|      F006|   Pineapple|    10.0|        2.7|
|  2.5|      F007|  Strawberry|    10.0|       2.25|
|  3.2|      F008|   Blueberry|    10.0|       2.88|
|  4.0|      F009|  Watermelon|    10.0|        3.6|
|  1.8|      F010|       Peach|    10.0|       1.62|
+-----+----------+------------+--------+-----------+



In [7]:
# Read and transform the data - Apply 15% Discount
discount_pct = 15.0
df = Utility.read_file(input_file, spark)
df = Utility.drop_column(df, "discount")               # drop any discount column from the source
df = Utility.add_column(df, "discount", discount_pct)  # flat rate for every product
df = Utility.apply_discount(df)                        # derive final_price

# Write the DataFrame as an Iceberg table (create, or replace the existing one)
IcebergUtility.write_as_table(df, table_name=output_table)

# Display the table contents
IcebergUtility.show_table_contents(output_table, spark=spark)


+-----+----------+------------+--------+-----------+
|price|product_id|product_name|discount|final_price|
+-----+----------+------------+--------+-----------+
|  1.2|      F001|       Apple|    15.0|       1.02|
|  0.5|      F002|      Banana|    15.0|      0.425|
|  0.8|      F003|      Orange|    15.0|       0.68|
|  2.0|      F004|      Grapes|    15.0|        1.7|
|  1.5|      F005|       Mango|    15.0|      1.275|
|  3.0|      F006|   Pineapple|    15.0|       2.55|
|  2.5|      F007|  Strawberry|    15.0|      2.125|
|  3.2|      F008|   Blueberry|    15.0|       2.72|
|  4.0|      F009|  Watermelon|    15.0|        3.4|
|  1.8|      F010|       Peach|    15.0|       1.53|
+-----+----------+------------+--------+-----------+



In [8]:
# Print every snapshot recorded for the table and keep their IDs for time travel below.
snapshots = IcebergUtility.list_snapshot_ids(table_name=output_table, spark=spark)

Snapshot ID: 605425038203776521
Timestamp: 1749889644235
Operation: append
Summary: {'spark.app.id': 'local-1749889641761', 'added-data-files': '1', 'added-records': '10', 'added-files-size': '943', 'changed-partition-count': '1', 'total-records': '10', 'total-files-size': '943', 'total-data-files': '1', 'total-delete-files': '0', 'total-position-deletes': '0', 'total-equality-deletes': '0', 'engine-version': '3.5.2', 'app-id': 'local-1749889641761', 'engine-name': 'spark', 'iceberg-version': 'Apache Iceberg unspecified (commit 7dbafb438ee1e68d0047bebcb587265d7d87d8a1)'}
--------
Snapshot ID: 1804337025711513005
Timestamp: 1749889645509
Operation: append
Summary: {'spark.app.id': 'local-1749889641761', 'added-data-files': '1', 'added-records': '10', 'added-files-size': '948', 'changed-partition-count': '1', 'total-records': '10', 'total-files-size': '948', 'total-data-files': '1', 'total-delete-files': '0', 'total-position-deletes': '0', 'total-equality-deletes': '0', 'engine-version':

In [9]:
# Time travel: show the table as it was at each recorded snapshot.
# The loop variable is not named `id`, so the builtin id() is not shadowed in the notebook namespace.
for snapshot_id in snapshots:
    print(f"Snapshot Id : {snapshot_id}")
    IcebergUtility.show_table_contents(output_table, spark, snapshot_id)

Snapshot Id : 605425038203776521
+-----+----------+------------+--------+-----------+
|price|product_id|product_name|discount|final_price|
+-----+----------+------------+--------+-----------+
|  1.2|      F001|       Apple|     5.0|       1.14|
|  0.5|      F002|      Banana|     5.0|      0.475|
|  0.8|      F003|      Orange|     5.0|       0.76|
|  2.0|      F004|      Grapes|     5.0|        1.9|
|  1.5|      F005|       Mango|     5.0|      1.425|
|  3.0|      F006|   Pineapple|     5.0|       2.85|
|  2.5|      F007|  Strawberry|     5.0|      2.375|
|  3.2|      F008|   Blueberry|     5.0|       3.04|
|  4.0|      F009|  Watermelon|     5.0|        3.8|
|  1.8|      F010|       Peach|     5.0|       1.71|
+-----+----------+------------+--------+-----------+

Snapshot Id : 1804337025711513005
+-----+----------+------------+--------+-----------+
|price|product_id|product_name|discount|final_price|
+-----+----------+------------+--------+-----------+
|  1.2|      F001|       Apple|

In [10]:
# Stop the Spark session. The method must be *called*: a bare `spark.stop`
# only displays the bound method and leaves the session running.
spark.stop()

In [11]:
%%bash
# List the data files Iceberg has written for the table under the warehouse directory.
ls "warehouse/db/fruits_price/data"


00000-1-791bbd97-ce0b-4d39-bfda-67b3187be9fe-0-00001.orc
00000-6-e0e5b3bc-d305-46c0-8b5f-915b3b4833f7-0-00001.orc
00000-9-5ed84a4e-60e2-4eea-94e6-5d8ec109d90c-0-00001.orc


In [12]:
%%bash
# Clean up: remove the Iceberg warehouse and the local spark-warehouse directory.
rm -rf warehouse spark-warehouse

