In [1]:
# Source - https://stackoverflow.com/a/69297835
# Posted by jmPicaza
# Retrieved 2026-01-17, License - CC BY-SA 4.0

# Import from the public `IPython.display` module and bring `display` into
# scope explicitly instead of relying on the name IPython injects.
from IPython.display import HTML, display

# Keep <pre> output on a single line so wide `DataFrame.show()` tables are not
# wrapped by the notebook front end.
display(HTML("<style>pre { white-space: pre !important; }</style>"))


In [2]:
import os

from pyspark.sql import SparkSession

# Warehouse directory for the Hadoop-type Iceberg catalog. It can be overridden
# with the ICEBERG_WAREHOUSE environment variable so the notebook is not tied to
# one machine; the default is the location this notebook was written against.
ICEBERG_WAREHOUSE = os.environ.get(
    "ICEBERG_WAREHOUSE",
    "/Users/codebase/Documents/codebase/Courses/LearnApacheIceberg/warehouse",
)

spark = (
    SparkSession.builder.appName('iceberg-incremental')

    # Iceberg package (same as --packages)
    .config(
        "spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2"
    )

    # Catalog configuration: a catalog named `local` backed by the warehouse directory
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", ICEBERG_WAREHOUSE)

    .getOrCreate()
)

# Last expression of the cell: renders the session summary.
spark

26/01/18 12:27:11 WARN Utils: Your hostname, codebase.local resolves to a loopback address: 127.0.0.1; using 192.168.1.7 instead (on interface en0)
26/01/18 12:27:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/codebase/.ivy2/cache
The jars for the packages stored in: /Users/codebase/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-90e4725e-4f79-4d26-af22-6817da4ab88a;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.9.2 in central
:: resolution report :: resolve 36ms :: artifacts dl 1ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.9.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	-

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Create the demo table. IF NOT EXISTS keeps the cell re-runnable against a
# warehouse directory that already holds the table from a previous session.
# The per-column stats properties must name real columns of this table: the
# timestamp column is `ts` (the second key previously referred to `event_time`,
# which is not a column here).
spark.sql("""
CREATE TABLE IF NOT EXISTS local.db.events (
  id BIGINT,
  name STRING,
  age BIGINT,
  ts TIMESTAMP
)
USING iceberg
TBLPROPERTIES (
  'write.parquet.stats-enabled.column.age' = 'true',
  'write.parquet.stats-enabled.column.ts' = 'true'
)
""")


DataFrame[]

In [45]:
# Imported here so this cell runs on a fresh kernel; `dt` was previously only
# imported by a later cell.
from datetime import datetime as dt

# One sample row (id, name, age, ts) repeated a million times. Every entry
# refers to the same tuple, so all rows share a single timestamp.
data = (1, "Charlie", 28, dt.now())
data_to_write = [data] * 1_000_000

In [46]:
from datetime import datetime as dt



# Small two-row alternative to `data_to_write`, kept for reference:
# data = [(1, "Charlie", 28, dt.now()),
#         (3, "Diana", 34, dt.now())]

# Build the DataFrame from the generated rows; only column names are supplied.
event_columns = ["id", "name", "age", "ts"]
df = spark.createDataFrame(data_to_write, schema=event_columns)

In [47]:
# Append the rows held in `df` to the `local.db.events` table.
events_writer = df.writeTo("local.db.events")
events_writer.append()

26/01/18 12:34:43 WARN TaskSetManager: Stage 27 contains a task of very large size (1758 KiB). The maximum recommended task size is 1000 KiB.


In [6]:
# Show the raw `upper_bounds` column of the table's `files` metadata table,
# without truncating the cell contents.
upper_bounds_query = """
    SELECT upper_bounds FROM local.db.events.files
    """
spark.sql(upper_bounds_query).show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------+
|upper_bounds                                                                                                                 |
+-----------------------------------------------------------------------------------------------------------------------------+
|{1 -> [03 00 00 00 00 00 00 00], 2 -> [43 68 61 72 6C 69 65], 3 -> [1C 00 00 00 00 00 00 00], 4 -> [92 53 89 15 A4 48 06 00]}|
|{1 -> [04 00 00 00 00 00 00 00], 2 -> [44 69 61 6E 61], 3 -> [22 00 00 00 00 00 00 00], 4 -> [94 53 89 15 A4 48 06 00]}      |
+-----------------------------------------------------------------------------------------------------------------------------+



