In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("exercise").getOrCreate()


In [None]:
rides_data = [
("R001","U001","Hyderabad",12.5,240,"Completed"),
("R002","U002","Delhi",8.2,180,"Completed"),
("R003","U003","Mumbai",15.0,300,"Cancelled"),
("R004","U004","Bangalore",5.5,120,"Completed"),
("R005","U005","Hyderabad",20.0,360,"Completed"),
("R006","U006","Delhi",25.0,420,"Completed"),
("R007","U007","Mumbai",7.5,150,"Completed"),
("R008","U008","Bangalore",18.0,330,"Completed"),
("R009","U009","Delhi",6.0,140,"Cancelled"),
("R010","U010","Hyderabad",10.0,200,"Completed")
]
rides_cols = [
"ride_id",
"user_id",
"city",
"distance_km",
"duration_seconds",
"status"
]

rides_df = spark.createDataFrame(rides_data, rides_cols)
rides_df.show()

+-------+-------+---------+-----------+----------------+---------+
|ride_id|user_id|     city|distance_km|duration_seconds|   status|
+-------+-------+---------+-----------+----------------+---------+
|   R001|   U001|Hyderabad|       12.5|             240|Completed|
|   R002|   U002|    Delhi|        8.2|             180|Completed|
|   R003|   U003|   Mumbai|       15.0|             300|Cancelled|
|   R004|   U004|Bangalore|        5.5|             120|Completed|
|   R005|   U005|Hyderabad|       20.0|             360|Completed|
|   R006|   U006|    Delhi|       25.0|             420|Completed|
|   R007|   U007|   Mumbai|        7.5|             150|Completed|
|   R008|   U008|Bangalore|       18.0|             330|Completed|
|   R009|   U009|    Delhi|        6.0|             140|Cancelled|
|   R010|   U010|Hyderabad|       10.0|             200|Completed|
+-------+-------+---------+-----------+----------------+---------+



In [None]:
surge_data = [
("Hyderabad",1.2),
("Delhi",1.5),
("Mumbai",1.8),
("Bangalore",1.3)
]
surge_cols = ["city","surge_multiplier"]
surge_df = spark.createDataFrame(surge_data, surge_cols)
surge_df.show()

+---------+----------------+
|     city|surge_multiplier|
+---------+----------------+
|Hyderabad|             1.2|
|    Delhi|             1.5|
|   Mumbai|             1.8|
|Bangalore|             1.3|
+---------+----------------+



In [None]:

from pyspark.sql.functions import col

filtered_orders = rides_df.filter(col("status") == "Completed").select("ride_id", "city", "distance_km")
rows = filtered_orders.count()

+-------+---------+-----------+
|ride_id|     city|distance_km|
+-------+---------+-----------+
|   R001|Hyderabad|       12.5|
|   R002|    Delhi|        8.2|
|   R004|Bangalore|        5.5|
|   R005|Hyderabad|       20.0|
|   R006|    Delhi|       25.0|
|   R007|   Mumbai|        7.5|
|   R008|Bangalore|       18.0|
|   R010|Hyderabad|       10.0|
+-------+---------+-----------+



**Explanation — Has Spark executed anything?**

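Not while the plan is being defined. Spark uses lazy evaluation: transformations such as filter and select only build a DAG, and nothing runs until an action (e.g., count, show, write) is called. The filter/select line above only constructs the logical plan; the count() call is what submits a job.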

In [None]:
rows = filtered_orders.count()

**What caused execution?**

The line filtered_orders.count() is the action that triggers job execution.
The earlier filter and select calls are transformations; they did not execute because Spark defers computation until an action is called.
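
To make the transformation/action split concrete, here is a minimal pure-Python sketch. `LazyFrame` is a hypothetical toy class, not part of PySpark, and it only mimics the idea: `filter` and `select` record steps, and `count` is the only method that touches the rows.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: records steps, runs them only on an action."""

    def __init__(self, rows, steps=()):
        self.rows = rows
        self.steps = tuple(steps)  # the recorded "plan"

    def filter(self, predicate):
        # Transformation: returns a new plan and touches no data.
        return LazyFrame(self.rows, self.steps + (("filter", predicate),))

    def select(self, *names):
        # Transformation: also only extends the plan.
        return LazyFrame(self.rows, self.steps + (("select", names),))

    def count(self):
        # Action: replay every recorded step over the rows.
        result = self.rows
        for kind, arg in self.steps:
            if kind == "filter":
                result = [row for row in result if arg(row)]
            else:
                result = [{name: row[name] for name in arg} for row in result]
        return len(result)


evaluated = []  # records every row the predicate is applied to


def is_completed(row):
    evaluated.append(row["ride_id"])
    return row["status"] == "Completed"


rides = [
    {"ride_id": "R001", "city": "Hyderabad", "status": "Completed"},
    {"ride_id": "R003", "city": "Mumbai", "status": "Cancelled"},
    {"ride_id": "R004", "city": "Bangalore", "status": "Completed"},
]

plan = LazyFrame(rides).filter(is_completed).select("ride_id", "city")
calls_before_action = len(evaluated)  # the predicate has not run yet
completed = plan.count()              # the action finally evaluates the plan
calls_after_action = len(evaluated)
```

The predicate is never called while the plan is being chained; it runs once per row only when `count` is invoked. Real Spark additionally optimizes the recorded plan before running it, which this sketch does not attempt.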

In [None]:

filtered_selected = rides_df.filter(col("status") == "Completed").filter(col("distance_km") > 10).select("ride_id", "city", "distance_km")
filtered_selected.show()


+-------+---------+-----------+
|ride_id|     city|distance_km|
+-------+---------+-----------+
|   R001|Hyderabad|       12.5|
|   R005|Hyderabad|       20.0|
|   R006|    Delhi|       25.0|
|   R008|Bangalore|       18.0|
+-------+---------+-----------+



In [None]:
filtered_selected.explain(True)

== Parsed Logical Plan ==
'Project ['ride_id, 'city, 'distance_km]
+- Filter (distance_km#3 > cast(10 as double))
   +- Filter (status#5 = Completed)
      +- LogicalRDD [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5], false

== Analyzed Logical Plan ==
ride_id: string, city: string, distance_km: double
Project [ride_id#0, city#2, distance_km#3]
+- Filter (distance_km#3 > cast(10 as double))
   +- Filter (status#5 = Completed)
      +- LogicalRDD [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5], false

== Optimized Logical Plan ==
Project [ride_id#0, city#2, distance_km#3]
+- Filter ((isnotnull(status#5) AND isnotnull(distance_km#3)) AND ((status#5 = Completed) AND (distance_km#3 > 10.0)))
   +- LogicalRDD [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5], false

== Physical Plan ==
*(1) Project [ride_id#0, city#2, distance_km#3]
+- *(1) Filter ((isnotnull(status#5) AND isnotnull(distance_km#3)) AND ((s

In [None]:
rides_df.rdd.getNumPartitions()

2

In [None]:
df_repart=rides_df.repartition(3)
df_repart.rdd.getNumPartitions()

3

In [None]:
df_coalesced = rides_df.coalesce(1)
df_coalesced.rdd.getNumPartitions()

1

In [None]:
filtered_selected.write.mode("overwrite").parquet("/tmp/output_parquet")

In [None]:
df_repart=rides_df.repartition("city")
df_repart.rdd.getNumPartitions()

1

**Is a shuffle introduced?**

Yes. Repartitioning by a column requires redistributing rows by key across executors, leading to an Exchange (shuffle) operator in the physical plan.
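
The redistribution by key can be pictured with a small pure-Python model. This is only a sketch: the two-way split of the input rows is an assumed starting layout, three target partitions is an arbitrary choice, and `zlib.crc32` is a stand-in for Spark's own hash function.

```python
import zlib


def partition_for(key, num_partitions):
    # Deterministic stand-in hash (Python's built-in hash() is salted per process).
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Assumed layout before repartitioning: rows of the same city sit in both partitions.
before = [
    [("R001", "Hyderabad"), ("R002", "Delhi"), ("R003", "Mumbai"),
     ("R004", "Bangalore"), ("R005", "Hyderabad")],
    [("R006", "Delhi"), ("R007", "Mumbai"), ("R008", "Bangalore"),
     ("R009", "Delhi"), ("R010", "Hyderabad")],
]

num_partitions = 3
after = [[] for _ in range(num_partitions)]
for part in before:
    for ride_id, city in part:
        # Each row is routed by its key, regardless of where it started.
        after[partition_for(city, num_partitions)].append((ride_id, city))

# Record which output partitions each city ended up in.
city_locations = {}
for idx, part in enumerate(after):
    for _, city in part:
        city_locations.setdefault(city, set()).add(idx)
```

After routing, every city lives in exactly one output partition, which is only possible because rows were pulled out of both input partitions. On a cluster those input partitions may sit on different executors, so the routing step becomes network traffic.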

In [None]:
rides_df.explain(True)

== Parsed Logical Plan ==
LogicalRDD [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5], false

== Analyzed Logical Plan ==
ride_id: string, user_id: string, city: string, distance_km: double, duration_seconds: bigint, status: string
LogicalRDD [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5], false

== Optimized Logical Plan ==
LogicalRDD [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5], false

== Physical Plan ==
*(1) Scan ExistingRDD[ride_id#0,user_id#1,city#2,distance_km#3,duration_seconds#4L,status#5]



In [None]:
joined_df = rides_df.join(
    surge_df,
    on="city",
    how="inner"
)
joined_df.show()

+---------+-------+-------+-----------+----------------+---------+----------------+
|     city|ride_id|user_id|distance_km|duration_seconds|   status|surge_multiplier|
+---------+-------+-------+-----------+----------------+---------+----------------+
|Bangalore|   R004|   U004|        5.5|             120|Completed|             1.3|
|Bangalore|   R008|   U008|       18.0|             330|Completed|             1.3|
|    Delhi|   R002|   U002|        8.2|             180|Completed|             1.5|
|    Delhi|   R006|   U006|       25.0|             420|Completed|             1.5|
|    Delhi|   R009|   U009|        6.0|             140|Cancelled|             1.5|
|Hyderabad|   R001|   U001|       12.5|             240|Completed|             1.2|
|Hyderabad|   R005|   U005|       20.0|             360|Completed|             1.2|
|Hyderabad|   R010|   U010|       10.0|             200|Completed|             1.2|
|   Mumbai|   R003|   U003|       15.0|             300|Cancelled|          

In [None]:
joined_df.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [city])
:- LogicalRDD [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5], false
+- LogicalRDD [city#25, surge_multiplier#26], false

== Analyzed Logical Plan ==
city: string, ride_id: string, user_id: string, distance_km: double, duration_seconds: bigint, status: string, surge_multiplier: double
Project [city#2, ride_id#0, user_id#1, distance_km#3, duration_seconds#4L, status#5, surge_multiplier#26]
+- Join Inner, (city#2 = city#25)
   :- LogicalRDD [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5], false
   +- LogicalRDD [city#25, surge_multiplier#26], false

== Optimized Logical Plan ==
Project [city#2, ride_id#0, user_id#1, distance_km#3, duration_seconds#4L, status#5, surge_multiplier#26]
+- Join Inner, (city#2 = city#25)
   :- Filter isnotnull(city#2)
   :  +- LogicalRDD [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5], false
   +- Filter isnotnull(city

In [None]:
rides_df = rides_df.filter(col("distance_km") > 10)
joined_df = rides_df.join(
    surge_df,
    on="city",
    how="inner"
)
joined_df.show()

+---------+-------+-------+-----------+----------------+---------+----------------+
|     city|ride_id|user_id|distance_km|duration_seconds|   status|surge_multiplier|
+---------+-------+-------+-----------+----------------+---------+----------------+
|Bangalore|   R008|   U008|       18.0|             330|Completed|             1.3|
|    Delhi|   R006|   U006|       25.0|             420|Completed|             1.5|
|Hyderabad|   R001|   U001|       12.5|             240|Completed|             1.2|
|Hyderabad|   R005|   U005|       20.0|             360|Completed|             1.2|
|   Mumbai|   R003|   U003|       15.0|             300|Cancelled|             1.8|
+---------+-------+-------+-----------+----------------+---------+----------------+



In [None]:
joined_df.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [city])
:- Filter (distance_km#3 > cast(10 as double))
:  +- LogicalRDD [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5], false
+- LogicalRDD [city#25, surge_multiplier#26], false

== Analyzed Logical Plan ==
city: string, ride_id: string, user_id: string, distance_km: double, duration_seconds: bigint, status: string, surge_multiplier: double
Project [city#2, ride_id#0, user_id#1, distance_km#3, duration_seconds#4L, status#5, surge_multiplier#26]
+- Join Inner, (city#2 = city#25)
   :- Filter (distance_km#3 > cast(10 as double))
   :  +- LogicalRDD [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5], false
   +- LogicalRDD [city#25, surge_multiplier#26], false

== Optimized Logical Plan ==
Project [city#2, ride_id#0, user_id#1, distance_km#3, duration_seconds#4L, status#5, surge_multiplier#26]
+- Join Inner, (city#2 = city#25)
   :- Filter ((isnotnull(distance_km#3) AND (distance_km#3 

In [None]:
from pyspark.sql.functions import broadcast
rides_df = rides_df.filter(col("distance_km") > 10)
broadcast_joined_df = rides_df.join(
    broadcast(surge_df),
    on="city",
    how="inner"
)
broadcast_joined_df.show()

+---------+-------+-------+-----------+----------------+---------+----------------+
|     city|ride_id|user_id|distance_km|duration_seconds|   status|surge_multiplier|
+---------+-------+-------+-----------+----------------+---------+----------------+
|Hyderabad|   R001|   U001|       12.5|             240|Completed|             1.2|
|   Mumbai|   R003|   U003|       15.0|             300|Cancelled|             1.8|
|Hyderabad|   R005|   U005|       20.0|             360|Completed|             1.2|
|    Delhi|   R006|   U006|       25.0|             420|Completed|             1.5|
|Bangalore|   R008|   U008|       18.0|             330|Completed|             1.3|
+---------+-------+-------+-----------+----------------+---------+----------------+



In [None]:
broadcast_joined_df.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [city])
:- Filter (distance_km#3 > cast(10 as double))
:  +- Filter (distance_km#3 > cast(10 as double))
:     +- Filter (distance_km#3 > cast(10 as double))
:        +- LogicalRDD [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5], false
+- ResolvedHint (strategy=broadcast)
   +- LogicalRDD [city#25, surge_multiplier#26], false

== Analyzed Logical Plan ==
city: string, ride_id: string, user_id: string, distance_km: double, duration_seconds: bigint, status: string, surge_multiplier: double
Project [city#2, ride_id#0, user_id#1, distance_km#3, duration_seconds#4L, status#5, surge_multiplier#26]
+- Join Inner, (city#2 = city#25)
   :- Filter (distance_km#3 > cast(10 as double))
   :  +- Filter (distance_km#3 > cast(10 as double))
   :     +- Filter (distance_km#3 > cast(10 as double))
   :        +- LogicalRDD [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5], false
   +- ResolvedHint (

**Compare physical plans (non-broadcast vs broadcast)**

Operators that disappear with broadcast:

Exchange (shuffle by city)
Sort (for SortMergeJoin)


Performance impact:

Removes expensive network shuffle and sorts → fewer stages, lower latency.
Ideal when one side (lookup table) is small enough to fit in executor memory.
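
As a rough illustration of why the Exchange and Sort steps are unnecessary, the following pure-Python sketch models a broadcast hash join. The partition layout is assumed for illustration, and a plain dictionary plays the role of the broadcast copy of the surge table.

```python
# Small side: every task gets its own full copy of this lookup table.
surge = {"Hyderabad": 1.2, "Delhi": 1.5, "Mumbai": 1.8, "Bangalore": 1.3}

# Large side: rides with distance_km > 10, split across two partitions (assumed layout).
ride_partitions = [
    [("R001", "Hyderabad", 12.5), ("R003", "Mumbai", 15.0), ("R005", "Hyderabad", 20.0)],
    [("R006", "Delhi", 25.0), ("R008", "Bangalore", 18.0)],
]


def join_partition(rides, lookup):
    # Runs independently per partition: one hash lookup per row,
    # and no ride row ever leaves the partition it started in.
    return [
        (ride_id, city, km, lookup[city])
        for ride_id, city, km in rides
        if city in lookup
    ]


joined = [join_partition(part, surge) for part in ride_partitions]
```

The output keeps the same partition layout as the rides input, which is why the rows in the broadcast result above appear in ride order rather than grouped by city.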

In [None]:
broadcast_joined_df.explain(mode="formatted")

== Physical Plan ==
AdaptiveSparkPlan (8)
+- Project (7)
   +- BroadcastHashJoin Inner BuildRight (6)
      :- Filter (2)
      :  +- Scan ExistingRDD (1)
      +- BroadcastExchange (5)
         +- Filter (4)
            +- Scan ExistingRDD (3)


(1) Scan ExistingRDD
Output [6]: [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5]
Arguments: [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter
Input [6]: [ride_id#0, user_id#1, city#2, distance_km#3, duration_seconds#4L, status#5]
Condition : ((isnotnull(distance_km#3) AND (distance_km#3 > 10.0)) AND isnotnull(city#2))

(3) Scan ExistingRDD
Output [2]: [city#25, surge_multiplier#26]
Arguments: [city#25, surge_multiplier#26], MapPartitionsRDD[11] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(4) Filter
Input [

**Exercise 6.1 — Identify expensive operators & classify**

Common expensive operators you’ll see in the physical plan:

Exchange (shuffle) → Network-heavy (data moved across executors; also disk spill possible).
Sort / SortMergeJoin → CPU-heavy (sorting large datasets), can be memory-heavy for buffers.
HashAggregate / ShuffledHashJoin / BroadcastHashJoin → Memory-heavy (hash tables), also some CPU for hashing.
BroadcastExchange → Network (sending small table to executors), lower cost vs full shuffle.
FileScan on large inputs → I/O-heavy (disk), not CPU.


**Exercise 6.2 — Why does Spark default to SortMergeJoin?**

For large equijoins, SortMergeJoin scales well: it sorts both sides by the join key and streams through them in a single merge pass, and the sort can spill to disk when memory is tight.
It avoids building very large in-memory hash tables, which can fail under memory pressure.
The defaults favor robustness and scalability (spark.sql.join.preferSortMergeJoin is true), while a side whose estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default) is broadcast instead.
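
The sort-then-merge idea can be sketched in plain Python. This is a simplified single-machine model, not Spark's implementation: `sort_merge_join` is a hypothetical helper that joins tuples on their first field and holds only the current run of equal keys from the right side at any time.

```python
def sort_merge_join(left, right):
    """Inner equijoin on the first field of each tuple."""
    left = sorted(left, key=lambda row: row[0])
    right = sorted(right, key=lambda row: row[0])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key < right_key:
            i += 1  # no match for this left row
        elif left_key > right_key:
            j += 1  # no match for this right row
        else:
            # Find the run of right rows that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == left_key:
                j_end += 1
            # Pair every left row with that key against the run.
            while i < len(left) and left[i][0] == left_key:
                for right_row in right[j:j_end]:
                    out.append(left[i] + right_row[1:])
                i += 1
            j = j_end
    return out


rides = [
    ("Hyderabad", "R001"), ("Delhi", "R006"), ("Mumbai", "R003"),
    ("Hyderabad", "R005"), ("Bangalore", "R008"),
]
surge = [("Hyderabad", 1.2), ("Delhi", 1.5), ("Mumbai", 1.8), ("Bangalore", 1.3)]

result = sort_merge_join(rides, surge)
```

Because both inputs are sorted first, the result comes out ordered by city, which matches the ordering seen in the non-broadcast join output earlier in this notebook.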

In [None]:
from pyspark.sql.functions import lower,col
check_df=broadcast_joined_df.filter(col("status")=="Completed").withColumn("city",lower(col("city"))).select("city","ride_id","user_id")


**What has Spark done so far?**

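Spark has parsed your transformations and built an analyzed logical plan, resolving column names and types. Catalyst optimization and physical planning are themselves lazy: they run when an action or explain() is invoked. No data has been processed yet; actual computation is deferred until you call an action.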

In [None]:
count_df=check_df.count()
check_df.show()

output_path = "/tmp/output_parquet"

check_df.write.mode("overwrite").parquet(output_path)


+---------+-------+-------+
|     city|ride_id|user_id|
+---------+-------+-------+
|hyderabad|   R001|   U001|
|hyderabad|   R005|   U005|
|    delhi|   R006|   U006|
|bangalore|   R008|   U008|
+---------+-------+-------+



1.**Why does broadcast remove shuffle from the DAG?**

Broadcast sends the smaller DataFrame to all executors, so each partition of the larger DataFrame can join locally.
This eliminates the need to redistribute data by key across the cluster.
Result: No shuffle stage in the physical plan for the join.


2.**Why does repartition always introduce shuffle?**

Repartition changes how data is distributed across partitions.
To achieve even distribution or partitioning by a column, Spark must move rows between executors.
This movement is implemented as a shuffle (Exchange operator).


3.**Why is coalesce cheaper than repartition?**

Coalesce only reduces partitions by merging existing ones without redistributing data.
It avoids a full shuffle and keeps data mostly where it is.
Repartition, in contrast, always shuffles to evenly spread data.
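
The difference can be modeled in a few lines of pure Python. Both functions below are simplified stand-ins for illustration: the toy `coalesce` concatenates whole neighbouring partitions, while the toy `repartition` routes each row individually in round-robin order. The metric counts how many input partitions were split apart.

```python
def coalesce(partitions, n):
    # Each output partition is the concatenation of whole input partitions.
    out = [[] for _ in range(n)]
    for idx, part in enumerate(partitions):
        out[idx * n // len(partitions)].extend(part)
    return out


def repartition(partitions, n):
    # Every row is routed on its own (round-robin here), so inputs get split up.
    out = [[] for _ in range(n)]
    position = 0
    for part in partitions:
        for row in part:
            out[position % n].append(row)
            position += 1
    return out


def split_inputs(partitions, result):
    # Count input partitions whose rows landed in more than one output partition.
    where = {row: idx for idx, part in enumerate(result) for row in part}
    return sum(1 for part in partitions if len({where[row] for row in part}) > 1)


parts = [
    ["R001", "R002", "R003"],
    ["R004", "R005"],
    ["R006", "R007", "R008"],
    ["R009", "R010"],
]

coalesced = coalesce(parts, 2)
reshuffled = repartition(parts, 2)
```

In this model `coalesce` never splits an input partition, whereas `repartition` splits all four. The price of the cheaper operation is that the merged partitions inherit whatever size imbalance the inputs had.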


4.**Why does Spark delay execution until an action?**

Spark uses lazy evaluation to build a logical DAG of transformations first.
This allows Catalyst to optimize the entire plan before running it.
Execution happens only when an action (e.g., count, show, write) is called.

---

