In [1]:
import findspark
# Initialise findspark before the pyspark imports in the following cells.
# NOTE(review): presumably this makes the local Spark install importable -- confirm against the findspark docs.
findspark.init()
# Last expression of the cell: displays the Spark installation path that findspark located.
findspark.find()

'/Users/akashdeepgupta/Documents/project-repos/pyspark-35-setup/spark-3.5.3-bin-hadoop3'

In [None]:
from pyspark.sql import SparkSession

# Root directory of the Iceberg warehouse backing the `local` Hadoop catalog.
# NOTE: machine-specific absolute path -- adjust it when running on another machine.
DW_PATH='/Users/akashdeepgupta/Documents/project-repos/pyspark-playground/warehouse'

# Version pins for the JVM artifacts. The package coordinates below are built
# from these values, so a version bump is a one-line change.
SCALA_VERSION = '2.12'
SPARK_VERSION = '3.5'           # Spark major.minor targeted by the Iceberg runtime artifact
SPARK_AVRO_VERSION = '3.5.0'    # full version of the spark-avro artifact
ICEBERG_VERSION = '1.5.0'

# Comma-separated Maven coordinates passed to `spark.jars.packages`.
SPARK_PACKAGES = ','.join([
    f'org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_{SCALA_VERSION}:{ICEBERG_VERSION}',
    f'org.apache.spark:spark-avro_{SCALA_VERSION}:{SPARK_AVRO_VERSION}',
])

# Local 4-core session with the Iceberg SQL extensions and a Hadoop-type
# Iceberg catalog named `local` whose warehouse lives at DW_PATH.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("iceberg-poc")
    .config('spark.jars.packages', SPARK_PACKAGES)
    .config('spark.sql.extensions','org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .config('spark.sql.catalog.local','org.apache.iceberg.spark.SparkCatalog')
    .config('spark.sql.catalog.local.type','hadoop')
    .config('spark.sql.catalog.local.warehouse',DW_PATH)
    .getOrCreate()
)

In [None]:
from pyspark.sql.functions import array, col, lit, rand, when

TGT_TBL = "local.db.emp_bv_details"
# Fixed seed so the random vertical assignment is identical on every run.
RAND_SEED = 42

# 30,000 employees (ids 0..29999), each assigned a partition year by id range:
#   id <= 10000      -> 2023
#   10001..15000     -> 2024
#   everything else  -> 2025
t1 = spark.range(30000).withColumn(
    "year",
    when(col("id") <= 10000, lit(2023))
    .when(col("id").between(10001, 15000), lit(2024))
    .otherwise(lit(2025)),
)

# Pick one of the four verticals per row by indexing the literal array with a
# random integer in [0, 4); every row starts out with is_updated = False.
t1 = t1.withColumn("business_vertical", array(
        lit("Retail"),
        lit("SME"),
        lit("Cor"),
        lit("Analytics")
        ).getItem((rand(seed=RAND_SEED)*4).cast("int")))\
        .withColumn("is_updated", lit(False))

# Write a format-version 2 Iceberg table partitioned by year, with
# merge-on-read set for delete/update/merge.
# createOrReplace() instead of create(): the warehouse persists on disk, so a
# plain create() fails on the second "Restart & Run All".
t1.coalesce(1).writeTo(TGT_TBL).partitionedBy('year').using('iceberg')\
    .tableProperty('format-version','2')\
    .tableProperty('write.delete.mode','merge-on-read')\
    .tableProperty('write.update.mode','merge-on-read')\
    .tableProperty('write.merge.mode','merge-on-read')\
    .createOrReplace()

In [5]:
# Import the functions module under an alias rather than importing `max`/`min`
# by name, which would shadow Python's built-in max()/min() in later cells.
from pyspark.sql import functions as F

# Sanity check: smallest and largest id stored in each year partition.
(
    spark.table(TGT_TBL)
    .groupBy("year")
    .agg(F.min("id").alias("min_id"), F.max("id").alias("max_id"))
    .orderBy("year")
    .show()
)