In [7]:
# Print the extended description of the events table with untruncated cells.
describe_query = """
    DESCRIBE TABLE EXTENDED local.db.events;
    """
spark.sql(describe_query).show(truncate=False)

+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                                                                                                                            |comment|
+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|id                          |bigint                                                                                                                                                                                                             

In [8]:
from pyspark.sql.functions import col, max as spark_max

def iceberg_max_from_stats(table, ts_cols):
    """
    Aggregate the recorded per-file upper bounds of the given columns.

    Reads `<table>.files` through the notebook-level `spark` session and, for
    each requested column, takes the maximum of
    `readable_metrics.<column>.upper_bound` across all rows of that table.

    Parameters
    ----------
    table : str
        Table identifier, e.g. 'local.db.events'.
    ts_cols : list of str
        Column names to look up, e.g. ['ts', 'ingest_ts'].

    Returns
    -------
    DataFrame
        One column named `max_<column>` per entry of `ts_cols`.
    """
    files_df = spark.table(f"{table}.files")
    metrics = col("readable_metrics")

    max_exprs = []
    for name in ts_cols:
        upper = metrics.getItem(name).getField("upper_bound")
        max_exprs.append(spark_max(upper).alias(f"max_{name}"))

    return files_df.select(max_exprs)

In [9]:
# Maxima of `ts` and `id` derived from the file-level metrics of the events table.
max_from_stats = iceberg_max_from_stats("local.db.events", ["ts", "id"])
max_from_stats.show(truncate=False)


+--------------------------+------+
|max_ts                    |max_id|
+--------------------------+------+
|2026-01-18 12:27:13.421716|4     |
+--------------------------+------+



In [10]:
# Display the rows of the events table with untruncated cell contents.
select_all_query = """
    SELECT * FROM local.db.events
    """
spark.sql(select_all_query).show(truncate=False)

+---+-------+---+--------------------------+
|id |name   |age|ts                        |
+---+-------+---+--------------------------+
|3  |Charlie|28 |2026-01-18 12:27:13.421714|
|4  |Diana  |34 |2026-01-18 12:27:13.421716|
+---+-------+---+--------------------------+



In [48]:
 # Rebind `df` to the `files` metadata table of the events table. The table name
 # is a plain string literal; it has no placeholders, so no f-string is needed.
 df = spark.table("local.db.events.files")

In [51]:
# Print up to 50 rows of `df` without truncating cell contents.
df.show(n=50, truncate=False)

+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------+------------+------------------+----------------------------------------+----------------------------------------------------+--------------------------------+----------------+-----------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+------------+-------------+------------+-------------+------------+--------------------+--------------+---------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|content|file_path                                                                

In [50]:
# Number of rows currently in `df`; the bare name on the last line displays it.
file_entry_count = df.count()
file_entry_count

26

In [52]:
# Show only the `readable_metrics` column of `df`, untruncated.
readable_metrics_df = df.select('readable_metrics')
readable_metrics_df.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|readable_metrics                                                                                                                                                                        |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{{284, 99328, 0, NULL, 28, 28}, {284, 99328, 0, NULL, 1, 1}, {287, 99328, 0, NULL, Charlie, Charlie}, {284, 99328, 0, NULL, 2026-01-18 12:34:35.246161, 2026-01-18 12:34:35.246161}}    |
|{{330, 100352, 0, NULL, 28, 28}, {330, 100352, 0, NULL, 1, 1}, {333, 100352, 0, NULL, Charlie, Charlie}, {330, 100352, 0, NULL, 2026-01-18 12:34:35.246161, 2026-01-18 12:34:35.246161}}|
|{{330, 100352, 0, NULL, 28, 28}, {330, 100352, 0, NULL, 1, 1}, {

In [53]:
# Maximum recorded upper bound of `ts` and `id` across the rows of `df`.
stat_columns = ['ts', 'id']
upper_bound_maxima = [
    spark_max(col('readable_metrics').getItem(name).getField('upper_bound'))
    for name in stat_columns
]
df.select(upper_bound_maxima).show(truncate=False)

+------------------------------------+------------------------------------+
|max(readable_metrics.ts.upper_bound)|max(readable_metrics.id.upper_bound)|
+------------------------------------+------------------------------------+
|2026-01-18 12:34:35.246161          |4                                   |
+------------------------------------+------------------------------------+

