# Import Libraries


In [1]:
from pyspark.sql import SparkSession

# Create Spark Session


In [2]:
spark = SparkSession.builder.getOrCreate()



# Load NPI Data (Huge Data Set)


In [3]:
npi_df = spark.read.option("header", "true").csv(
    "/home/kaeshur/Downloads/NPPES_Data_Dissemination_February_2025/npidata_pfile_20050523-20250209.csv"
)

In [4]:
npi_df.limit(5).show()


In [None]:
# npi_df.columns

# What is the Catalyst Optimizer?

The Catalyst Optimizer is Spark's query optimization engine. It turns Spark SQL queries and DataFrame transformations into efficient execution plans by applying rule-based optimizations, plus cost-based ones when table statistics are available and cost-based optimization (spark.sql.cbo.enabled) is turned on.

## It works in four phases:

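- Analysis: Resolves table and column names, functions, and data types against the catalog, turning the unresolved plan into a resolved logical plan.
- Logical Optimization: Applies rule-based optimizations such as predicate pushdown, constant folding, and column pruning.
- Physical Planning: Generates one or more physical plans from the optimized logical plan and selects one to execute (for example, choosing a join strategy).
- Code Generation: Uses Tungsten's whole-stage code generation to compile parts of the physical plan into Java bytecode.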

## Control
- The Catalyst Optimizer itself cannot be completely disabled.
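- You can exclude individual optimizer rules with the spark.sql.optimizer.excludedRules setting (set via spark.conf.set()); rules required for correctness cannot be excluded.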
- Disabling optimizations can degrade performance, so it's usually not recommended.



In [6]:
data = [
    (1, "Alice", 25, "HR"),
    (2, "Bob", 30, "Engineering"),
    (3, "Charlie", 28, "HR"),
    (4, "David", 35, "Engineering"),
]

columns = ["ID", "Name", "Age", "Department"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show()

+---+-------+---+-----------+
| ID|   Name|Age| Department|
+---+-------+---+-----------+
|  1|  Alice| 25|         HR|
|  2|    Bob| 30|Engineering|
|  3|Charlie| 28|         HR|
|  4|  David| 35|Engineering|
+---+-------+---+-----------+



In [7]:
df.createOrReplaceTempView("employees")

query = """
SELECT Name, Age FROM employees
WHERE Department = 'HR' AND Age > 25
"""

result = spark.sql(query)
result.show()

+-------+---+
|   Name|Age|
+-------+---+
|Charlie| 28|
+-------+---+



### Physical Plan (Formatted Mode)

In [8]:
result.explain(mode="formatted")

== Physical Plan ==
* Project (3)
+- * Filter (2)
   +- * Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [4]: [ID#2329L, Name#2330, Age#2331L, Department#2332]
Arguments: [ID#2329L, Name#2330, Age#2331L, Department#2332], MapPartitionsRDD[18] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter [codegen id : 1]
Input [4]: [ID#2329L, Name#2330, Age#2331L, Department#2332]
Condition : ((isnotnull(Department#2332) AND isnotnull(Age#2331L)) AND ((Department#2332 = HR) AND (Age#2331L > 25)))

(3) Project [codegen id : 1]
Output [2]: [Name#2330, Age#2331L]
Input [4]: [ID#2329L, Name#2330, Age#2331L, Department#2332]




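### Optimized Logical Plan with Statistics (Cost Mode)

The sizeInBytes of 8.0 EiB is not a real measurement: for an in-memory DataFrame without statistics, Spark falls back to spark.sql.defaultSizeInBytes, which defaults to Long.MaxValue bytes.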

In [9]:
result.explain(mode="cost")

== Optimized Logical Plan ==
Project [Name#2330, Age#2331L], Statistics(sizeInBytes=4.5 EiB)
+- Filter ((isnotnull(Department#2332) AND isnotnull(Age#2331L)) AND ((Department#2332 = HR) AND (Age#2331L > 25))), Statistics(sizeInBytes=8.0 EiB)
   +- LogicalRDD [ID#2329L, Name#2330, Age#2331L, Department#2332], false, Statistics(sizeInBytes=8.0 EiB)

== Physical Plan ==
*(1) Project [Name#2330, Age#2331L]
+- *(1) Filter ((isnotnull(Department#2332) AND isnotnull(Age#2331L)) AND ((Department#2332 = HR) AND (Age#2331L > 25)))
   +- *(1) Scan ExistingRDD[ID#2329L,Name#2330,Age#2331L,Department#2332]




# What is Apache Spark Tungsten?
Tungsten is a set of execution-engine optimizations (Project Tungsten), introduced starting with Apache Spark 1.5, that improve CPU efficiency and memory management for large-scale data processing. It stores rows in a compact binary format, manages memory explicitly (including off-heap memory), uses cache-aware algorithms, and generates code at runtime; whole-stage code generation was added in Spark 2.0.


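Whole-Stage Code Generation Enabled 🏎️ (default)

- Faster execution: operators within a stage are fused into one generated Java function, compiled to JVM bytecode, which avoids per-row virtual function calls.
- The compact generated code is easier for the JVM's JIT (Just-In-Time) compiler to optimize.

Whole-Stage Code Generation Disabled 🐢

- Slower execution: each operator runs separately through the iterator-based (Volcano-style) model instead of being fused into one generated function.
- More intermediate rows are materialized between operators, which can increase GC (Garbage Collection) overhead.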


In [None]:
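print(spark.conf.get("spark.sql.codegen.wholeStage"))  # "true" by default; "false" only if it was disabled
print(spark.conf.get("spark.sql.codegen.factoryMode"))  # internal setting, "FALLBACK" by default; "NO_CODEGEN" forces interpreted evaluation
print(spark.conf.get("spark.sql.adaptive.enabled"))  # "true" by default since Spark 3.2; "false" only if it was disabled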


# Adaptive Query Execution (AQE)

Adaptive Query Execution (AQE) is a feature in Apache Spark 3.0+ that dynamically optimizes query plans at runtime based on actual data statistics, rather than relying only on static query plans created during query compilation.

In [None]:
# from pyspark.sql import SparkSession

# spark = SparkSession.builder \
#     .appName("AQE Example") \
#     .config("spark.sql.adaptive.enabled", "true") \
#     .getOrCreate()


## Before AQE (Static Planning)
- Uses a fixed number of shuffle partitions (spark.sql.shuffle.partitions, default 200).
- Cannot adjust join types dynamically.
- Struggles with skewed data.
## After AQE (Dynamic Execution)
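- Coalesces small shuffle partitions based on the actual shuffle output size.
- Detects skewed partitions in joins and splits them into smaller tasks.
- Switches join strategies at runtime (for example, from sort-merge join to broadcast hash join) when one side turns out to be small.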




### **1. What is Adaptive Query Execution (AQE) in Apache Spark?**  
**Adaptive Query Execution (AQE)** is a feature introduced in **Spark 3.0** (enabled by default since Spark 3.2) that re-optimizes query execution plans **at runtime**, using **statistics collected from completed query stages**. Unlike traditional query optimization, which fixes a **static execution plan** before the job starts, AQE **re-plans the remaining stages** at stage boundaries, for example after a shuffle finishes.  

### **2. How does AQE improve query performance?**  
AQE improves query performance by making **real-time optimizations** to execution plans based on **actual data characteristics**. It achieves this by:  
- **Coalescing Shuffle Partitions Dynamically:** Merges small post-shuffle partitions so that each task processes a reasonable amount of data.  
- **Handling Skewed Data:** Detects skewed partitions in joins and splits them to prevent slow joins.  
- **Choosing the Best Join Strategy at Runtime:** Converts a **sort-merge join** into a **broadcast hash join** (or, since Spark 3.2, a **shuffled hash join**) when runtime statistics show that one side is small enough.  

### **3. What are the three main optimizations in AQE?**  
AQE performs three key optimizations:  

| **Optimization**                 | **Description** | **Benefit** |
|----------------------------------|---------------|------------|
| **Dynamic Shuffle Partitioning** | Coalesces small post-shuffle partitions based on the actual shuffle output size. | Avoids many tiny tasks and reduces scheduling overhead. |
| **Skew Join Optimization** | Identifies **skewed partitions** and splits them into smaller sub-partitions (replicating the matching partition on the other side) before joining. | Prevents slow tasks due to skewed keys. |
| **Dynamically Switching Join Strategies** | Based on runtime statistics, converts a **sort-merge join** into a **broadcast hash join** (or a **shuffled hash join**) when one side is small enough. | Avoids unnecessary sorting and shuffle reads, reducing execution time. |

### **4. How does AQE handle data skew?**  
Data skew occurs when **some partitions are much larger** than others, causing **certain tasks to take significantly longer** to process. AQE **mitigates this dynamically** in joins by:  
- **Detecting large partitions** using **runtime shuffle statistics**.  
- **Splitting skewed partitions** into **smaller sub-partitions**.  
- **Rebalancing the workload** across multiple tasks so that a few **straggler tasks** do not hold up the entire job.  

### **5. How does AQE decide whether to use a broadcast join or a shuffle join?**  
AQE dynamically **chooses the best join strategy** based on **runtime data statistics**:  
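- **If one side is small enough (below spark.sql.autoBroadcastJoinThreshold, 10MB by default)**, Spark **switches to a Broadcast Hash Join**, which avoids sorting both sides and reduces **shuffle** traffic. Since Spark 3.2, the runtime decision uses spark.sql.adaptive.autoBroadcastJoinThreshold, which defaults to the same value.  
- **If both tables are large**, AQE **keeps the Sort-Merge Join**, Spark's default strategy for large equi-joins.  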

This **reduces unnecessary data movement** and significantly improves join performance.  


# Optimization Tips for Transformations & Actions
1. Prefer reduceByKey() over groupByKey()
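    - Both trigger a shuffle, but reduceByKey() combines the values for each key within each partition before the shuffle (a map-side combine), so much less data crosses the network; groupByKey() ships every value.
2. Avoid collect() on large datasets
    - collect() brings all data to the driver, which can cause OutOfMemory errors. Use take(n) or show() to inspect a sample, or write results to storage instead.
3. Use persist() or cache() when reusing RDDs or DataFrames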
```python
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
cached_rdd = rdd.map(lambda x: x * 2).cache()
print(cached_rdd.count())  # First action triggers execution
print(cached_rdd.collect())  # Data is now cached, avoiding recomputation
```
