In [10]:
# Set the PySpark environment variables
import os
os.environ['SPARK_HOME'] = "/home/user5/Downloads/spark"
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

Broadcasting in Spark

Broadcasting sends a small dataset to every worker node (executor) once, so tasks can read it locally without a shuffle.

Why it’s useful
Avoids sending the same data to the executors repeatedly.

Avoids a shuffle join when joining a big dataset with a tiny one.

When to use
Small lookup tables or reference data used in joins, filters, or maps.

When one dataset is much smaller than the other and fits comfortably in driver and executor memory.
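
To make the idea concrete, here is a conceptual sketch in plain Python (no Spark involved) of what a broadcast hash join does: the small side becomes a lookup table that every partition of the big side probes locally. The function name, the partition layout, and the sample rows are hypothetical, and the sketch assumes the small side has unique keys.

```python
# Conceptual sketch in plain Python (no Spark): how a broadcast hash join works.
# The partition layout and helper name are hypothetical.

def broadcast_hash_join(big_partitions, small_rows):
    """Inner-join each partition of the big side against a copy of the small side."""
    # "Broadcast": build the lookup once; every partition reads the same copy.
    # Assumes keys on the small side are unique.
    lookup = dict(small_rows)

    joined = []
    for partition in big_partitions:
        # Each partition is processed where it already lives;
        # no rows move between partitions.
        for key, value in partition:
            if key in lookup:  # inner join: keep only matching keys
                joined.append((key, value, lookup[key]))
    return joined


# Two partitions standing in for a large dataset.
big_partitions = [
    [(1, "val1"), (2, "val2"), (50, "val50")],
    [(99, "val99"), (100, "val100")],
]
small_rows = [(1, "A"), (50, "B"), (100, "C")]

result = broadcast_hash_join(big_partitions, small_rows)
print(result)
```

Only the small lookup is copied around; the rows of the big side are never regrouped by key, which is the step a shuffle join cannot avoid.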

In [11]:
# Step 1: Create the Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BroadcastDemo") \
    .getOrCreate()


25/08/06 13:09:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [12]:
# Step 2: Create big and small DataFrames
# We'll pretend big_df is a large dataset and small_df is a small lookup table.

# Big dataset: 1 million rows
big_data = [(i, f"val{i}") for i in range(1, 1000001)]
big_df = spark.createDataFrame(big_data, ["id", "value"])

# Small lookup table: 5 rows
small_data = [(1, "A"), (50, "B"), (100, "C"), (500, "D"), (1000, "E")]
small_df = spark.createDataFrame(small_data, ["id", "category"])


In [4]:
# Step 3: Join without broadcasting (shuffle happens)

joined_normal = big_df.join(small_df, "id")
joined_normal.show(5)

# You'll see Shuffle Read / Shuffle Write for the join stage in the Spark UI.

25/08/06 13:04:45 WARN TaskSetManager: Stage 0 contains a task of very large size (2094 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+----+-------+--------+
|  id|  value|category|
+----+-------+--------+
|  50|  val50|       B|
|   1|   val1|       A|
| 100| val100|       C|
| 500| val500|       D|
|1000|val1000|       E|
+----+-------+--------+



In [5]:
spark

In [7]:
# Do not stop the session here: step 4 reuses spark, big_df, and small_df.
# spark.stop()

In [16]:
# Step 4: Join with broadcasting (no shuffle for the join)
from pyspark.sql.functions import broadcast

joined_broadcast = big_df.join(broadcast(small_df), "id")
joined_broadcast.show(5)

# No shuffle stage for the join; you can verify this in the Spark UI.
# small_df is sent once to each executor.

25/08/06 13:16:26 WARN TaskSetManager: Stage 8 contains a task of very large size (2094 KiB). The maximum recommended task size is 1000 KiB.
25/08/06 13:16:26 WARN TaskSetManager: Stage 9 contains a task of very large size (2330 KiB). The maximum recommended task size is 1000 KiB.
25/08/06 13:16:26 WARN TaskSetManager: Stage 10 contains a task of very large size (2330 KiB). The maximum recommended task size is 1000 KiB.
25/08/06 13:16:26 WARN TaskSetManager: Stage 11 contains a task of very large size (2330 KiB). The maximum recommended task size is 1000 KiB.
25/08/06 13:16:27 WARN TaskSetManager: Stage 12 contains a task of very large size (2330 KiB). The maximum recommended task size is 1000 KiB.
25/08/06 13:16:27 WARN TaskSetManager: Stage 13 contains a task of very large size (2330 KiB). The maximum recommended task size is 1000 KiB.


+----+-------+--------+
|  id|  value|category|
+----+-------+--------+
|   1|   val1|       A|
|  50|  val50|       B|
| 100| val100|       C|
| 500| val500|       D|
|1000|val1000|       E|
+----+-------+--------+
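
Besides the Spark UI, the physical plan is another way to check which join strategy was used. The helper below is a minimal sketch, not part of PySpark: `join_strategy` is a hypothetical name, and it assumes that `df.explain()` prints the plan to standard output (as PySpark's `DataFrame.explain` does) and that the plan text contains operator names such as `BroadcastHashJoin` or `SortMergeJoin`.

```python
import contextlib
import io


def join_strategy(df):
    """Return the name of a join operator that appears in df's physical plan, or None.

    Hypothetical helper: df is anything with an explain() method that prints
    a plan to stdout, such as a PySpark DataFrame.
    """
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain()  # PySpark prints the physical plan to stdout
    plan = buffer.getvalue()

    # Operator names as they appear in Spark physical plans.
    known_joins = (
        "BroadcastHashJoin",
        "BroadcastNestedLoopJoin",
        "SortMergeJoin",
        "ShuffledHashJoin",
    )
    for name in known_joins:
        if name in plan:
            return name
    return None
```

In this notebook, `join_strategy(joined_broadcast)` would be expected to report `BroadcastHashJoin`. The result for `joined_normal` depends on Spark's size estimates: Spark can choose a broadcast join on its own when it estimates one side to be below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), so checking the plan is more reliable than assuming a shuffle.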



In [14]:
spark

Difference
Normal Join (Shuffle Join)
Both datasets are shuffled so matching keys go to the same partition.

Slow for big datasets because the shuffle moves a lot of data over the network.

Broadcast Join
Small dataset is copied (broadcast) to all executors once.

The big dataset is joined in place against the local copy, so neither side is shuffled.

Much faster when one dataset is tiny.
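
A rough back-of-the-envelope comparison shows why the difference matters. The sketch below counts rows only, using the dataset sizes from this notebook; the executor count of 4 is an assumption for illustration, and the model ignores row size, compression, and the fact that in local mode nothing actually crosses a network.

```python
def rows_moved_shuffle(big_rows, small_rows):
    """Upper bound: a shuffle join may repartition every row of both sides."""
    return big_rows + small_rows


def rows_moved_broadcast(small_rows, num_executors):
    """The small side is copied once to each executor; the big side stays put."""
    return small_rows * num_executors


big_rows, small_rows = 1_000_000, 5  # sizes used in this notebook
num_executors = 4                    # assumed for illustration

shuffle_cost = rows_moved_shuffle(big_rows, small_rows)
broadcast_cost = rows_moved_broadcast(small_rows, num_executors)
print(shuffle_cost, broadcast_cost)  # 1000005 20
```

The broadcast cost grows with the size of the small side times the number of executors, which is why the technique stops paying off once the "small" dataset is no longer small.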

In [18]:
spark.stop()