
## APACHE SPARK EXERCISES

**Topics:**

1. DAG (Directed Acyclic Graph)
2. Broadcast Joins
3. Transformations
4. Actions
5. Partitions

In [234]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('partitioning').getOrCreate()

In [235]:
from pyspark.sql.functions import col, broadcast

**DOMAIN:** Ride-Hailing Platform Analytics (Uber/Ola)

In [236]:
# DATASET 1 — RIDES (Large Fact Table)

rides_data = [
    ("R001","U001","Hyderabad",12.5,240,"Completed"),
    ("R002","U002","Delhi",8.2,180,"Completed"),
    ("R003","U003","Mumbai",15.0,300,"Cancelled"),
    ("R004","U004","Bangalore",5.5,120,"Completed"),
    ("R005","U005","Hyderabad",20.0,360,"Completed"),
    ("R006","U006","Delhi",25.0,420,"Completed"),
    ("R007","U007","Mumbai",7.5,150,"Completed"),
    ("R008","U008","Bangalore",18.0,330,"Completed"),
    ("R009","U009","Delhi",6.0,140,"Cancelled"),
    ("R010","U010","Hyderabad",10.0,200,"Completed")
]

rides_cols = [
    "ride_id",
    "user_id",
    "city",
    "distance_km",
    "duration_seconds",
    "status"
]

rides_df = spark.createDataFrame(rides_data, rides_cols)

In [237]:
# DATASET 2 — CITY SURGE MULTIPLIERS (Small Lookup)

surge_data = [
    ("Hyderabad", 1.2),
    ("Delhi", 1.5),
    ("Mumbai", 1.8),
    ("Bangalore", 1.3)
]

surge_cols = ["city", "surge_multiplier"]

surge_df = spark.createDataFrame(surge_data, surge_cols)

### **SET 1: TRANSFORMATIONS vs ACTIONS**



**Exercise 1.1:** Create a transformation pipeline that:

1. Filters only Completed rides
2. Selects ride_id, city, distance_km

**Tasks:**

1. Do not trigger any action.
2. Explain whether Spark executed anything.

In [238]:
# filter Completed rides first, then select columns; no action is triggered
filtered_rides_df = rides_df.filter(col("status") == "Completed").select("ride_id", "city", "distance_km")

**Task 2:** Explain whether Spark executed anything.

> No. Because Spark uses **lazy evaluation**, it has not executed any computation yet. The filter and select are transformations, so Spark has only recorded them as a logical plan; that plan runs when an action is called on the **filtered_rides_df** DataFrame.
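
The lazy behavior described above can be imitated in a few lines of plain Python. This is only a toy model, not Spark's implementation: `LazyPipeline` and `is_completed` are hypothetical names invented for the illustration. Transformations just append to a recorded plan, and the predicate is not called until the action `collect()` runs.

```python
# Toy model of lazy evaluation (an illustration, NOT how Spark is implemented).
class LazyPipeline:
    def __init__(self, rows, steps=()):
        self.rows = rows
        self.steps = tuple(steps)  # the recorded "plan"

    def filter(self, predicate):
        # Transformation: record the step, touch no data.
        return LazyPipeline(self.rows, self.steps + (("filter", predicate),))

    def select(self, *columns):
        # Transformation: record the step, touch no data.
        return LazyPipeline(self.rows, self.steps + (("select", columns),))

    def collect(self):
        # Action: only now are the recorded steps applied to the rows.
        result = self.rows
        for kind, arg in self.steps:
            if kind == "filter":
                result = [row for row in result if arg(row)]
            else:
                result = [{c: row[c] for c in arg} for row in result]
        return result


calls = []  # every row the predicate is actually evaluated on


def is_completed(row):
    calls.append(row["ride_id"])
    return row["status"] == "Completed"


rides = [
    {"ride_id": "R001", "city": "Hyderabad", "distance_km": 12.5, "status": "Completed"},
    {"ride_id": "R003", "city": "Mumbai", "distance_km": 15.0, "status": "Cancelled"},
]

pipeline = LazyPipeline(rides).filter(is_completed).select("ride_id", "city", "distance_km")
calls_before_action = len(calls)  # nothing has been evaluated yet
result = pipeline.collect()       # the action runs the plan
calls_after_action = len(calls)

print(calls_before_action, calls_after_action)
print(result)
```

The predicate is evaluated only after `collect()` is called, which mirrors why the cell above returns immediately without starting a Spark job.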



**Exercise 1.2:** Trigger a single action on the pipeline.

**Tasks:**

1. Identify which line caused execution.
2. Explain why previous lines did not execute.

In [239]:
# trigger action on the pipeline
filtered_rides_df.show()

+-------+---------+-----------+
|ride_id|     city|distance_km|
+-------+---------+-----------+
|   R001|Hyderabad|       12.5|
|   R002|    Delhi|        8.2|
|   R004|Bangalore|        5.5|
|   R005|Hyderabad|       20.0|
|   R006|    Delhi|       25.0|
|   R007|   Mumbai|        7.5|
|   R008|Bangalore|       18.0|
|   R010|Hyderabad|       10.0|
+-------+---------+-----------+



**Task 1:** Identify which line caused execution.
> The line **filtered_rides_df.show()** is the action that triggered the execution of the transformation pipeline.

**Task 2:** Explain why previous lines did not execute.
> The previous lines only defined transformations. Because Spark employs **lazy evaluation**, it recorded them in a plan and ran nothing until the **show()** action requested a result.

### **SET 2: DAG & LINEAGE**

**Exercise 2.1:** Create a transformation chain with:

1. Multiple filters
2. A column selection

**Tasks:**

1. Run explain(True)

2. Identify:

* Logical plan
* Optimized logical plan
* Physical plan

In [240]:
# multiple filters and column selection
transformed_rides_df = rides_df.filter(col("status") == "Completed").filter(col("distance_km") > 10).select("ride_id", "city", "distance_km", "status")

# run explain
transformed_rides_df.explain(True)

== Parsed Logical Plan ==
'Project ['ride_id, 'city, 'distance_km, 'status]
+- Filter (distance_km#365 > cast(10 as double))
   +- Filter (status#367 = Completed)
      +- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false

== Analyzed Logical Plan ==
ride_id: string, city: string, distance_km: double, status: string
Project [ride_id#362, city#364, distance_km#365, status#367]
+- Filter (distance_km#365 > cast(10 as double))
   +- Filter (status#367 = Completed)
      +- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false

== Optimized Logical Plan ==
Project [ride_id#362, city#364, distance_km#365, status#367]
+- Filter ((isnotnull(status#367) AND isnotnull(distance_km#365)) AND ((status#367 = Completed) AND (distance_km#365 > 10.0)))
   +- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false

== Physical Plan ==
*(1) Proj

**Exercise 2.2:** Reorder transformations (filter after join vs before join).

**Tasks:**

1. Compare DAGs.
2. Identify which plan is more efficient and why.

In [241]:
# filter before join
filtered_rides_df_join = rides_df.filter(col("distance_km") > 10)
filtered_then_joined_df = filtered_rides_df_join.join(surge_df, on="city", how="inner")

filtered_then_joined_df.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [city])
:- Filter (distance_km#365 > cast(10 as double))
:  +- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false
+- LogicalRDD [city#368, surge_multiplier#369], false

== Analyzed Logical Plan ==
city: string, ride_id: string, user_id: string, distance_km: double, duration_seconds: bigint, status: string, surge_multiplier: double
Project [city#364, ride_id#362, user_id#363, distance_km#365, duration_seconds#366L, status#367, surge_multiplier#369]
+- Join Inner, (city#364 = city#368)
   :- Filter (distance_km#365 > cast(10 as double))
   :  +- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false
   +- LogicalRDD [city#368, surge_multiplier#369], false

== Optimized Logical Plan ==
Project [city#364, ride_id#362, user_id#363, distance_km#365, duration_seconds#366L, status#367, surge_multiplier#369]
+- Join Inner, (city#364 = city#

In [242]:
# filter after join
joined_then_filtered_df = rides_df.join(surge_df, on="city", how="inner").filter(col("distance_km") > 10)

joined_then_filtered_df.explain(True)

== Parsed Logical Plan ==
'Filter '`>`('distance_km, 10)
+- Project [city#364, ride_id#362, user_id#363, distance_km#365, duration_seconds#366L, status#367, surge_multiplier#369]
   +- Join Inner, (city#364 = city#368)
      :- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false
      +- LogicalRDD [city#368, surge_multiplier#369], false

== Analyzed Logical Plan ==
city: string, ride_id: string, user_id: string, distance_km: double, duration_seconds: bigint, status: string, surge_multiplier: double
Filter (distance_km#365 > cast(10 as double))
+- Project [city#364, ride_id#362, user_id#363, distance_km#365, duration_seconds#366L, status#367, surge_multiplier#369]
   +- Join Inner, (city#364 = city#368)
      :- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false
      +- LogicalRDD [city#368, surge_multiplier#369], false

== Optimized Logical Plan ==
Project [city#364, ride_id

**Task 1: Compare DAGs.**

> In both scenarios, the **Optimized Logical Plan** and the **Physical Plan** are identical, which shows Spark's **Catalyst Optimizer** at work. Even when the filter is written *after* the join in the code, the optimizer pushes it down (known as **Predicate Pushdown**) to **rides_df** *before* the join operation.

> This means that in both cases the filter **distance_km > 10.0** is applied to **rides_df** *before* its rows reach the **Exchange** (shuffle) and the **SortMergeJoin** that follows it.



**Task 2: Which plan is more efficient?**

> Because Spark's Catalyst Optimizer applies **Predicate Pushdown**, both explicit coding strategies (filter before join vs. filter after join) result in the **same efficient physical plan**.
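
To see why the pushed-down plan is the cheaper one, here is a small plain-Python sketch using the same ride and surge values as the notebook. It is not Spark code; `join_with_surge` and `is_long` are hypothetical helpers. Both orderings return the same rows, but filtering first sends fewer rows into the join.

```python
# Plain-Python illustration of predicate pushdown (not Spark code).
rides = [
    ("R001", "Hyderabad", 12.5), ("R002", "Delhi", 8.2), ("R003", "Mumbai", 15.0),
    ("R004", "Bangalore", 5.5), ("R005", "Hyderabad", 20.0), ("R006", "Delhi", 25.0),
    ("R007", "Mumbai", 7.5), ("R008", "Bangalore", 18.0), ("R009", "Delhi", 6.0),
    ("R010", "Hyderabad", 10.0),
]
surge = {"Hyderabad": 1.2, "Delhi": 1.5, "Mumbai": 1.8, "Bangalore": 1.3}


def join_with_surge(rows):
    """Inner join on city; also report how many ride rows entered the join."""
    joined = [(rid, city, km, surge[city]) for rid, city, km in rows if city in surge]
    return joined, len(rows)


def is_long(row):
    # distance_km is the third field in both the raw and the joined rows
    return row[2] > 10


# Filter written after the join: every ride row goes through the join.
joined_all, rows_joined_late = join_with_surge(rides)
filter_after = [row for row in joined_all if is_long(row)]

# Filter pushed below the join: only matching rows reach the join.
filter_before, rows_joined_early = join_with_surge([r for r in rides if is_long(r)])

print(filter_after == filter_before)        # same result either way
print(rows_joined_late, rows_joined_early)  # rows fed into the join
```

On a cluster, the rows fed into the join are also the rows that get shuffled, so the saving grows with the size of the fact table.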

### **SET 3: PARTITIONS & SHUFFLE**

**Exercise 3.1:** Check the number of partitions of rides_df.

**Tasks:**

1. Repartition into 4 partitions
2. Coalesce into 1 partition
3. Observe number of output files when writing to Parquet

In [243]:
# number of partitions
rides_df.rdd.getNumPartitions()

2

In [244]:
# repartition into 4 partitions
rides_df_repart = rides_df.repartition(4)
rides_df_repart.rdd.getNumPartitions()

4

In [245]:
# coalesce into 1 partition
rides_df_coalesce = rides_df_repart.coalesce(1)
rides_df_coalesce.rdd.getNumPartitions()

1

In [246]:
rides_df_coalesce.write.mode("overwrite").parquet("rides_df_coalesce.parquet")
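
Task 3 asks for the number of output files, which the notebook does not show. One way to check it, sketched here under assumptions: Spark typically writes one `part-*` data file per partition that holds data, alongside marker files such as `_SUCCESS` and hidden `.crc` checksums. The helper `count_part_files` is a hypothetical name, and the demo below runs on a throwaway directory whose file names only mimic that layout.

```python
import tempfile
from pathlib import Path


def count_part_files(output_dir):
    """Count data files named part-* directly inside an output directory."""
    return sum(1 for p in Path(output_dir).glob("part-*") if p.is_file())


# Demo on a temporary directory that imitates a single-partition write.
with tempfile.TemporaryDirectory() as tmp:
    for name in (
        "part-00000-demo.snappy.parquet",       # the data file
        ".part-00000-demo.snappy.parquet.crc",  # hidden checksum, not counted
        "_SUCCESS",                             # job marker, not counted
    ):
        (Path(tmp) / name).touch()
    demo_count = count_part_files(tmp)

print(demo_count)
```

Calling `count_part_files("rides_df_coalesce.parquet")` after the write above should report a single data file for the coalesced DataFrame, and writing `rides_df_repart` instead should produce more, though the exact number depends on how many partitions actually contain rows.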

**Exercise 3.2:** Repartition rides by city.

**Tasks:**

1. Run explain(True).
2. Identify whether a shuffle is introduced.

In [247]:
# repartition rides
rides_df_repartition = rides_df.repartition(col("city"))
rides_df_repartition.rdd.getNumPartitions()

1

In [248]:
# run explain
rides_df_repartition.explain(True)

== Parsed Logical Plan ==
'RepartitionByExpression ['city]
+- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false

== Analyzed Logical Plan ==
ride_id: string, user_id: string, city: string, distance_km: double, duration_seconds: bigint, status: string
RepartitionByExpression [city#364]
+- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false

== Optimized Logical Plan ==
RepartitionByExpression [city#364]
+- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ResultQueryStage 1
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 0
         +- Exchange hashpartitioning(city#364, 200), REPARTITION_BY_COL, [plan_id=3625]
            +- *(1) Scan ExistingRDD[ride_id#362,user_id#363,city#364,distance_km#365,duration_seconds#366L,status#367]
+- ==

**Task 2:** Identify whether a shuffle is introduced.

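> Yes, a shuffle is introduced. In the **Physical Plan** we can see Exchange hashpartitioning(city#364, 200), REPARTITION_BY_COL. The 200 is the default value of spark.sql.shuffle.partitions; the AQEShuffleRead coalesced step then merges the mostly empty shuffle partitions, which is why getNumPartitions() returned 1 above.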
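
What `hashpartitioning(city, 200)` does to the rows can be sketched in plain Python. This is an illustration only: `zlib.crc32` is a stand-in chosen because it is deterministic across runs, not the hash function Spark uses, and `partition_id` is a hypothetical helper.

```python
import zlib
from collections import defaultdict


def partition_id(key, num_partitions):
    # Stand-in hash: the target partition depends only on the key value.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


cities = ["Hyderabad", "Delhi", "Mumbai", "Bangalore", "Hyderabad",
          "Delhi", "Mumbai", "Bangalore", "Delhi", "Hyderabad"]

num_partitions = 200
partitions = defaultdict(list)
for ride_number, city in enumerate(cities, start=1):
    partitions[partition_id(city, num_partitions)].append((f"R{ride_number:03d}", city))

# Each city maps to exactly one target partition.
city_to_partition = {city: partition_id(city, num_partitions) for city in set(cities)}
non_empty = len(partitions)

print(non_empty, "of", num_partitions, "partitions receive rows")
```

Because the target partition depends only on the key, all rides for a city land together, and rows generally have to leave the partition they started in. That movement is the shuffle. With four distinct cities, at most four of the 200 target partitions receive any rows.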



### **SET 4: JOIN WITHOUT BROADCAST (BAD DAG)**

**Exercise 4.1:** Join *rides_df* with *surge_df* on *city* without using broadcast.

**Tasks:**

1. Run explain(True)

2. Identify:

* Join type
* Exchange operators
* Sort operations
* Stage boundaries

In [249]:
# join without broadcast
joined_df = rides_df.join(surge_df, on = "city", how = "inner")

In [250]:
# run explain
joined_df.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [city])
:- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false
+- LogicalRDD [city#368, surge_multiplier#369], false

== Analyzed Logical Plan ==
city: string, ride_id: string, user_id: string, distance_km: double, duration_seconds: bigint, status: string, surge_multiplier: double
Project [city#364, ride_id#362, user_id#363, distance_km#365, duration_seconds#366L, status#367, surge_multiplier#369]
+- Join Inner, (city#364 = city#368)
   :- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false
   +- LogicalRDD [city#368, surge_multiplier#369], false

== Optimized Logical Plan ==
Project [city#364, ride_id#362, user_id#363, distance_km#365, duration_seconds#366L, status#367, surge_multiplier#369]
+- Join Inner, (city#364 = city#368)
   :- Filter isnotnull(city#364)
   :  +- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#

**Exercise 4.2:** Apply a filter *(distance_km > 10)* before the join.

**Tasks:**

1. Observe whether shuffle is removed.
2. Explain why or why not?

In [251]:
# apply a filter before the join
filtered_rides_df_2 = rides_df.filter(col("distance_km") > 10)
joined_filtered_df_2 = filtered_rides_df_2.join(surge_df, on="city", how="inner")

joined_filtered_df_2.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [city])
:- Filter (distance_km#365 > cast(10 as double))
:  +- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false
+- LogicalRDD [city#368, surge_multiplier#369], false

== Analyzed Logical Plan ==
city: string, ride_id: string, user_id: string, distance_km: double, duration_seconds: bigint, status: string, surge_multiplier: double
Project [city#364, ride_id#362, user_id#363, distance_km#365, duration_seconds#366L, status#367, surge_multiplier#369]
+- Join Inner, (city#364 = city#368)
   :- Filter (distance_km#365 > cast(10 as double))
   :  +- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false
   +- LogicalRDD [city#368, surge_multiplier#369], false

== Optimized Logical Plan ==
Project [city#364, ride_id#362, user_id#363, distance_km#365, duration_seconds#366L, status#367, surge_multiplier#369]
+- Join Inner, (city#364 = city#

**Tasks:** Observe whether shuffle is removed. Why or why not?

> No, the shuffle was not removed.

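> This is because the filter only reduces the number of rows; it does not change how they are partitioned. Without a broadcast, the join strategy here is still SortMergeJoin, which requires both DataFrames to be co-partitioned (and sorted) by the join key, and that co-partitioning involves a shuffle.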
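
The merge step itself shows why sorting by the join key is required no matter how few rows survive the filter. Below is a minimal single-process sketch of a sort-merge inner join, assuming `(key, value)` tuples as input. `sort_merge_join` is a hypothetical helper; in Spark the inputs would first be shuffled by key, a step this sketch skips.

```python
def sort_merge_join(left, right):
    """Inner join of (key, value) pairs; both inputs are sorted by key first."""
    left = sorted(left, key=lambda kv: kv[0])
    right = sorted(right, key=lambda kv: kv[0])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        if left[i][0] < right[j][0]:
            i += 1
        elif left[i][0] > right[j][0]:
            j += 1
        else:
            key = left[i][0]
            # Find the run of equal keys on the right side.
            j_end = j
            while j_end < len(right) and right[j_end][0] == key:
                j_end += 1
            # Pair every left row with that key against the whole run.
            while i < len(left) and left[i][0] == key:
                for _, right_value in right[j:j_end]:
                    out.append((key, left[i][1], right_value))
                i += 1
            j = j_end
    return out


# Rides with distance_km > 10, keyed by city, and the surge lookup.
long_rides = [("Hyderabad", "R001"), ("Mumbai", "R003"), ("Hyderabad", "R005"),
              ("Delhi", "R006"), ("Bangalore", "R008")]
surge = [("Hyderabad", 1.2), ("Delhi", 1.5), ("Mumbai", 1.8), ("Bangalore", 1.3)]

joined = sort_merge_join(long_rides, surge)
for row in joined:
    print(row)
```

The two cursors only ever move forward, which works because both sides are ordered by city. The filter changes how many rows are sorted and merged, not whether the ordering is needed.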

### **SET 5: BROADCAST JOIN (GOOD DAG)**

**Exercise 5.1:** Apply a broadcast hint to *surge_df*.

**Tasks:**

1. Run explain(True)

2. Identify:

* Join type
* BroadcastExchange
* Disappearance of shuffles

In [252]:
# join with broadcast
broadcast_join_df = rides_df.join(broadcast(surge_df), on = "city", how = "inner")

# run explain
broadcast_join_df.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [city])
:- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false
+- ResolvedHint (strategy=broadcast)
   +- LogicalRDD [city#368, surge_multiplier#369], false

== Analyzed Logical Plan ==
city: string, ride_id: string, user_id: string, distance_km: double, duration_seconds: bigint, status: string, surge_multiplier: double
Project [city#364, ride_id#362, user_id#363, distance_km#365, duration_seconds#366L, status#367, surge_multiplier#369]
+- Join Inner, (city#364 = city#368)
   :- LogicalRDD [ride_id#362, user_id#363, city#364, distance_km#365, duration_seconds#366L, status#367], false
   +- ResolvedHint (strategy=broadcast)
      +- LogicalRDD [city#368, surge_multiplier#369], false

== Optimized Logical Plan ==
Project [city#364, ride_id#362, user_id#363, distance_km#365, duration_seconds#366L, status#367, surge_multiplier#369]
+- Join Inner, (city#364 = city#368), rightHint=(strateg

**Exercise 5.2:** Compare physical plans from:

* Exercise 4.1
* Exercise 5.1

**Tasks:**

1. List operators that disappeared.
2. Explain performance impact.

**Task 1:** List operators that disappeared.

> The Sort and Exchange hashpartitioning (shuffle) operators on both sides of the join disappear, and SortMergeJoin itself is replaced by BroadcastHashJoin. The only data movement left is a single BroadcastExchange for the smaller surge_df.

**Task 2: Performance Impact.**

> The absence of Sort and Exchange hashpartitioning operators means that the costly operations of sorting large datasets and shuffling data across the network are avoided.

> This allows for a much faster join operation compared to SortMergeJoin, which always requires sorting and potentially shuffling both datasets.
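
A plain-Python sketch of the broadcast idea follows. It is a toy model: `broadcast_hash_join` is a hypothetical helper, and the split of the ten rides into two partitions is an assumption made for illustration. Each partition of the large side is joined against its own copy of the small table, so ride rows never leave their partition.

```python
# Toy model of a broadcast hash join (not Spark's implementation).
surge = {"Hyderabad": 1.2, "Delhi": 1.5, "Mumbai": 1.8, "Bangalore": 1.3}  # small side

# The large side stays in whatever partitions it already has (assumed split).
ride_partitions = [
    [("R001", "Hyderabad"), ("R002", "Delhi"), ("R003", "Mumbai"),
     ("R004", "Bangalore"), ("R005", "Hyderabad")],
    [("R006", "Delhi"), ("R007", "Mumbai"), ("R008", "Bangalore"),
     ("R009", "Delhi"), ("R010", "Hyderabad")],
]


def broadcast_hash_join(partitions, small_table):
    joined = []
    for rows in partitions:
        local_copy = dict(small_table)  # each "executor" receives the full small table
        # Probe the local hash table; no ride row moves to another partition.
        joined.append([(ride_id, city, local_copy[city])
                       for ride_id, city in rows if city in local_copy])
    return joined


joined_partitions = broadcast_hash_join(ride_partitions, surge)
for index, rows in enumerate(joined_partitions):
    print(index, [ride_id for ride_id, _, _ in rows])
```

The output partitions contain exactly the ride IDs of the corresponding input partitions, which is the plain-Python counterpart of the missing Exchange on the rides side.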

### **SET 6: DAG INTERPRETATION**

**Exercise 6.1:** From the physical plan:

1. Identify all expensive operators.
2. Classify them as CPU, memory, or network heavy.

From Spark's physical plans, the most expensive operators and their classifications are:

1.  **Exchange:** This operator is introduced during shuffles. It is primarily:
      *   **Network-heavy:** Due to data transfer across the cluster between executors.
      *   **CPU-heavy:** For serialization, deserialization, and hashing of data before and after transfer.

2.  **Sort:** This operator sorts data within partitions. It is:
      *   **Memory-heavy:** To hold data being sorted in memory.
      *   **CPU-heavy:** For comparison operations during the sorting process.


3.  **BroadcastExchange**: This operator appears in broadcast joins, where the smaller DataFrame is copied to all worker nodes. It is:
      *   **Memory-heavy:** The entire broadcasted table must fit into the memory of each executor.
      *   **Network-heavy:** To transfer the small table to all executors.

4.  **SortMergeJoin / ShuffledHashJoin**: These join operators need their inputs partitioned by the join key (and, for SortMergeJoin, sorted), so they normally sit on top of `Exchange` and/or `Sort` operators and inherit their costs.

**Exercise 6.2:** Explain why Spark defaults to SortMergeJoin.

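In brief, **SortMergeJoin** is Spark's default for joins where neither side is small enough to broadcast, because it is robust and memory-efficient compared to hash joins: sorted inputs can be merged in a streaming fashion and spilled to disk, whereas a hash join has to hold a hash table for one side in memory.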
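
The choice can be pictured as a size check. The function below is a deliberately simplified, hypothetical decision rule, not Spark's planner, which weighs more factors such as join type and hints on either side. The only value taken from Spark is the 10 MB default of `spark.sql.autoBroadcastJoinThreshold`.

```python
AUTO_BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024  # mirrors the 10 MB default


def choose_join_strategy(smaller_side_bytes, broadcast_hint=False,
                         threshold=AUTO_BROADCAST_THRESHOLD_BYTES):
    """Simplified rule: broadcast if hinted or known to be small, else sort-merge.

    smaller_side_bytes may be None when the size of the table is unknown.
    """
    if broadcast_hint:
        return "BroadcastHashJoin"
    if smaller_side_bytes is not None and smaller_side_bytes <= threshold:
        return "BroadcastHashJoin"
    return "SortMergeJoin"


print(choose_join_strategy(None))                       # size unknown
print(choose_join_strategy(1024))                       # small lookup table
print(choose_join_strategy(50 * 1024 * 1024))           # too large to broadcast
print(choose_join_strategy(None, broadcast_hint=True))  # explicit broadcast()
```

Under this rule, a table whose size is unknown falls through to SortMergeJoin unless it carries an explicit hint, which is the role `broadcast(surge_df)` plays in Exercise 5.1.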



### **SET 7: ACTION-DRIVEN EXECUTION**

**Exercise 7.1:** Create a long transformation pipeline without any action.

**Task:** Explain what Spark has done so far.

In [253]:
# long transformation pipeline
long_pipeline_df = rides_df.filter(col("status") == "Completed")\
                             .filter(col("distance_km") > 5)\
                             .select("ride_id", "city", "distance_km", "duration_seconds")\
                             .withColumn("duration_minutes", col("duration_seconds") / 60)\
                             .filter(col("duration_minutes") < 10)

**Task:** Explain what Spark has done so far.

> Since no action has been called, Spark has not actually executed any computations yet. It has merely built a logical plan (or DAG) of these transformations in memory.

**Exercise 7.2:** Trigger different actions (count, show, write) separately.

**Tasks:**

1. Observe whether Spark recomputes the DAG.
2. Explain behavior.

In [254]:
# triggering count()
row_count = long_pipeline_df.count()
print(f"Row count: {row_count}")

# Explanation: Spark executed the entire DAG up to this point to compute the count.

Row count: 8


In [255]:
# triggering show()
long_pipeline_df.show()

# Explanation: Spark re-executed the entire DAG to display the results.

+-------+---------+-----------+----------------+------------------+
|ride_id|     city|distance_km|duration_seconds|  duration_minutes|
+-------+---------+-----------+----------------+------------------+
|   R001|Hyderabad|       12.5|             240|               4.0|
|   R002|    Delhi|        8.2|             180|               3.0|
|   R004|Bangalore|        5.5|             120|               2.0|
|   R005|Hyderabad|       20.0|             360|               6.0|
|   R006|    Delhi|       25.0|             420|               7.0|
|   R007|   Mumbai|        7.5|             150|               2.5|
|   R008|Bangalore|       18.0|             330|               5.5|
|   R010|Hyderabad|       10.0|             200|3.3333333333333335|
+-------+---------+-----------+----------------+------------------+



In [256]:
# triggering write()
long_pipeline_df.write.mode("overwrite").parquet("long_pipeline_df.parquet")

# Explanation: Spark re-executed the entire DAG again to write the data.

**Task 2:** Explain behavior.

> Spark's operations are lazy. This means that when you define a series of transformations (like filter, select),
Spark doesn't immediately compute the results. Instead, it builds a Directed Acyclic Graph (DAG) of operations.
The actual computation only kicks off when an 'action' is called (like count(), show(), collect(), write()).

> Crucially, each action re-evaluates the DAG from the source; Spark does not reuse the result of a previous action unless the DataFrame has been cached.

> This behavior can lead to inefficiencies if you perform multiple actions on the same transformed DataFrame.
To avoid recomputing the same transformations repeatedly, you would typically use caching (e.g., **.cache()** or **.persist()**)
on intermediate DataFrames that are going to be used in multiple subsequent actions.
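
The effect of caching on repeated actions can be imitated with a run counter. This is a toy stand-in, not Spark: `CountingPipeline` is a hypothetical class whose `cache()` only marks the pipeline, so the first action still does the work and later actions reuse it.

```python
class CountingPipeline:
    """Hypothetical stand-in for a DataFrame that counts how often its plan runs."""

    def __init__(self, rows):
        self.rows = rows
        self.runs = 0
        self._use_cache = False
        self._cached = None

    def cache(self):
        # Only marks the pipeline; nothing is computed here.
        self._use_cache = True
        return self

    def _compute(self):
        if self._use_cache and self._cached is not None:
            return self._cached
        self.runs += 1  # one full pass over the source
        result = [r for r in self.rows if r[2] == "Completed" and r[1] > 5]
        if self._use_cache:
            self._cached = result
        return result

    def count(self):  # action
        return len(self._compute())

    def collect(self):  # action
        return list(self._compute())


# (ride_id, distance_km, status) from the notebook's rides data
rides = [
    ("R001", 12.5, "Completed"), ("R002", 8.2, "Completed"), ("R003", 15.0, "Cancelled"),
    ("R004", 5.5, "Completed"), ("R005", 20.0, "Completed"), ("R006", 25.0, "Completed"),
    ("R007", 7.5, "Completed"), ("R008", 18.0, "Completed"), ("R009", 6.0, "Cancelled"),
    ("R010", 10.0, "Completed"),
]

uncached = CountingPipeline(rides)
uncached.count(); uncached.collect(); uncached.count()

cached = CountingPipeline(rides).cache()
cached_count = cached.count(); cached.collect(); cached.count()

print(uncached.runs, cached.runs)
```

Three actions cost three passes without the cache and one pass with it. In the notebook the equivalent would be calling `.cache()` on `long_pipeline_df` before the `count()`, `show()` and write cells.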

### **SET 8: THINKING QUESTIONS**

1. Why does broadcast remove shuffle from the DAG?

> Broadcast removes shuffle from the DAG because:

> * In a normal join (like SortMergeJoin), Spark shuffles both datasets so that rows with the same join key end up in the same partition.

> * With BroadcastHashJoin, Spark broadcasts the smaller dataset to all executors. This means the big dataset doesn't need to move, and each executor can join locally.

> Result: No need to repartition or shuffle the big dataset, because the join can happen locally using the broadcasted copy.

2. Why does repartition always introduce shuffle?

> **repartition()** redistributes rows across a new set of partitions: round-robin when only a number is given, by hash of the column when a column is given. Either way, rows must move between partitions across the cluster, which is a shuffle.

3. Why is coalesce cheaper than repartition?

> **coalesce()** only reduces the number of partitions by merging existing ones.
It avoids a full shuffle, so it's a narrow transformation (fast), though the merged partitions can end up uneven.

> **repartition()** shuffles everything to balance partitions, so it's expensive.
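
The trade-off can be sketched with lists standing in for partitions. Both functions are hypothetical simplifications: real `coalesce()` groups partitions with data locality in mind, so the modulo grouping used here is an assumption, and the round-robin dealing only approximates `repartition(n)`.

```python
def coalesce(partitions, n):
    """Merge whole partitions into n groups; no partition is split apart."""
    merged = [[] for _ in range(n)]
    for index, part in enumerate(partitions):
        merged[index % n].extend(part)
    return merged


def repartition(partitions, n):
    """Deal rows out round-robin so sizes are balanced; any row may move."""
    out = [[] for _ in range(n)]
    rows = [row for part in partitions for row in part]
    for index, row in enumerate(rows):
        out[index % n].append(row)
    return out


skewed = [[1, 2, 3, 4, 5, 6], [7], [8], [9]]

coalesced = coalesce(skewed, 2)
rebalanced = repartition(skewed, 2)

print([len(p) for p in coalesced])   # sizes after merging whole partitions
print([len(p) for p in rebalanced])  # sizes after a full redistribution
```

`coalesce` keeps every original partition intact and therefore inherits the skew, while `repartition` touches every row to even the sizes out. That is the cost-versus-balance choice behind the two calls.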



4. Why does Spark delay execution until an action?

> Spark uses lazy evaluation:

> * Transformations (map, filter, join) only build a plan.

> * Spark waits for an action (collect, count, write) to learn what result is actually needed.

> * This allows Spark to optimize the whole job before running it.