+----+------+------+
|year|min_id|max_id|
+----+------+------+
|2023|     0| 10000|
|2024| 10001| 15000|
|2025| 15001| 29999|
+----+------+------+



In [7]:
# New department called Sales: 3000 employees (ids 15001..18000) switched to it in 2025.
# The range starts at 15001 because id 15000 belongs to the 2024 partition of the
# target table; starting at 15000 would stage 3001 rows, one with a mismatched year.
updated_records = (
    spark.range(15001, 18001)
    .withColumn("year", lit(2025))
    .withColumn("business_vertical", lit("Sales"))
)
STG_TBL = "local.db.emp_bv_updates"

# Staging table, partitioned by year like the target.
# createOrReplace() instead of create() keeps the cell re-runnable against a
# warehouse that already contains the table.
updated_records.coalesce(1).writeTo(STG_TBL).partitionedBy('year').using('iceberg')\
    .tableProperty('format-version','2')\
    .createOrReplace()

In [8]:
# Baseline merge: the join condition is on id only, and the year comparison
# sits in the WHEN MATCHED clause rather than in ON.
baseline_merge_sql = f"""
MERGE INTO {TGT_TBL} as tgt
USING (Select *, False as is_updated from {STG_TBL}) as src
ON tgt.id = src.id
WHEN MATCHED AND src.business_vertical <> tgt.business_vertical AND tgt.year = src.year THEN
    UPDATE SET tgt.is_updated = True, tgt.business_vertical = src.business_vertical
WHEN NOT MATCHED THEN
    INSERT *
"""
spark.sql(baseline_merge_sql)

DataFrame[]

## Physical Plan BEFORE Optimization

```sql
== Physical Plan ==
WriteDelta (32)
+- AdaptiveSparkPlan (31)
   +- == Final Plan ==
      AQEShuffleRead (20)
      +- ShuffleQueryStage (19), Statistics(sizeInBytes=1031.3 KiB, rowCount=6.00E+3)
         +- Exchange (18)
            +- MergeRows (17)
               +- * SortMergeJoin RightOuter (16)
                  :- * Sort (8)
                  :  +- AQEShuffleRead (7)
                  :     +- ShuffleQueryStage (6), Statistics(sizeInBytes=7.8 MiB, rowCount=3.00E+4)
                  :        +- Exchange (5)
                  :           +- * Filter (4)
                  :              +- * Project (3)
                  :                 +- * ColumnarToRow (2)
                  :                    +- BatchScan local.db.emp_bv_details (1)
                  +- * Sort (15)
                     +- AQEShuffleRead (14)
                        +- ShuffleQueryStage (13), Statistics(sizeInBytes=140.7 KiB, rowCount=3.00E+3)
                           +- Exchange (12)
                              +- * Project (11)
                                 +- * ColumnarToRow (10)
                                    +- BatchScan local.db.emp_bv_updates (9)
```

```sql
-- No filters pushed down during Batch Scan
(1) BatchScan local.db.emp_bv_details
Output [7]: [id#498L, year#499, business_vertical#500, _file#507, _pos#508L, _spec_id#505, _partition#506]
local.db.emp_bv_details (branch=null) [filters=, groupedBy=]
```

## Optimized Merge Implementation

In [11]:
STG_TBL = "local.db.emp_bv_updates"
TGT_TBL = "local.db.emp_bv_details"

# Enable the fanout writer on the target table so rows do not have to be
# pre-sorted before they are written.
fanout_ddl = f"""ALTER TABLE {TGT_TBL} SET TBLPROPERTIES (
    'write.spark.fanout.enabled'='true'
)"""
spark.sql(fanout_ddl)

DataFrame[]

