In [1]:
import os
os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/openjdk-17.jdk/Contents/Home"

# Import findspark to help Jupyter locate your Spark installation
import findspark

# Initialize the findspark library — sets up environment so Spark works in notebooks
findspark.init()

# Import SparkSession, the main entry point to Spark functionality
from pyspark.sql import SparkSession


#Note that we have set parallelism to 8
spark = (SparkSession.builder # Start Spark session builder
            .appName("OptimizeProcessingJob") # Set application name
            .config("spark.sql.shuffle.partitions", 8) # Set number of shuffle partitions (e.g. for groupBy, joins)
            .config("spark.default.parallelism", 8) # Set default parallelism
            .config("spark.sql.warehouse.dir", "spark-warehouse")
            .config("spark.driver.extraJavaOptions", "--add-opens java.base/javax.security.auth=ALL-UNNAMED")
            .config("spark.executor.extraJavaOptions", "--add-opens java.base/javax.security.auth=ALL-UNNAMED") 
            .enableHiveSupport() # Enable Hive support for Spark SQL
            .master("local[2]") # Run Spark locally using 2 CPU threads
            .getOrCreate()
)

# Print the version of Spark you're using (e.g., "4.0.0")
print("✅ Spark is ready. Version:", spark.version)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/08 10:49:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


✅ Spark is ready. Version: 4.0.0


Recall that `spark.sql.shuffle.partitions` determines the number of output partitions when Spark does wide DataFrame/SQL transformations like `groupBy` or `join`
- That is, how many partitions the data has after a shuffle

`spark.default.parallelism` controls the default number of partitions for RDD-based operations, such as `parallelize()` and RDD shuffles like `reduceByKey`, when no partition count is given
- It does not govern DataFrame file reads: a DataFrame read is split according to the input files, and a DataFrame converted to an RDD keeps its existing partitions
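
To make "partitions after a shuffle" concrete, here is a minimal pure-Python sketch of hash partitioning. It is a toy model, not Spark code: `shuffle_by_key` is a hypothetical helper, `zlib.crc32` stands in for Spark's own hash function, and the sample rows are illustrative.

```python
import zlib
from collections import defaultdict

def shuffle_by_key(rows, key_index, num_partitions):
    """Toy shuffle: send each row to the partition picked by hashing its key."""
    partitions = defaultdict(list)
    for row in rows:
        key_bytes = str(row[key_index]).encode("utf-8")
        # crc32 is a deterministic stand-in; it is not the hash Spark uses.
        partitions[zlib.crc32(key_bytes) % num_partitions].append(row)
    return dict(partitions)

# (Product, Quantity) pairs; values are illustrative.
rows = [("Mouse", 5), ("Keyboard", 1), ("Mouse", 4), ("Webcam", 2), ("Keyboard", 3)]

# 8 mirrors the spark.sql.shuffle.partitions value set in this notebook.
shuffled = shuffle_by_key(rows, key_index=0, num_partitions=8)

for partition_id, part in sorted(shuffled.items()):
    print(partition_id, part)
```

Every row with the same key lands in the same partition, which is what lets a `groupBy` or `join` work on each partition independently. With only three distinct keys, most of the 8 partitions stay empty, which is one reason to match the partition count to the data.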

### 05.01 Pushing down Projections
When downstream queries or processing only need a subset of columns, the Spark optimizer identifies them and reads only those columns into the in-memory DataFrame. This saves on I/O and memory, and is called projection pushdown. While building data pipelines, it helps to be aware of how Spark works and take advantage of this for optimization.

In [2]:
sales_data = spark\
                .read\
                .parquet("dummy_hdfs/partitioned_parquet")

#show the execution plan
print("\n--------------------------EXPLAIN--------------------------")
sales_data.select("Product","Quantity").explain()
print("-------------------------END EXPLAIN-----------------------\n")

                                                                                


--------------------------EXPLAIN--------------------------
== Physical Plan ==
*(1) Project [Product#6, Quantity#3]
+- *(1) ColumnarToRow
   +- FileScan parquet [Quantity#3,Product#6] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/bing/Downloads/Spark/Ex_Files_Big_Data_Analytics_Hadoop_Ap..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Quantity:int>


-------------------------END EXPLAIN-----------------------



Projection Pushdown
- Spark reads only the columns the query needs from disk, rather than loading every column in the file
- Parquet files are columnar, so Spark can read just the necessary columns without scanning everything
- In the plan above, `ReadSchema` lists only `Quantity`: `Product` is the partition column, so its value comes from the directory path rather than from the file contents
- This saves on I/O (less data from the disk), memory (fewer columns stored in memory), and compute (less data is processed)
- Happens automatically with columnar formats like Parquet and improves performance
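
The saving from a columnar layout can be sketched in plain Python. This is a toy model under stated assumptions, not how Parquet is implemented: `columnar_file` and `scan` are hypothetical names, and the values are the first three rows of the sample output shown later in this notebook.

```python
# Toy columnar "file": each column is stored as its own list.
columnar_file = {
    "ID": [6, 8, 14],
    "Customer": ["Google", "Google", "Apple"],
    "Quantity": [5, 1, 4],
    "Rate": [40.58, 46.79, 40.27],
}

def scan(file, columns):
    """Read only the requested columns and count how many values were touched."""
    data = {name: file[name] for name in columns}
    values_read = sum(len(values) for values in data.values())
    return data, values_read

# Reading every column versus reading only what the query selects.
_, full_cost = scan(columnar_file, list(columnar_file))
projected, pruned_cost = scan(columnar_file, ["Quantity"])

print("values read without pruning:", full_cost)
print("values read with pruning:", pruned_cost)
```

Because each column is stored separately, skipping a column means its values are never touched. In a row-oriented format such as CSV, every line has to be read and parsed even when one field is needed.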

### 05.02 Pushing down Filters
When downstream queries or processing only need a subset of rows, the Spark optimizer identifies the filter and reads only the matching rows into the in-memory DataFrame. This saves on I/O and memory, and is called filter pushdown (or predicate pushdown). It works for both partition columns and non-partition columns. While building data pipelines, it helps to be aware of how Spark works and take advantage of this for optimization.

In [3]:
from pyspark.sql.functions import col

#Use a partition attribute for filtering
mouse_df = sales_data.where(col("Product") == 'Mouse')
mouse_df.show(5)

#show the execution plan
print("\n--------------------------EXPLAIN--------------------------")
mouse_df.explain()
print("-------------------------END EXPLAIN-----------------------\n")


+---+--------+----------+--------+-----+--------------------+-------+
| ID|Customer|      Date|Quantity| Rate|                Tags|Product|
+---+--------+----------+--------+-----+--------------------+-------+
|  6|  Google|2019/11/23|       5|40.58|                NULL|  Mouse|
|  8|  Google|2019/11/13|       1|46.79|Urgent:Discount:P...|  Mouse|
| 14|   Apple|2019/11/09|       4|40.27|            Discount|  Mouse|
| 15|   Apple|2019/11/25|       5|38.89|                NULL|  Mouse|
| 20|LinkedIn|2019/11/25|       4|36.77|       Urgent:Pickup|  Mouse|
+---+--------+----------+--------+-----+--------------------+-------+
only showing top 5 rows

--------------------------EXPLAIN--------------------------
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [ID#0,Customer#1,Date#2,Quantity#3,Rate#4,Tags#5,Product#6] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/bing/Downloads/Spark/Ex_Files_Big_Data_Analytics_Hadoop_Ap..., Parti

Predicate Pushdown
- Filters (i.e. WHERE conditions) are pushed down to the data source level, so rows that cannot match are skipped instead of being read from the disk
- `Product` is the partition column here, so for `Product = 'Mouse'` Spark prunes partitions: it reads only the files in the matching partition directory and never opens the others
- This also helps to reduce I/O, memory used, and downstream computation
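
Filtering on a partition column comes down to choosing which directories to read. The sketch below models that in plain Python, assuming a Hive-style `column=value` directory layout. The file names and the `prune_partitions` helper are hypothetical; the product names come from the `product_vendor.csv` output in section 05.04.

```python
# Hypothetical directory listing for a dataset partitioned by Product.
partition_dirs = {
    "Product=Mouse": ["part-0.parquet"],
    "Product=Keyboard": ["part-0.parquet"],
    "Product=Webcam": ["part-0.parquet"],
    "Product=Headset": ["part-0.parquet"],
}

def prune_partitions(dirs, column, value):
    """Keep only the files whose partition directory matches column=value."""
    wanted = f"{column}={value}"
    return [
        f"{directory}/{file_name}"
        for directory, files in dirs.items()
        if directory == wanted
        for file_name in files
    ]

files_to_read = prune_partitions(partition_dirs, "Product", "Mouse")
print(files_to_read)
```

Only one of the four directories is listed for reading, so three quarters of the files are never opened.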

In [4]:
google_df = sales_data.where(col("Customer") == 'Google')
google_df.show(5)

#show the execution plan
print("\n--------------------------EXPLAIN--------------------------")
google_df.explain()
print("-------------------------END EXPLAIN-----------------------\n")

+---+--------+----------+--------+-----+--------------------+-------+
| ID|Customer|      Date|Quantity| Rate|                Tags|Product|
+---+--------+----------+--------+-----+--------------------+-------+
|  6|  Google|2019/11/23|       5|40.58|                NULL|  Mouse|
|  8|  Google|2019/11/13|       1|46.79|Urgent:Discount:P...|  Mouse|
| 35|  Google|2019/11/17|       2|49.33|              Pickup|  Mouse|
| 51|  Google|2019/11/27|       4| 32.8|              Urgent|  Mouse|
| 57|  Google|2019/11/21|       5| 32.0|              Pickup|  Mouse|
+---+--------+----------+--------+-----+--------------------+-------+
only showing top 5 rows

--------------------------EXPLAIN--------------------------
== Physical Plan ==
*(1) Filter (isnotnull(Customer#1) AND (Customer#1 = Google))
+- *(1) ColumnarToRow
   +- FileScan parquet [ID#0,Customer#1,Date#2,Quantity#3,Rate#4,Tags#5,Product#6] Batched: true, DataFilters: [isnotnull(Customer#1), (Customer#1 = Google)], Format: Parquet, Locat

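- The output here shows the filter operation where Spark keeps rows where Customer is not null and equals "Google"
- The `*(1)` prefix means the operator is part of whole-stage code generation; the number is the ID of the generated-code stage, not a degree of parallelism
- Pushed filters are the filters Spark hands to the data source to apply while reading from disk; for this query the plan lists `PushedFilters: [IsNotNull(Customer), EqualTo(Customer,Google)]`
- The Parquet reader uses these filters to skip row groups that cannot contain a match, instead of loading all the rows from the disk. The `Filter` step stays in the plan because the row groups that are read can still contain non-matching rows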

TLDR: "This physical plan shows Spark scanning a Parquet file, applying a filter to select rows where `Customer = "Google"`. Spark performs **predicate pushdown**, which means it filters data *while reading from disk* instead of reading all rows into memory. The data is read in columnar batches, then converted to rows for filtering. This leads to faster and more efficient query execution."
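
One way to picture pushdown on a non-partition column is row-group skipping based on min/max statistics. The sketch below is a simplified pure-Python model, not Parquet's actual reader: `make_group` and `scan_equal` are hypothetical helpers, and the row groups are made-up arrangements of the customer names seen above.

```python
def make_group(values):
    """Build a toy row group with the min/max statistics a file footer would hold."""
    return {"min": min(values), "max": max(values), "rows": values}

def scan_equal(groups, target):
    """Skip groups whose statistics rule out the target, then filter the rest."""
    rows_read, matches = 0, []
    for group in groups:
        if not (group["min"] <= target <= group["max"]):
            continue  # statistics prove no row here can match
        rows_read += len(group["rows"])
        # Residual filter: a group that is read can still hold other values.
        matches.extend(row for row in group["rows"] if row == target)
    return matches, rows_read

row_groups = [
    make_group(["Apple", "Apple"]),
    make_group(["Apple", "Google", "LinkedIn"]),
    make_group(["LinkedIn", "LinkedIn"]),
]

matches, rows_read = scan_equal(row_groups, "Google")
print(matches, "found after reading", rows_read, "of 7 rows")
```

The first and last groups are skipped without being read. The middle group is read in full and then filtered, which is why a `Filter` operator still appears in the plan even though the filter was pushed down.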

### 05.03 Partitioning and coalescing
While performing actions, Spark creates results with a default partition count. In local mode, default parallelism is usually equal to the number of cores. After a shuffle, a DataFrame has `spark.sql.shuffle.partitions` partitions, which is 200 unless overridden (this notebook sets it to 8). That can be too many if the number of cores in the cluster is significantly less than the number of partitions, because tasks then have to queue. Repartitioning helps to set a suitable number of partitions.

Repartition does a full reshuffle and can be used for increasing/decreasing partitions.

Coalesce simply consolidates existing partitions and avoids a full reshuffle. It can be used to decrease the number of partitions.

Repartition and coalesce themselves take time and resources, repartition especially because it shuffles. Do them only if multiple steps downstream will benefit from them.
- So `.repartition()` shuffles the data again and distributes it across the requested number of partitions
- `.coalesce()` is primarily used to decrease the number of partitions, and here we avoid a full shuffle by collapsing existing partitions together
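
The difference between the two can be sketched with plain Python lists. This is a toy model under stated assumptions: both functions are hypothetical stand-ins, round-robin is only one of the ways Spark can repartition, and Spark's real coalesce chooses which partitions to merge differently.

```python
def repartition(partitions, n):
    """Toy full shuffle: pool every row, then deal them out round-robin."""
    rows = [row for part in partitions for row in part]
    return [rows[i::n] for i in range(n)]

def coalesce(partitions, n):
    """Toy coalesce: merge whole partitions into n groups without splitting any."""
    merged = [[] for _ in range(n)]
    for index, part in enumerate(partitions):
        merged[index % n].extend(part)
    return merged

# Same sequence as the notebook cell: 1 partition -> 5 -> 3.
original = [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]
five = repartition(original, 5)
three = coalesce(five, 3)

print("after repartition(5):", [len(p) for p in five])
print("after coalesce(3):   ", [len(p) for p in three])
```

The repartitioned data is evenly spread because every row was free to move. The coalesced data is uneven because whole partitions were glued together, which is the price of skipping the shuffle.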

In [5]:
print("Default parallelism from Spark Session :", 
      spark.sparkContext.defaultParallelism)

#Read a file with default parallelism
raw_sales_data = spark\
                .read\
                .option("inferSchema", "true")\
                .option("header", "true")\
                .csv("datasets/sales_orders.csv")

#Partitions in sales data partitioned by product (read previously)
#1 partition per product
print("\nPartitions in hdfs data with parquet : ",
      sales_data.rdd.getNumPartitions())

#Raw partition count
print("\nPartitions in data frame for raw CSV read : ", 
      raw_sales_data.rdd.getNumPartitions())

#Repartition to 5 partitions
partitioned_sales_data = raw_sales_data.repartition(5)

print("\nPartitions in raw data frame after repartitioning : ", 
      partitioned_sales_data.rdd.getNumPartitions())

#coalesce to 3 partitions
coalesced_sales_data = partitioned_sales_data.coalesce(3)

print("\nPartitions in raw data frame after coalesce : ", 
      coalesced_sales_data.rdd.getNumPartitions())

Default parallelism from Spark Session : 8


                                                                                


Partitions in hdfs data with parquet :  4

Partitions in data frame for raw CSV read :  1

Partitions in raw data frame after repartitioning :  5

Partitions in raw data frame after coalesce :  3


### 05.04 Optimizing Joins
By default, joining two DataFrames requires a lot of shuffling. This is because when we join, Spark needs to bring matching keys from both DataFrames onto the same executor so it can compare them for the join. To do this, Spark reorganizes and redistributes the data across the cluster so that rows with the same join key end up on the same partition, meaning we move data between executors and write to disk and read again.

If one DataFrame is small enough to fit in memory, a better option is to send a full copy of it to every executor and join locally against those copies. This is a broadcast join, and the Spark optimizer chooses it when possible.

- Spark automatically chooses this method when the smaller DataFrame is under a certain size (default: **10MB**, set by `spark.sql.autoBroadcastJoinThreshold`).
- Broadcast joins greatly **reduce shuffle costs** and **speed up** operations when applicable
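
A broadcast join can be modelled in plain Python as a dictionary lookup performed inside each partition. The sketch is illustrative only: `broadcast_join` is a hypothetical helper, the vendor mapping matches `product_vendor.csv` as shown in the output below, the Mouse IDs come from the sample rows, and the Keyboard row with ID 101 is made up.

```python
# Small table: fits in memory, so each executor can hold a full copy.
vendors = {
    "Mouse": "Logitech",
    "Keyboard": "Microsoft",
    "Webcam": "Logitech",
    "Headset": "Plantronics",
}

# Large table, already split into partitions of (Product, ID) rows.
sales_partitions = [
    [("Mouse", 6), ("Mouse", 8)],
    [("Mouse", 14), ("Keyboard", 101)],  # ID 101 is hypothetical
]

def broadcast_join(partitions, lookup):
    """Join each partition against its own copy of the small table."""
    joined = []
    for part in partitions:
        local_copy = dict(lookup)  # stands in for the copy sent to an executor
        joined.append(
            [(product, row_id, local_copy[product])
             for product, row_id in part
             if product in local_copy]
        )
    return joined

joined = broadcast_join(sales_partitions, vendors)
for part in joined:
    print(part)
```

No sales row changes partition during the join. Only the small table travels, once per executor, which is where the shuffle saving comes from.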

In [None]:
from pyspark.sql.functions import broadcast

product_data = spark\
                .read\
                .option("inferSchema", "true")\
                .option("header", "true")\
                .csv("datasets/product_vendor.csv")
product_data.show(5)

#Broadcast product data
#broadcast() is a hint: it marks the DataFrame to be copied to every executor for the join
broadcast_product = broadcast(product_data)

#Join with broadcasted local copy of product data
joined_data = sales_data.join(broadcast_product,"Product")

joined_data.show(5)

#show the execution plan
print("\n--------------------------EXPLAIN--------------------------")
joined_data.explain()
print("-------------------------END EXPLAIN-----------------------\n")

+--------+-----------+
| Product|     Vendor|
+--------+-----------+
|   Mouse|   Logitech|
|Keyboard|  Microsoft|
|  Webcam|   Logitech|
| Headset|Plantronics|
+--------+-----------+

+-------+---+--------+----------+--------+-----+--------------------+--------+
|Product| ID|Customer|      Date|Quantity| Rate|                Tags|  Vendor|
+-------+---+--------+----------+--------+-----+--------------------+--------+
|  Mouse|  6|  Google|2019/11/23|       5|40.58|                NULL|Logitech|
|  Mouse|  8|  Google|2019/11/13|       1|46.79|Urgent:Discount:P...|Logitech|
|  Mouse| 14|   Apple|2019/11/09|       4|40.27|            Discount|Logitech|
|  Mouse| 15|   Apple|2019/11/25|       5|38.89|                NULL|Logitech|
|  Mouse| 20|LinkedIn|2019/11/25|       4|36.77|       Urgent:Pickup|Logitech|
+-------+---+--------+----------+--------+-----+--------------------+--------+
only showing top 5 rows

--------------------------EXPLAIN--------------------------
== Physical Plan ==

### 05.05 Storing Intermediate Results
By default, every time an action is performed, Spark executes all the previous steps right from the data read. This can end up being very expensive, especially while using Spark in a development or interactive mode. A better option is to cache the intermediate results when you want to perform several actions on the same DataFrame or RDD. Spark can cache in memory, or persist in both memory and disk. Persisted data lives in executor memory and on the executors' local disks; writing to reliable storage such as HDFS is what checkpointing does.

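- `.cache()` uses the default storage level. For DataFrames that is memory and disk; for RDDs it is memory only.

- `.persist()` accepts an explicit storage level, such as memory only, memory + disk, or disk only (useful if data is too big for RAM).

- Both are lazy: the data is materialized by the first action that follows.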
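
The recompute-versus-reuse behaviour can be illustrated with a small pure-Python class. This is a toy model, not Spark's caching mechanism: `LazyDataset` and its methods are hypothetical names that only mimic the pattern of lazy persistence.

```python
class LazyDataset:
    """Toy lazy pipeline that counts how often its computation actually runs."""

    def __init__(self, compute):
        self._compute = compute
        self._persisted = False
        self._cached = None
        self.runs = 0

    def persist(self):
        # Lazy: only marks the dataset; nothing is computed yet.
        self._persisted = True
        return self

    def collect(self):
        if self._persisted and self._cached is not None:
            return self._cached  # served from cache, no recomputation
        self.runs += 1
        result = self._compute()
        if self._persisted:
            self._cached = result
        return result

dataset = LazyDataset(lambda: [x * 2 for x in range(5)])

dataset.collect()
dataset.collect()
print("runs without persist:", dataset.runs)

dataset.persist()
dataset.collect()  # first action after persist fills the cache
dataset.collect()  # second action reuses it
print("runs in total:", dataset.runs)
```

Two actions without persistence cost two full computations. After `persist()`, the next two actions cost one computation between them.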

In [None]:
print("Plan before caching intermediate results:\n-------------------------")
data_before = coalesced_sales_data.where(col("Customer") == 'Google')
data_before.explain()

#mark intermediate results for caching (lazy; materialized by the next action)
coalesced_sales_data.persist() #default storage level: memory and disk

print("Plan after caching intermediate results:\n-------------------------")
data_after = coalesced_sales_data.where(col("Customer") == 'Google')
data_after.explain()


Plan before caching intermediate results:
-------------------------
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Coalesce 3
   +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=279]
      +- Filter (isnotnull(Customer#70) AND (Customer#70 = Google))
         +- FileScan csv [ID#69,Customer#70,Product#71,Date#72,Quantity#73,Rate#74,Tags#75] Batched: false, DataFilters: [isnotnull(Customer#70), (Customer#70 = Google)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/bing/Downloads/Spark/Ex_Files_Big_Data_Analytics_Hadoop_Ap..., PartitionFilters: [], PushedFilters: [IsNotNull(Customer), EqualTo(Customer,Google)], ReadSchema: struct<ID:int,Customer:string,Product:string,Date:string,Quantity:int,Rate:double,Tags:string>


Plan after caching intermediate results:
-------------------------
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (isnotnull(Customer#70) AND (Customer#70 = Google))
   +- InMemoryTableScan [ID#69, Customer#7

What does `.collect()` do?
- It executes all the lazy transformations you've done on the DataFrame or RDD.
- It retrieves all records from the distributed Spark cluster.
- It returns the data as a Python list of rows (or values), stored in the driver's memory.