In [34]:
import pyspark
from pyspark.sql import SparkSession

MINIO_ACCESS_KEY = "admin"
MINIO_SECRET_KEY = "password"

conf = (
    pyspark.SparkConf()
        .setAppName("iceberg-spark")

        # Spark + Iceberg JAR already in image, no need for jars.packages
        .set("spark.jars", "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.9.0-SNAPSHOT.jar")

        # Iceberg extensions
        .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")

        # REST catalog setup
        .set("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
        .set("spark.sql.catalog.rest.type", "rest")
        .set("spark.sql.catalog.rest.uri", "http://iceberg-rest:8181")
        .set("spark.sql.catalog.rest.warehouse", "s3://warehouse")
        .set("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        .set("spark.sql.catalog.rest.s3.endpoint", "http://minio:9000")
        .set("spark.sql.catalog.rest.s3.path-style-access", "true")
        .set("spark.sql.catalog.rest.s3.access-key-id", MINIO_ACCESS_KEY)
        .set("spark.sql.catalog.rest.s3.secret-access-key", MINIO_SECRET_KEY)

        # NESSIE catalog config
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', NESSIE_URI)
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        .set('spark.sql.catalog.nessie.warehouse', 's3a://warehouse')
        .set('spark.sql.catalog.nessie.s3.endpoint', 'http://minio:9000')
        .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        #MINIO CREDENTIALS
        .set('spark.hadoop.fs.s3a.access.key', MINIO_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', MINIO_SECRET_KEY)
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("INFO")

print("✅ Spark session started")

✅ Spark session started


25/04/28 17:02:07 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# partitioning by day

In [35]:
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.default")


DataFrame[]

In [36]:
spark.sql("""
CREATE TABLE IF NOT EXISTS nessie.default.evolve_test (
    id BIGINT,
    ts TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(ts))
""")

25/04/28 17:08:18 INFO BaseMetastoreCatalog: Table properties set at catalog level through catalog properties: {gc.enabled=false, write.metadata.delete-after-commit.enabled=false}
25/04/28 17:08:18 INFO BaseMetastoreCatalog: Table properties enforced at catalog level through catalog properties: {}
25/04/28 17:08:18 INFO NessieIcebergClient: Committed 'default.evolve_test' against 'Branch{name=main, metadata=null, hash=62cb9f22976da05c9406cb22f4a3e5f78b476ff68b1009a35f4f96949a4268f9}', expected commit-id was 'f4d0c03304e3a63565dadc77670e9a1f1893673c86b65cfd65b001fa40f38a99'
25/04/28 17:08:18 INFO BaseMetastoreTableOperations: Successfully committed to table default.evolve_test in 16 ms
25/04/28 17:08:18 INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00000-e6c2a842-05f9-4cc9-bfe2-ffed92d09797.metadata.json
25/04/28 17:08:18 INFO NessieUtil: loadTableMetadata for 'default.evol

DataFrame[]

In [47]:
spark.sql("select * from nessie.default.evolve_test.snapshots").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-04-28 17:29:...|3474037751171505592|               NULL|   append|s3a://warehouse/d...|{spark.app.id -> ...|
|2025-04-28 17:42:...|2089434004966214081|3474037751171505592|   append|s3a://warehouse/d...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



25/04/28 17:43:33 INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00003-53cc5ed8-4d8d-4daa-a370-17535ee32a11.metadata.json
25/04/28 17:43:33 INFO NessieUtil: loadTableMetadata for 'default.evolve_test' from location 's3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00003-53cc5ed8-4d8d-4daa-a370-17535ee32a11.metadata.json' at 'Branch{name=main, metadata=null, hash=7a0eec1903a64204a45c141b5ee1719b329ee6aaa685e0a8824a9e3b232567cd}'
25/04/28 17:43:33 INFO BaseMetastoreCatalog: Table loaded by catalog: nessie.default.evolve_test.snapshots
25/04/28 17:43:33 INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00003-53cc5ed8-4d8d-4daa-a370-17535ee32a11.metadata.json
25/04/28 17:43:33 INFO NessieUtil: loadTableMetadata for 'default.evolve_test'

In [44]:
spark.sql("describe table nessie.default.evolve_test").show()

+--------------+----------+-------+
|      col_name| data_type|comment|
+--------------+----------+-------+
|            id|    bigint|   NULL|
|            ts| timestamp|   NULL|
|              |          |       |
|# Partitioning|          |       |
|        Part 0|  days(ts)|       |
|        Part 1|months(ts)|       |
+--------------+----------+-------+



25/04/28 17:40:14 INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00002-9cd2ab10-2aca-41d6-b28e-682e7c90fa1d.metadata.json
25/04/28 17:40:14 INFO NessieUtil: loadTableMetadata for 'default.evolve_test' from location 's3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00002-9cd2ab10-2aca-41d6-b28e-682e7c90fa1d.metadata.json' at 'Branch{name=main, metadata=null, hash=b946eec63ce2f93fe6f3285c08bc0e8fcd074ccbcea80294187cacea86f5ea37}'


In [40]:
from datetime import datetime
from pyspark.sql import Row

# Create example data
data = [
    Row(id=1, ts=datetime(2024, 1, 10, 10, 0, 0)),
    Row(id=2, ts=datetime(2024, 1, 11, 11, 30, 0)),
    Row(id=3, ts=datetime(2024, 1, 12, 15, 45, 0)),
]

df = spark.createDataFrame(data)

# Insert into table
df.writeTo("nessie.default.evolve_test").append()


25/04/28 17:29:42 INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00000-e6c2a842-05f9-4cc9-bfe2-ffed92d09797.metadata.json
25/04/28 17:29:42 INFO NessieUtil: loadTableMetadata for 'default.evolve_test' from location 's3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00000-e6c2a842-05f9-4cc9-bfe2-ffed92d09797.metadata.json' at 'Branch{name=main, metadata=null, hash=62cb9f22976da05c9406cb22f4a3e5f78b476ff68b1009a35f4f96949a4268f9}'
25/04/28 17:29:42 INFO BaseMetastoreCatalog: Table loaded by catalog: nessie.default.evolve_test
25/04/28 17:29:42 INFO CodeGenerator: Code generated in 5.348041 ms
25/04/28 17:29:42 INFO DAGScheduler: Registering RDD 51 (append at NativeMethodAccessorImpl.java:0) as input to shuffle 0
25/04/28 17:29:42 INFO DAGScheduler: Got map stage job 11 (append at NativeMethodAccessorImpl.java:0) with 12 output partitions
25/04/

# alter partitioning to month

In [43]:
spark.sql("""
ALTER TABLE nessie.default.evolve_test
ADD PARTITION FIELD months(ts)
""")

25/04/28 17:40:09 INFO NessieIcebergClient: Committed 'default.evolve_test' against 'Branch{name=main, metadata=null, hash=b946eec63ce2f93fe6f3285c08bc0e8fcd074ccbcea80294187cacea86f5ea37}', expected commit-id was 'ead8774eb36bf8b1d8084bdd0d0d3efe71272a3101533a03bcc832e1e32dd949'
25/04/28 17:40:09 INFO BaseMetastoreTableOperations: Successfully committed to table default.evolve_test in 27 ms


DataFrame[]

In [46]:
data2 = [
    Row(id=4, ts=datetime(2024, 2, 5, 9, 0, 0)),
    Row(id=5, ts=datetime(2024, 2, 15, 16, 30, 0)),
]

df2 = spark.createDataFrame(data2)
df2.writeTo("nessie.default.evolve_test").append()

25/04/28 17:42:02 INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00002-9cd2ab10-2aca-41d6-b28e-682e7c90fa1d.metadata.json
25/04/28 17:42:02 INFO NessieUtil: loadTableMetadata for 'default.evolve_test' from location 's3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00002-9cd2ab10-2aca-41d6-b28e-682e7c90fa1d.metadata.json' at 'Branch{name=main, metadata=null, hash=b946eec63ce2f93fe6f3285c08bc0e8fcd074ccbcea80294187cacea86f5ea37}'
25/04/28 17:42:02 INFO BaseMetastoreCatalog: Table loaded by catalog: nessie.default.evolve_test
25/04/28 17:42:02 INFO DAGScheduler: Registering RDD 67 (append at NativeMethodAccessorImpl.java:0) as input to shuffle 1
25/04/28 17:42:02 INFO DAGScheduler: Got map stage job 15 (append at NativeMethodAccessorImpl.java:0) with 12 output partitions
25/04/28 17:42:02 INFO DAGScheduler: Final stage: ShuffleMapStage 16 (appe

In [54]:
spark.sql("SELECT * FROM nessie.default.evolve_test.partitions").show()


+------------------+-------+------------+----------+-----------------------------+----------------------------+--------------------------+----------------------------+--------------------------+--------------------+------------------------+
|         partition|spec_id|record_count|file_count|total_data_file_size_in_bytes|position_delete_record_count|position_delete_file_count|equality_delete_record_count|equality_delete_file_count|     last_updated_at|last_updated_snapshot_id|
+------------------+-------+------------+----------+-----------------------------+----------------------------+--------------------------+----------------------------+--------------------------+--------------------+------------------------+
| {2024-02-05, 649}|      1|           1|         1|                          730|                           0|                         0|                           0|                         0|2025-04-28 17:42:...|     2089434004966214081|
|{2024-01-12, NULL}|      0|        

25/04/28 17:52:48 INFO V2ScanRelationPushDown: 
Output: partition#1044, spec_id#1045, record_count#1046L, file_count#1047, total_data_file_size_in_bytes#1048L, position_delete_record_count#1049L, position_delete_file_count#1050, equality_delete_record_count#1051L, equality_delete_file_count#1052, last_updated_at#1053, last_updated_snapshot_id#1054L
         
25/04/28 17:52:48 INFO SnapshotScan: Scanning table nessie.default.evolve_test snapshot 2089434004966214081 created at 2025-04-28T17:42:03.181+00:00 with filter true
25/04/28 17:52:48 INFO SparkPartitioningAwareScan: Reporting UnknownPartitioning with 1 partition(s) for table nessie.default.evolve_test.partitions
25/04/28 17:52:48 INFO MemoryStore: Block broadcast_64 stored as values in memory (estimated size 32.0 KiB, free 434.2 MiB)
25/04/28 17:52:48 INFO MemoryStore: Block broadcast_64_piece0 stored as bytes in memory (estimated size 4.2 KiB, free 434.2 MiB)
25/04/28 17:52:48 INFO BlockManagerInfo: Added broadcast_64_piece0 in m

In [57]:
spark.sql("""CALL nessie.system.rewrite_data_files(
  table => 'nessie.default.evolve_test',
  options => map(
    'min-input-files', '1',          
    'target-file-size-bytes', '5242880'  
  )
)""")

25/04/28 18:15:12 INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00003-53cc5ed8-4d8d-4daa-a370-17535ee32a11.metadata.json
25/04/28 18:15:12 INFO NessieUtil: loadTableMetadata for 'default.evolve_test' from location 's3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00003-53cc5ed8-4d8d-4daa-a370-17535ee32a11.metadata.json' at 'Branch{name=main, metadata=null, hash=7a0eec1903a64204a45c141b5ee1719b329ee6aaa685e0a8824a9e3b232567cd}'
25/04/28 18:15:12 INFO BaseMetastoreCatalog: Table loaded by catalog: nessie.default.evolve_test
25/04/28 18:15:12 INFO SnapshotScan: Scanning table nessie.default.evolve_test snapshot 2089434004966214081 created at 2025-04-28T17:42:03.181+00:00 with filter true
25/04/28 18:15:12 INFO LoggingMetricsReporter: Received metrics report: ScanReport{tableName=nessie.default.evolve_test, snapshotId=2089434004966214081, filte

DataFrame[rewritten_data_files_count: int, added_data_files_count: int, rewritten_bytes_count: bigint, failed_data_files_count: int]

In [60]:
spark.sql("SELECT * FROM nessie.default.evolve_test.files").show(truncate=False)


+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------+-----------------+------------+------------------+------------------+----------------+-----------------+----------------+----------------------------------------------------------------+----------------------------------------------------------------+------------+-------------+------------+-------------+--------------------+--------------+---------------------+------------------------------------------------------------------------------------+
|content|file_path                                                                                                                                                                     |file_format|spec_id|partition        |record_count|file_size_in_bytes|column_sizes      |value_counts    |null_value_counts|nan_value_counts|lower_bounds          

25/04/28 18:34:33 INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00004-38f1b844-df18-4197-9c7e-2ef64496fdf6.metadata.json
25/04/28 18:34:33 INFO NessieUtil: loadTableMetadata for 'default.evolve_test' from location 's3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00004-38f1b844-df18-4197-9c7e-2ef64496fdf6.metadata.json' at 'Branch{name=main, metadata=null, hash=f6b014ba75cdc9ef820de0493af352ac85fbc4ec57047b39da9001bb40dd2df5}'
25/04/28 18:34:33 INFO BaseMetastoreCatalog: Table loaded by catalog: nessie.default.evolve_test.files
25/04/28 18:34:33 INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00004-38f1b844-df18-4197-9c7e-2ef64496fdf6.metadata.json
25/04/28 18:34:33 INFO NessieUtil: loadTableMetadata for 'default.evolve_test' fro

👉 Iceberg partitions are logical groups in metadata to optimize query planning and file skipping,
👉 not strict control of how many physical folders/files you have in storage.

✅ You can have 1,000 files for the same logical partition if small writes happen — and Iceberg still efficiently filters them at query time.

✅ To physically consolidate small files — rewrite/compact must be triggered manually or automatically by a system like Flink Iceberg or Spark Actions.

In [61]:
query = """
SELECT * FROM nessie.default.evolve_test
WHERE ts BETWEEN TIMESTAMP('2024-01-01') AND TIMESTAMP('2024-01-31')
"""

# 2. Run EXPLAIN to see the plan
spark.sql(f"EXPLAIN EXTENDED {query}").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

25/04/28 19:55:17 INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00004-38f1b844-df18-4197-9c7e-2ef64496fdf6.metadata.json
25/04/28 19:55:17 INFO NessieUtil: loadTableMetadata for 'default.evolve_test' from location 's3a://warehouse/default/evolve_test_16ea6446-4247-4065-afa6-5fd5a429f0a3/metadata/00004-38f1b844-df18-4197-9c7e-2ef64496fdf6.metadata.json' at 'Branch{name=main, metadata=null, hash=f6b014ba75cdc9ef820de0493af352ac85fbc4ec57047b39da9001bb40dd2df5}'
25/04/28 19:55:17 INFO BaseMetastoreCatalog: Table loaded by catalog: nessie.default.evolve_test
25/04/28 19:55:17 INFO V2ScanRelationPushDown: 
Pushing operators to nessie.default.evolve_test
Pushed Filters: ts IS NOT NULL, ts >= 1704067200000000, ts <= 1706659200000000
Post-Scan Filters: isnotnull(ts#1624),(ts#1624 >= 2024-01-01 00:00:00),(ts#1624 <= 2024-01-31 00:00:00)
         
25/04/28 19:55:17 INFO V2ScanRelati