In [12]:
# Session-level settings applied before the optimized merge, in this order.
MERGE_TUNING_CONFS = {
    # Prefer a hash join where possible, avoiding the sort a sort-merge join needs.
    'spark.sql.join.preferSortMergeJoin': 'false',
    # Skip the shuffle before writing into the table.
    'spark.sql.iceberg.distribution-mode': 'none',
    # Storage-partitioned join (SPJ) settings.
    'spark.sql.sources.v2.bucketing.enabled': 'true',
    'spark.sql.sources.v2.bucketing.pushPartValues.enabled': 'true',
    'spark.sql.iceberg.planning.preserve-data-grouping': 'true',
    'spark.sql.requireAllClusterKeysForCoPartition': 'false',
    'spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled': 'true',
}

for conf_key, conf_value in MERGE_TUNING_CONFS.items():
    spark.conf.set(conf_key, conf_value)

In [13]:
# Updating some employees departments after setting some configs for optimization
updated_records = spark.range(28000, 35001).withColumn("year", lit(2025)).withColumn("business_vertical", lit("DataEngineering"))
updated_records.coalesce(1).writeTo(STG_TBL).overwritePartitions()

In [14]:
# Row count per (year, business_vertical) currently held in the staging table.
(
    spark.table(STG_TBL)
    .groupBy("year", "business_vertical")
    .count()
    .show()
)

+----+-----------------+-----+
|year|business_vertical|count|
+----+-----------------+-----+
|2025|  DataEngineering| 7001|
+----+-----------------+-----+



In [29]:
# Things to notice in this merge:
# 1. The year filter is part of the ON condition -- filters placed there are pushed down.
# 2. The partition column (year) is part of the join condition, which enables SPJ where possible.
optimized_merge_sql = f"""
MERGE INTO {TGT_TBL} as tgt
USING (SELECT *, False as is_updated from {STG_TBL}) as src
ON tgt.id = src.id AND tgt.year = src.year AND tgt.year = 2025
WHEN MATCHED AND 
    tgt.business_vertical <> src.business_vertical 
    THEN
    UPDATE SET tgt.is_updated = True, tgt.business_vertical = src.business_vertical
WHEN NOT MATCHED THEN
    INSERT *
"""
spark.sql(optimized_merge_sql)

DataFrame[]

## Physical Plan AFTER Optimization
```sql
== Physical Plan ==
WriteDelta (16)
+- AdaptiveSparkPlan (15)
   +- == Final Plan ==
      MergeRows (9)
      +- * ShuffledHashJoin RightOuter BuildRight (8)
         :- * Filter (4)
         :  +- * Project (3)
         :     +- * ColumnarToRow (2)
         :        +- BatchScan local.db.emp_bv_details (1)
         +- * Project (7)
            +- * ColumnarToRow (6)
               +- BatchScan local.db.emp_bv_updates (5)
```

```sql
-- Filter year = 2025 is pushed down during Batch Scan, and the scan is grouped by year
(1) BatchScan local.db.emp_bv_details
Output [7]: [id#770L, year#771, business_vertical#772, _file#779, _pos#780L, _spec_id#777, _partition#778]
local.db.emp_bv_details (branch=null) [filters=year IS NOT NULL, year = 2025, groupedBy=year]
```

In [None]:
# Display the `history` table of the target.
(
    spark.table(f"{TGT_TBL}.history")
    .show()
)

In [31]:
# Import the functions module under an alias rather than importing `max`/`min`
# by name, which would shadow Python's built-in max()/min().
from pyspark.sql import functions as F

# Verify the merge: for DataEngineering rows, show the id range that was
# updated in place (is_updated = true) versus newly inserted (is_updated = false).
(
    spark.table(TGT_TBL)
    .where("business_vertical = 'DataEngineering'")
    .groupBy("is_updated")
    .agg(F.min("id"), F.max("id"))
    .show()
)

+----------+-------+-------+
|is_updated|min(id)|max(id)|
+----------+-------+-------+
|      true|  28000|  29999|
|     false|  30000|  35000|
+----------+-------+-------+

