In [0]:
# Spark Session
from pyspark.sql import SparkSession

# Name the application, run it on a local master that uses every available
# core, and apply the three resource settings before the session is created
# (or an already-running session is reused).
spark = (
    SparkSession.builder
    .appName("Optimizing Joins")
    .master("local[*]")
    .config("spark.executor.cores", 4)
    .config("spark.cores.max", 16)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

# Show the session object as the cell output.
spark


In [0]:
# Disable Adaptive Query Execution (AQE), AQE partition coalescing, and the
# AQE-specific broadcast join threshold.
# NOTE(review): "spark.sql.adaptive.autoBroadcastJoinThreshold" is the adaptive
# threshold only. The first join plan printed further down still shows a
# BroadcastHashJoin; automatic broadcasting is only switched off later in the
# notebook via "spark.sql.autoBroadcastJoinThreshold".
spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", -1)

In [0]:
# Employee data (10 million records): CSV with a header row, read with an
# explicit DDL schema.
emp_schema = "first_name string, last_name string, job_title string, dob date, email string, phone string, salary double, department string, department_id integer"
emp = (
    spark.read
    .schema(emp_schema)
    .option("header", True)
    .csv("/data/input/datasets/employee_recs.csv")
)

In [0]:
# Department data (10 records): CSV with a header row, read with an explicit
# DDL schema.
dept_schema = "department_id int, department_name string, description string, city string, state string, country string "
dept = (
    spark.read
    .schema(dept_schema)
    .option("header", True)
    .csv("/data/input/datasets/department_recs.csv")
)

In [0]:
# Left outer join: every employee row is kept and matched to its department
# through the shared department_id column.
dfjoined = emp.join(
    dept,
    on="department_id",
    how="left_outer",
)

- Writing with the *"noop"* format makes Spark read and process the whole employee and department dataframes.
- Processing the whole dataframe allows us to benchmark the timings and understand the join better
- The *"noop"* format is a special format that performs no actual I/O operation; it's essentially a no-operation (no-op).
- The *"noop"* doesn't write anywhere, no data is saved.

In [0]:
# Execute the join end to end by writing it to the no-op sink, then print the
# physical plan Spark chose for it.
(
    dfjoined.write
    .format("noop")
    .mode("overwrite")
    .save()
)
dfjoined.explain()

== Physical Plan ==
*(2) Project [department_id#97, first_name#89, last_name#90, job_title#91, dob#92, email#93, phone#94, salary#95, department#96, department_name#108, description#109, city#110, state#111, country#112]
+- *(2) BroadcastHashJoin [department_id#97], [department_id#107], LeftOuter, BuildRight, false, false
   :- FileScan csv [first_name#89,last_name#90,job_title#91,dob#92,email#93,phone#94,salary#95,department#96,department_id#97] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/data/input/datasets/employee_recs.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<first_name:string,last_name:string,job_title:string,dob:date,email:string,phone:string,sal...
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=197]
      +- *(1) Filter isnotnull(department_id#107)
         +- FileScan csv [department_id#107,department_name#108,description#109,city#110,state#111,

- The reason dfjoined.explain() shows "BroadcastHashJoin" above even though we didn't explicitly use a broadcast join is Spark's automatic optimization.
- Since Spark 2.0, by default a table smaller than 10MB is broadcast automatically and the join runs as a broadcast hash join; otherwise the join operation will be a sort-merge join.
- We can disable automatic broadcast joins: spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1).
- Let's see how the explain looks without broadcast below

In [0]:
# Turn off automatic broadcast joins (-1 disables the size threshold), rebuild
# the same left outer join, execute it against the no-op sink and print the
# new physical plan.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
dfjoined = emp.join(
    dept,
    on="department_id",
    how="left_outer",
)
(
    dfjoined.write
    .format("noop")
    .mode("overwrite")
    .save()
)
dfjoined.explain()

== Physical Plan ==
*(2) Project [department_id#97, first_name#89, last_name#90, job_title#91, dob#92, email#93, phone#94, salary#95, department#96, department_name#108, description#109, city#110, state#111, country#112]
+- *(2) SortMergeJoin [department_id#97], [department_id#107], LeftOuter
   :- Sort [department_id#97 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(department_id#97, 200), ENSURE_REQUIREMENTS, [plan_id=559]
   :     +- FileScan csv [first_name#89,last_name#90,job_title#91,dob#92,email#93,phone#94,salary#95,department#96,department_id#97] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/data/input/datasets/employee_recs.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<first_name:string,last_name:string,job_title:string,dob:date,email:string,phone:string,sal...
   +- Sort [department_id#107 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(department_id#107, 200), ENSURE_REQUIREMENTS, [p

### USING BROADCAST JOIN (explicitly)

- When the data is broadcast, it is present on all of the executors, which eliminates shuffling and leads to better performance.

In [0]:
# JOINING datasets using broadcast join

from pyspark.sql.functions import broadcast

# Mark the department dataframe for broadcasting before joining it to the
# employee dataframe on department_id.
dfjoined_brodcasted = emp.join(
    broadcast(dept),
    on="department_id",
    how="left_outer",
)

# Execute the join against the no-op sink, then print its physical plan.
(
    dfjoined_brodcasted.write
    .format("noop")
    .mode("overwrite")
    .save()
)
dfjoined_brodcasted.explain()


== Physical Plan ==
*(2) Project [department_id#97, first_name#89, last_name#90, job_title#91, dob#92, email#93, phone#94, salary#95, department#96, department_name#108, description#109, city#110, state#111, country#112]
+- *(2) BroadcastHashJoin [department_id#97], [department_id#107], LeftOuter, BuildRight, false, false
   :- FileScan csv [first_name#89,last_name#90,job_title#91,dob#92,email#93,phone#94,salary#95,department#96,department_id#97] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/data/input/datasets/employee_recs.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<first_name:string,last_name:string,job_title:string,dob:date,email:string,phone:string,sal...
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=447]
      +- *(1) Filter isnotnull(department_id#107)
         +- FileScan csv [department_id#107,department_name#108,description#109,city#110,state#111,

- What if you do broadcast on a larger dataframe? Let's check below

In [0]:
# Sales data (1 million records): CSV with a header row, read with an explicit
# DDL schema.
sschema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"
sales = (
    spark.read
    .schema(sschema)
    .option("header", True)
    .csv("/data/input/datasets/sales.csv")
)

In [0]:
# Register the sales dataframe as the temporary view "Sales" so it can be
# referenced from spark.sql() queries.
sales.createOrReplaceTempView("Sales")

In [0]:
# Cities data (5 million records): CSV with a header row, read with an explicit
# DDL schema.
cschema = "city_id string, city string, state string, state_abv string, country string"
cities = (
    spark.read
    .schema(cschema)
    .option("header", True)
    .csv("/data/input/datasets/cities.csv")
)

In [0]:
# Register the cities dataframe as the temporary view "Cities" so it can be
# referenced from spark.sql() queries.
cities.createOrReplaceTempView("Cities")

In [0]:
# JOIN data
# The cities dataframe is explicitly marked for broadcasting here, and every
# sales row is kept by the left outer join on city_id.
dfjoinedsales = sales.join(
    broadcast(cities),
    on="city_id",
    how="left_outer",
)
(
    dfjoinedsales.write
    .format("noop")
    .mode("overwrite")
    .save()
)  # Action to check whats happening in the Spark UI

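- The above action will fail for most of the tasks, because the dataframe being broadcast is too large: a broadcast table has to be collected on the driver and copied in full to every executor, which does not fit in the memory configured here. (Check the Spark UI for more.)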

**Broadcast using SQL Hints**

In [0]:
# Switch Adaptive Query Execution off, then read the setting back so the cell
# output confirms the value that is in effect.
spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.get("spark.sql.adaptive.enabled")


Out[6]: 'false'

In [0]:
# Turn off automatic broadcast joins so that the un-hinted SQL join below is
# planned without a broadcast.
# "spark.sql.adaptive.autoBroadcastJoinThreshold" is only consulted by the
# adaptive framework, which was disabled in the previous cell, so on its own it
# has no effect here. The static "spark.sql.autoBroadcastJoinThreshold" is the
# setting the planner reads; -1 disables the size threshold.
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

Out[10]: '-1'

In [0]:
# Inner join of the Sales and Cities temp views on city_id, written in SQL
# without any join hint; show() prints the first rows of the result.
dfsqlopt = spark.sql(
    """
                    SELECT *
                    FROM Sales s
                    JOIN Cities c ON s.city_id=c.city_id
                    """
)
dfsqlopt.show()

+-------------+-------------+-----------+-----------+------+-------+-------+-------+--------+---------+-------+
|transacted_at|       trx_id|retailer_id|description|amount|city_id|city_id|   city|   state|state_abv|country|
+-------------+-------------+-----------+-----------+------+-------+-------+-------+--------+---------+-------+
|   2020-03-19|TRX0001249281|      R0021|  Furniture|3681.2|   C003|   C003|Chicago|Illinois|       IL|    USA|
|   2020-03-19|TRX0001249281|      R0021|  Furniture|3681.2|   C003|   C003|Chicago|Illinois|       IL|    USA|
|   2020-03-19|TRX0001249281|      R0021|  Furniture|3681.2|   C003|   C003|Chicago|Illinois|       IL|    USA|
|   2020-03-19|TRX0001249281|      R0021|  Furniture|3681.2|   C003|   C003|Chicago|Illinois|       IL|    USA|
|   2020-03-19|TRX0001249281|      R0021|  Furniture|3681.2|   C003|   C003|Chicago|Illinois|       IL|    USA|
|   2020-03-19|TRX0001249281|      R0021|  Furniture|3681.2|   C003|   C003|Chicago|Illinois|       IL| 

In [0]:
# Print the physical plan of the un-hinted SQL join.
dfsqlopt.explain()

== Physical Plan ==
*(3) SortMergeJoin [city_id#7], [city_id#14], Inner
:- Sort [city_id#7 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(city_id#7, 200), ENSURE_REQUIREMENTS, [plan_id=458]
:     +- *(1) Filter isnotnull(city_id#7)
:        +- FileScan csv [transacted_at#2,trx_id#3,retailer_id#4,description#5,amount#6,city_id#7] Batched: false, DataFilters: [isnotnull(city_id#7)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/data/input/datasets/sales.csv], PartitionFilters: [], PushedFilters: [IsNotNull(city_id)], ReadSchema: struct<transacted_at:string,trx_id:string,retailer_id:string,description:string,amount:double,cit...
+- Sort [city_id#14 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(city_id#14, 200), ENSURE_REQUIREMENTS, [plan_id=464]
      +- *(2) Filter isnotnull(city_id#14)
         +- FileScan csv [city_id#14,city#15,state#16,state_abv#17,country#18] Batched: false, DataFilters: [isnotnull(city_id#14)], Format: CSV, Location: InMemoryFileI

In [0]:
# Same inner join as before, but with a SQL hint asking Spark to broadcast the
# side aliased as "s" (the Sales view); show() prints the first rows.
dfsqloptbroadcast = spark.sql(
    """
                    SELECT /*+ broadcast(s)*/ *
                    FROM Sales s
                    JOIN Cities c ON s.city_id=c.city_id
                    """
)
dfsqloptbroadcast.show()

+-------------+-------------+-----------+-----------+-------+-------+-------+-----------+----------+---------+-------+
|transacted_at|       trx_id|retailer_id|description| amount|city_id|city_id|       city|     state|state_abv|country|
+-------------+-------------+-----------+-----------+-------+-------+-------+-----------+----------+---------+-------+
|   2022-11-02|TRX0000624636|      R0639|  Groceries| 132.57|   C002|   C002|Los Angeles|California|       CA|    USA|
|   2022-11-12|TRX0000624632|      R0569|   Clothing| 202.14|   C002|   C002|Los Angeles|California|       CA|    USA|
|   2022-12-10|TRX0000624631|      R0629|  Furniture| 4429.3|   C002|   C002|Los Angeles|California|       CA|    USA|
|   2022-06-07|TRX0000624628|      R0888|   Clothing|3699.46|   C002|   C002|Los Angeles|California|       CA|    USA|
|   2021-06-10|TRX0000624617|      R0660|      Books|2326.66|   C002|   C002|Los Angeles|California|       CA|    USA|
|   2022-05-27|TRX0000624611|      R0001|Electro

In [0]:
# Print the physical plan of the SQL join that carries the broadcast hint.
dfsqloptbroadcast.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [city_id#7], [city_id#14], Inner, BuildLeft, false, false
:- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[5, string, false]),false), [plan_id=971]
:  +- *(1) Filter isnotnull(city_id#7)
:     +- FileScan csv [transacted_at#2,trx_id#3,retailer_id#4,description#5,amount#6,city_id#7] Batched: false, DataFilters: [isnotnull(city_id#7)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/data/input/datasets/sales.csv], PartitionFilters: [], PushedFilters: [IsNotNull(city_id)], ReadSchema: struct<transacted_at:string,trx_id:string,retailer_id:string,description:string,amount:double,cit...
+- *(2) Filter isnotnull(city_id#14)
   +- FileScan csv [city_id#14,city#15,state#16,state_abv#17,country#18] Batched: false, DataFilters: [isnotnull(city_id#14)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/data/input/datasets/cities.csv], PartitionFilters: [], PushedFilters: [IsNotNull(city_id)], ReadSchema: struct<city_id:stri

### BUCKETING

> IMPORTANT
- If you want to write the DataFrame to a specific path (not as a table), you cannot use bucketBy directly with .save(path)—bucketBy is only supported with saveAsTable (for Hive tables).
- Spark’s `.bucketBy()` method **is supported for file-based data sources, including CSV**, as long as the output is written as a persistent table with `.saveAsTable()`. The CSV tables written below are therefore genuinely bucketed: their scans in the explain plan further down report `Bucketed: true` with `SelectedBucketsCount: 4 out of 4`.
> 
- **Why you see a performance improvement:**  
- When you use `.bucketBy()` with `.saveAsTable()`, Spark writes the rows of each bucket into separate files and records the bucket column and the number of buckets in the table metadata. The performance gain comes from:
- - Rows with the same `city_id` already being co-located in the same bucket on both tables
- - The join no longer needing an `Exchange` (shuffle) on either side
> 
- **True bucketing** means Spark writes special metadata and organizes files so that Spark can skip shuffling during joins. This only works when the data is read back as a table (for example with `spark.read.table`), not when the files are read directly by path.
> 
- **Summary:**  
- - `.bucketBy()` is honoured for CSV tables written with `.saveAsTable()`; the bucket metadata lives in the table catalog, not in the CSV files themselves.
- - The observed performance gain is due to the shuffle being avoided, as the explain plan below shows.
- Parquet or ORC are still generally preferable to CSV for bucketed tables, because they are typed, columnar formats.
> 
**Reference:**  
[Apache Spark Documentation – Bucketing, Sorting and Partitioning](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#bucketing-sorting-and-partitioning)  
> For file-based data sources it is possible to bucket and sort the output; bucketing and sorting are applicable only to persistent tables (i.e. `saveAsTable`).

In [0]:
# Write Sales data in Buckets
sales.write.format("csv").bucketBy(4, "city_id").option("header",True).option("path", "/data/input/datasets/bucketed_sales.csv").mode("overwrite").saveAsTable("salesBucket")

In [0]:
# Write Cities data in Buckets
cities.write.format("csv").bucketBy(4, "city_id").option("header",True).option("path", "/data/input/datasets/bucketed_cities.csv").mode("overwrite").saveAsTable("citiesBucket")

In [0]:
# READ sales table
salesBucket = spark.read.table("salesBucket")

In [0]:
# READ cities table
citiesBucket = spark.read.table("citiesBucket")

In [0]:
# JOIN the above BUCKETED Tables 
joined = salesBucket.join(citiesBucket, on=salesBucket.city_id==citiesBucket.city_id, how="left_outer")

In [0]:
# Print the physical plan for the join of the two bucketed tables.
joined.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [city_id#854], [city_id#861], LeftOuter
   :- Sort [city_id#854 ASC NULLS FIRST], false, 0
   :  +- FileScan csv spark_catalog.default.salesbucket[transacted_at#849,trx_id#850,retailer_id#851,description#852,amount#853,city_id#854] Batched: false, Bucketed: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/data/input/datasets/bucketed_sales.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<transacted_at:string,trx_id:string,retailer_id:string,description:string,amount:double,cit..., SelectedBucketsCount: 4 out of 4
   +- Sort [city_id#861 ASC NULLS FIRST], false, 0
      +- Filter isnotnull(city_id#861)
         +- FileScan csv spark_catalog.default.citiesbucket[city_id#861,city#862,state#863,state_abv#864,country#865] Batched: false, Bucketed: true, DataFilters: [isnotnull(city_id#861)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/data/input/datasets/bucket

In the above explain plan:
- FileScan on both tables shows Bucketed: true, with SelectedBucketsCount: 4 out of 4, meaning all buckets are being utilized.
- The plan shows Sort operations followed by a SortMergeJoin, which is expected for bucketed tables.

> Key points:
- Spark is leveraging bucketing for the join, which explains why the join itself is very fast (<1s).
- The join is not shuffling data across the cluster, just reading from bucketed files and merging

### Points To Note

- Joining columns different from Bucket column, same Bucket size - shuffle on both tables
- Joining columns same, One table in bucket - Shuffle on non Bucketed table
- Joining columns same, Different bucket size - Shuffle on smaller Bucketed table
- Joining columns same, Same bucket size - No Shuffle (Faster Join)
> 
- So, it's very important to choose the Bucket column and Bucket size carefully.
- Decide effectively on the number of Buckets, as too many Buckets with not enough data can lead to a small-files problem
- Prefer Shuffle Hash Join if datasets are small.