#### Broadcast Hash Join (BHJ)

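- When: One dataset is small enough to fit in each executor's memory. Spark chooses this join automatically when the estimated size of one side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default).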

- How: Spark broadcasts (sends a full copy of) the small dataset to all executors.

- Why: Avoids shuffling the big dataset, which usually makes this the fastest join when one side is small.

- Cost: Broadcast communication overhead, but no shuffle.

- Hint: broadcast(df) or df.hint("broadcast").

When you join two datasets in Spark (say orders and customers), Spark normally has to shuffle data between executors so that rows with the same join key land in the same partition.

This shuffle is expensive. It involves:

- Disk I/O

- Network transfer

- Serialization/deserialization

- Memory overhead

💡 So if one dataset is small enough, we can avoid the shuffle altogether by sending that small dataset to every executor.
This is called a Broadcast Join (or Map-side Join).
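
The idea can be illustrated without Spark at all. The following is a minimal pure-Python sketch, not Spark's implementation: the function name `broadcast_hash_join`, the toy rows, and the two-partition layout are assumptions made for illustration. The small side is turned into one hash table (the "broadcast" copy), and every partition of the big side probes it locally, so no big-side row ever moves.

```python
from collections import defaultdict

def broadcast_hash_join(big_partitions, small_rows, key):
    """Join each partition of the big side against a full copy of the small side."""
    # Build the hash table once from the small side; this stands in for the broadcast copy.
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[key]].append(row)

    joined = []
    for partition in big_partitions:        # each partition is processed where it is
        for row in partition:
            for match in lookup.get(row[key], []):
                joined.append({**row, **match})
    return joined

# Toy rows for the orders/customers example (illustrative only).
customers = [
    {"cust_id": 1, "name": "Alice"},
    {"cust_id": 2, "name": "Bob"},
    {"cust_id": 3, "name": "Carol"},
]
order_partitions = [
    [{"order_id": 101, "cust_id": 1, "amount": 250},
     {"order_id": 102, "cust_id": 2, "amount": 300}],
    [{"order_id": 103, "cust_id": 2, "amount": 150},
     {"order_id": 104, "cust_id": 3, "amount": 500}],
]

result = broadcast_hash_join(order_partitions, customers, "cust_id")
for row in result:
    print(row["order_id"], row["name"], row["amount"])
```

The cost of the broadcast grows with the size of the small side and the number of executors, which is why the strategy only pays off when one side is genuinely small.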

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("JoinStrategiesDemo").getOrCreate()

# Load the two sample tables; inferSchema=True makes Spark read the data to infer column types.
customers = spark.read.csv("customers.csv", header=True, inferSchema=True)
orders = spark.read.csv("order.csv", header=True, inferSchema=True)

customers.show(5)
orders.show(5)


+-------+-----+------+---+
|cust_id| name|region|age|
+-------+-----+------+---+
|      1|Alice| North| 28|
|      2|  Bob| South| 35|
|      3|Carol|  East| 40|
|      4|David|  West| 23|
|      5|  Eva| South| 31|
+-------+-----+------+---+
only showing top 5 rows
+--------+-------+----------+------+
|order_id|cust_id|order_date|amount|
+--------+-------+----------+------+
|     101|      1|2024-01-01|   250|
|     102|      2|2024-01-03|   300|
|     103|      2|2024-01-05|   150|
|     104|      3|2024-02-01|   500|
|     105|      5|2024-02-12|   400|
+--------+-------+----------+------+
only showing top 5 rows


In [4]:
from pyspark.sql.functions import broadcast

# Wrap the small side in broadcast() to request a broadcast hash join.
bhj_df = orders.join(broadcast(customers), orders.cust_id == customers.cust_id, "inner")
bhj_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [cust_id#117], [cust_id#95], Inner, BuildRight, false
   :- Filter isnotnull(cust_id#117)
   :  +- FileScan csv [order_id#116,cust_id#117,order_date#118,amount#119] Batched: false, DataFilters: [isnotnull(cust_id#117)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/developer/Workspace_Projects/Data_Engineer/PySpark/order.csv], PartitionFilters: [], PushedFilters: [IsNotNull(cust_id)], ReadSchema: struct<order_id:int,cust_id:int,order_date:date,amount:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=145]
      +- Filter isnotnull(cust_id#95)
         +- FileScan csv [cust_id#95,name#96,region#97,age#98] Batched: false, DataFilters: [isnotnull(cust_id#95)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/developer/Workspace_Projects/Data_Engineer/PySpark/customer..., PartitionFilters: [], PushedFilters: [IsNotNull(cus

#### Shuffle Hash Join (SHJ)

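- When: Both sides are moderate in size and not already sorted; each side must be shuffled so that rows with the same join key end up in the same partition.

- How: After the shuffle, Spark builds an in-memory hash table for each partition from one side (usually the smaller one) and probes it with the rows of the other side.

- Cost: Both sides are shuffled, and the build side of each partition has to fit in memory, but no sort is needed.

- Requirements: The join condition must be an equi-join (keys compared with ==).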
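
To make the two phases concrete, here is a minimal pure-Python sketch, assuming a simplified model in which a list of lists stands in for shuffled partitions. The helper names `hash_partition` and `shuffle_hash_join` are hypothetical and are not Spark APIs. Both sides are first routed to partitions by a hash of the join key, then each partition pair is joined with a local hash table built from the smaller input.

```python
from collections import defaultdict

def hash_partition(rows, key, num_partitions):
    """Simulate the shuffle: route every row to a partition chosen by its join key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append(row)
    return partitions

def shuffle_hash_join(left, right, key, num_partitions=4):
    left_parts = hash_partition(left, key, num_partitions)
    right_parts = hash_partition(right, key, num_partitions)
    # Simplification: build on whichever input has fewer rows overall.
    build_is_left = len(left) <= len(right)

    joined = []
    for left_part, right_part in zip(left_parts, right_parts):
        build, probe = (left_part, right_part) if build_is_left else (right_part, left_part)
        table = defaultdict(list)           # per-partition hash table
        for row in build:
            table[row[key]].append(row)
        for row in probe:                   # matching keys are guaranteed to be co-located
            for match in table.get(row[key], []):
                joined.append({**row, **match})
    return joined

orders = [
    {"order_id": 101, "cust_id": 1},
    {"order_id": 102, "cust_id": 2},
    {"order_id": 103, "cust_id": 2},
    {"order_id": 104, "cust_id": 3},
]
customers = [
    {"cust_id": 1, "name": "Alice"},
    {"cust_id": 2, "name": "Bob"},
    {"cust_id": 3, "name": "Carol"},
    {"cust_id": 4, "name": "David"},
]

shj_rows = shuffle_hash_join(orders, customers, "cust_id")
for row in sorted(shj_rows, key=lambda r: r["order_id"]):
    print(row["order_id"], row["name"])
```

Because both sides use the same partitioning function, a key can only ever match rows inside its own partition, which is what makes the local hash tables sufficient.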

In [15]:
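# Disable automatic broadcast joins so Spark has to use a shuffle-based strategy.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Note: without a hint Spark still prefers SortMergeJoin here
# (spark.sql.join.preferSortMergeJoin defaults to true), as the plan below shows.
shj_df = orders.join(customers, "cust_id")
shj_df.explain()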

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [cust_id#117, order_id#116, order_date#118, amount#119, name#96, region#97, age#98]
   +- SortMergeJoin [cust_id#117], [cust_id#95], Inner
      :- Sort [cust_id#117 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(cust_id#117, 200), ENSURE_REQUIREMENTS, [plan_id=470]
      :     +- Filter isnotnull(cust_id#117)
      :        +- FileScan csv [order_id#116,cust_id#117,order_date#118,amount#119] Batched: false, DataFilters: [isnotnull(cust_id#117)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/developer/Workspace_Projects/Data_Engineer/PySpark/order.csv], PartitionFilters: [], PushedFilters: [IsNotNull(cust_id)], ReadSchema: struct<order_id:int,cust_id:int,order_date:date,amount:int>
      +- Sort [cust_id#95 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(cust_id#95, 200), ENSURE_REQUIREMENTS, [plan_id=471]
            +- Filter isnotnull(cust_id#95)
               +- Fil

In [24]:
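# Request a shuffle hash join explicitly with the "shuffle_hash" hint.
# The plan reports BuildLeft: the hinted orders side is used to build the hash table.
orders.hint("shuffle_hash").join(customers, "cust_id").explain()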


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [cust_id#117, order_id#116, order_date#118, amount#119, name#96, region#97, age#98]
   +- ShuffledHashJoin [cust_id#117], [cust_id#95], Inner, BuildLeft
      :- Exchange hashpartitioning(cust_id#117, 200), ENSURE_REQUIREMENTS, [plan_id=871]
      :  +- Filter isnotnull(cust_id#117)
      :     +- FileScan csv [order_id#116,cust_id#117,order_date#118,amount#119] Batched: false, DataFilters: [isnotnull(cust_id#117)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/developer/Workspace_Projects/Data_Engineer/PySpark/order.csv], PartitionFilters: [], PushedFilters: [IsNotNull(cust_id)], ReadSchema: struct<order_id:int,cust_id:int,order_date:date,amount:int>
      +- Exchange hashpartitioning(cust_id#95, 200), ENSURE_REQUIREMENTS, [plan_id=872]
         +- Filter isnotnull(cust_id#95)
            +- FileScan csv [cust_id#95,name#96,region#97,age#98] Batched: false, DataFilters: [isnotnull(cust_id#95)], Format: C

#### Sort Merge Join (SMJ)

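- When: Both datasets are large; Spark sorts both sides on the join key and then merges them (like merging two sorted arrays). With default settings this is the strategy Spark prefers for equi-joins that cannot be broadcast.

- How: Both sides are hash-partitioned on the join key, then each partition is sorted by that key before the merge.

- Cost: Both sides are shuffled, and sorting adds heavy CPU work on top.

- Benefit: Scales well and behaves predictably, because sorted data can spill to disk instead of having to fit in memory; used for large fact-to-fact joins.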
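
The merge step is the heart of this strategy. The sketch below is a minimal pure-Python illustration, assuming a single partition (the shuffle is left out) and a hypothetical function name `sort_merge_join`; it is not how Spark implements the operator internally. It sorts both inputs and advances two cursors, pairing up runs of equal keys.

```python
def sort_merge_join(left, right, key):
    """Inner equi-join by sorting both inputs and merging them with two cursors."""
    left_sorted = sorted(left, key=lambda r: r[key])
    right_sorted = sorted(right, key=lambda r: r[key])

    joined = []
    i = j = 0
    while i < len(left_sorted) and j < len(right_sorted):
        lk, rk = left_sorted[i][key], right_sorted[j][key]
        if lk < rk:
            i += 1                          # left key has no partner yet, advance left
        elif lk > rk:
            j += 1                          # right key has no partner yet, advance right
        else:
            # Find the run of right rows that share this key.
            j_end = j
            while j_end < len(right_sorted) and right_sorted[j_end][key] == lk:
                j_end += 1
            # Pair every left row carrying this key with every right row in the run.
            while i < len(left_sorted) and left_sorted[i][key] == lk:
                for right_row in right_sorted[j:j_end]:
                    joined.append({**left_sorted[i], **right_row})
                i += 1
            j = j_end
    return joined

orders = [
    {"order_id": 105, "cust_id": 5},
    {"order_id": 103, "cust_id": 2},
    {"order_id": 101, "cust_id": 1},
    {"order_id": 104, "cust_id": 3},
    {"order_id": 102, "cust_id": 2},
]
customers = [
    {"cust_id": 3, "name": "Carol"},
    {"cust_id": 1, "name": "Alice"},
    {"cust_id": 5, "name": "Eva"},
    {"cust_id": 2, "name": "Bob"},
    {"cust_id": 4, "name": "David"},
]

smj_rows = sort_merge_join(orders, customers, "cust_id")
for row in smj_rows:
    print(row["cust_id"], row["order_id"], row["name"])
```

Neither input needs to fit in a hash table: after sorting, the merge only looks at the current run of equal keys, which is why the approach copes with very large inputs.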

In [23]:
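# Disable automatic broadcast joins, then request a sort merge join with the "merge" hint.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
smj_df = orders.hint("merge").join(customers, "cust_id")
# explain(True) prints the parsed, analyzed, and optimized logical plans plus the physical plan.
smj_df.explain(True)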


== Parsed Logical Plan ==
'Join UsingJoin(Inner, [cust_id])
:- ResolvedHint (strategy=merge)
:  +- Relation [order_id#116,cust_id#117,order_date#118,amount#119] csv
+- Relation [cust_id#95,name#96,region#97,age#98] csv

== Analyzed Logical Plan ==
cust_id: int, order_id: int, order_date: date, amount: int, name: string, region: string, age: int
Project [cust_id#117, order_id#116, order_date#118, amount#119, name#96, region#97, age#98]
+- Join Inner, (cust_id#117 = cust_id#95)
   :- ResolvedHint (strategy=merge)
   :  +- Relation [order_id#116,cust_id#117,order_date#118,amount#119] csv
   +- Relation [cust_id#95,name#96,region#97,age#98] csv

== Optimized Logical Plan ==
Project [cust_id#117, order_id#116, order_date#118, amount#119, name#96, region#97, age#98]
+- Join Inner, (cust_id#117 = cust_id#95), leftHint=(strategy=merge)
   :- Filter isnotnull(cust_id#117)
   :  +- Relation [order_id#116,cust_id#117,order_date#118,amount#119] csv
   +- Filter isnotnull(cust_id#95)
      +- Relat

#### Shuffle-and-Replicate Nested Loop Join (a.k.a. Cartesian Join)


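- When: A cross join (no join condition at all), or an inner join with only non-equi conditions where neither side is small enough to broadcast.

- How: Spark pairs every partition of one side with every partition of the other and emits every row combination.

- Cost: Extremely expensive. The output has (rows on the left) × (rows on the right) combinations, so two tables of 1 million rows each produce 10^12 pairs.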
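
The growth in output size is easy to check on a toy scale. This is a minimal pure-Python sketch with a hypothetical helper `cartesian_join` and made-up rows; it only illustrates the row-count arithmetic, not Spark's partition-level mechanics.

```python
from itertools import product

def cartesian_join(left, right, condition=None):
    """Pair every left row with every right row, keeping pairs that pass `condition`."""
    return [
        (l, r)
        for l, r in product(left, right)
        if condition is None or condition(l, r)
    ]

customers = [{"cust_id": i} for i in range(1, 6)]        # 5 rows
orders = [{"order_id": 100 + i} for i in range(1, 6)]    # 5 rows

pairs = cartesian_join(customers, orders)
print(len(pairs))  # 5 * 5 = 25 pairs
```

Doubling both inputs quadruples the output, so the cost grows with the product of the two sizes rather than their sum.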

In [25]:
cross_df = customers.crossJoin(orders)
cross_df.explain(True)


== Parsed Logical Plan ==
Join Cross
:- Relation [cust_id#95,name#96,region#97,age#98] csv
+- Relation [order_id#116,cust_id#117,order_date#118,amount#119] csv

== Analyzed Logical Plan ==
cust_id: int, name: string, region: string, age: int, order_id: int, cust_id: int, order_date: date, amount: int
Join Cross
:- Relation [cust_id#95,name#96,region#97,age#98] csv
+- Relation [order_id#116,cust_id#117,order_date#118,amount#119] csv

== Optimized Logical Plan ==
Join Cross
:- Relation [cust_id#95,name#96,region#97,age#98] csv
+- Relation [order_id#116,cust_id#117,order_date#118,amount#119] csv

== Physical Plan ==
CartesianProduct
:- FileScan csv [cust_id#95,name#96,region#97,age#98] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/developer/Workspace_Projects/Data_Engineer/PySpark/customer..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<cust_id:int,name:string,region:string,age:int>
+- FileScan csv [order_id#116,cust_id#117,o

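#### Broadcast Nested Loop Join (BNLJ)

- When: One side is small enough to broadcast and the join condition has no equality predicate (a non-equi join).

- How: Spark broadcasts the small side and compares every row of the other side against every broadcast row.

- Cost: One comparison per row pair, but no shuffle.

In [29]:
from pyspark.sql.functions import broadcast, col

# Note: this condition still contains an equality on cust_id, and the optimizer
# pushes the age filter below the join (see the Optimized Logical Plan). Spark can
# therefore still use a hash join here; a broadcast nested loop join is only
# chosen when the join condition has no equality predicate.
bnlj_df = customers.alias("c").join(
    broadcast(orders.alias("o")),
    (col("c.cust_id") == col("o.cust_id")) & (col("c.age") > 25),
    "inner"
)
bnlj_df.explain(True)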


== Parsed Logical Plan ==
Join Inner, ((cust_id#95 = cust_id#117) AND (age#98 > 25))
:- SubqueryAlias c
:  +- Relation [cust_id#95,name#96,region#97,age#98] csv
+- ResolvedHint (strategy=broadcast)
   +- SubqueryAlias o
      +- Relation [order_id#116,cust_id#117,order_date#118,amount#119] csv

== Analyzed Logical Plan ==
cust_id: int, name: string, region: string, age: int, order_id: int, cust_id: int, order_date: date, amount: int
Join Inner, ((cust_id#95 = cust_id#117) AND (age#98 > 25))
:- SubqueryAlias c
:  +- Relation [cust_id#95,name#96,region#97,age#98] csv
+- ResolvedHint (strategy=broadcast)
   +- SubqueryAlias o
      +- Relation [order_id#116,cust_id#117,order_date#118,amount#119] csv

== Optimized Logical Plan ==
Join Inner, (cust_id#95 = cust_id#117), rightHint=(strategy=broadcast)
:- Filter ((isnotnull(age#98) AND (age#98 > 25)) AND isnotnull(cust_id#95))
:  +- Relation [cust_id#95,name#96,region#97,age#98] csv
+- Filter isnotnull(cust_id#117)
   +- Relation [order_id#11
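
A nested loop join is what remains when the condition offers no equality to hash or sort on. The following is a minimal pure-Python sketch under stated assumptions: the function `broadcast_nested_loop_join` is hypothetical, and the non-equi condition (order amount greater than ten times the customer's age) is invented purely for illustration and does not come from the notebook.

```python
def broadcast_nested_loop_join(big_partitions, small_rows, condition):
    """Compare every big-side row against every row of the broadcast small side."""
    joined = []
    for partition in big_partitions:            # big side stays in place, no shuffle
        for big_row in partition:
            for small_row in small_rows:        # full scan of the broadcast copy
                if condition(big_row, small_row):
                    joined.append((big_row, small_row))
    return joined

customer_partitions = [
    [{"cust_id": 1, "name": "Alice", "age": 28},
     {"cust_id": 2, "name": "Bob", "age": 35}],
    [{"cust_id": 4, "name": "David", "age": 23}],
]
orders = [
    {"order_id": 101, "amount": 250},
    {"order_id": 102, "amount": 300},
    {"order_id": 103, "amount": 150},
]

# Hypothetical non-equi condition: there is no key to hash on, so every pair is checked.
bnlj_rows = broadcast_nested_loop_join(
    customer_partitions,
    orders,
    lambda cust, order: order["amount"] > cust["age"] * 10,
)
for cust, order in bnlj_rows:
    print(cust["name"], order["order_id"], order["amount"])
```

Every one of the 3 × 3 row pairs is evaluated, which is why this strategy is only reasonable when the broadcast side is small.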

`                 ┌──────────────┐
                  │ Spark Join   │
                  │ Strategies   │
                  └──────┬───────┘
                         │
     ┌───────────────────┼─────────────────────┐
     │                   │                     │
 Broadcast            Shuffle              Nested Loop
     │                   │                     │
     ▼                   ▼                     ▼
Broadcast Hash     Shuffle Hash         Broadcast NL
 (small + big)      (medium size)       (small + non-equi)
                       ▼
                   Sort Merge
                  (large + equi)
                       ▼
                   Shuffle-Replicate NL
                (large + non-equi)
`
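
The diagram can be read as a small decision procedure. The sketch below is a deliberately simplified heuristic that follows the diagram only; it is not Spark's planner, which also weighs hints, join types, and statistics. The function `pick_join_strategy` and its parameters are hypothetical, with `broadcast_threshold` and `prefer_sort_merge` loosely standing in for spark.sql.autoBroadcastJoinThreshold and spark.sql.join.preferSortMergeJoin.

```python
def pick_join_strategy(is_equi_join, small_side_bytes,
                       broadcast_threshold=10 * 1024 * 1024,
                       prefer_sort_merge=True):
    """Pick a strategy name from the diagram (simplified, illustrative only)."""
    # A negative threshold disables broadcasting, mirroring the -1 setting used earlier.
    can_broadcast = broadcast_threshold >= 0 and small_side_bytes <= broadcast_threshold

    if is_equi_join:
        if can_broadcast:
            return "Broadcast Hash Join"
        if prefer_sort_merge:
            return "Sort Merge Join"
        return "Shuffle Hash Join"

    # No equality predicate: only the nested loop strategies remain.
    if can_broadcast:
        return "Broadcast Nested Loop Join"
    return "Shuffle-and-Replicate Nested Loop Join"

MB = 1024 * 1024
print(pick_join_strategy(True, 1 * MB))
print(pick_join_strategy(True, 500 * MB))
print(pick_join_strategy(True, 500 * MB, prefer_sort_merge=False))
print(pick_join_strategy(False, 1 * MB))
print(pick_join_strategy(False, 500 * MB))
```

In practice, checking the physical plan with explain() remains the reliable way to see which strategy Spark actually picked.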