# Spark Performance Tuning

## 1. Caching and Persistence

Caching and persistence are optimization techniques for iterative and interactive Spark computations. They store intermediate data in memory or on more durable storage so that the same results are not recomputed every time they are needed.

### 1.1 Why and When to Cache or Persist?

Caching in Spark is a way to store computed datasets in memory for subsequent uses without having to recompute them. Using cache effectively can greatly improve the performance of Spark applications. Here are situations where caching is typically beneficial:

1. **Iterative Algorithms**: Caching is particularly useful for iterative algorithms like those found in machine learning (e.g., gradient descent in linear regression, iterative computations in graph processing). In each iteration, the same data is processed repeatedly. By caching the dataset, you avoid recomputing or re-reading it in each iteration.

2. **Reused Data**: If any DataFrame or RDD is used across multiple Spark operations or actions, it's beneficial to cache it. For example, if you're performing multiple separate aggregations on the same dataset, caching can help.

3. **Expensive Computations**: If generating a DataFrame or RDD involves heavy computations or requires data from various sources, caching it once it's computed can save significant time.

4. **Frequent Joins with a Small DataFrame**: If you have a smaller DataFrame that gets frequently joined with other larger DataFrames, it might be a good idea to cache the smaller one. In some cases, broadcasting the smaller DataFrame (which inherently caches it on each executor) can be even more beneficial.

5. **Interactive Analysis**: When doing exploratory data analysis using tools like SparkSQL or Databricks notebooks, users often run multiple ad-hoc queries on the same dataset. Caching can help speed up this interactive analysis.

6. **Checkpointing**: If you're breaking lineage (chain of transformations) to truncate the long lineage or to manage recomputation cost, caching the DataFrame before calling a checkpoint can be advantageous.

However, while caching can be very beneficial in these situations, there are times when it may not be optimal:

- **Infrequent Access**: If the data is accessed only once, caching might not only be unnecessary but can also add overhead. It might be slower to cache and then compute rather than just compute directly.

- **Limited Memory**: If the available memory is limited, caching large DataFrames might cause other cached DataFrames to be evicted or even lead to out-of-memory errors. You need to be cautious about what you choose to cache in such environments.

- **Mutable Workloads**: If the underlying data changes frequently, a cached copy quickly becomes stale, and you have to unpersist and re-cache it after each change, which can be expensive.

It's essential to monitor the performance and memory usage of your Spark application (using Spark's web UI or other monitoring tools) to ensure that caching is providing the intended benefits. If not used judiciously, caching can lead to memory issues or even degrade performance.

### 1.2 Usage

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, when, col

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Spark Performance Tuning") \
    .master("local[4]") \
    .getOrCreate()

spark

Let's create a big DataFrame to appreciate the benefits of caching.

In [None]:
df = spark.read.json("datasets/students.json")
big_df = df

for _ in range(99):
    big_df = big_df.union(df)
    
print(f"The new DataFrame has {big_df.count()} rows")

Let's cache an aggregation transformation on this big DataFrame

In [None]:
cached_df = big_df.groupBy("age") \
                    .agg(mean("grade").alias("mean_grade")) \
                    .orderBy("age")
cached_df.cache() # Mark the DataFrame for caching (lazy: nothing is computed yet)
# The first action below computes the result and fills the cache;
# later accesses reuse the cached data instead of recomputing it
cached_df.show()

### 1.3 Example

**Compare the grade of each student to the mean of its age group**

**Cached DataFrame**

In [None]:
%%time
# cached_df is not recomputed here as it has been cached
big_df.join(cached_df, on="age", how="left") \
    .withColumn("performance", 
        when(col('grade') > col('mean_grade'), 'Over mean') \
        .otherwise(
            when(col('grade') == col('mean_grade'), 'On mean') \
            .otherwise('Under mean')
        )
    ) \
    .select("age", "name", "surname", "performance") \
    .show(1)

To remove a DataFrame from the cache, call its `unpersist()` method. To remove all cached DataFrames at once, call `spark.catalog.clearCache()`.

In [None]:
# Remove the computed DataFrame from the cache
cached_df.unpersist()
# This removes all the cached DataFrames for the Spark Session
spark.catalog.clearCache()

**Unpersisted DataFrame**

In [None]:
%%time
big_df.join(cached_df, on="age", how="left") \
    .withColumn("performance", 
        when(col('grade') > col('mean_grade'), 'Over mean') \
        .otherwise(
            when(col('grade') == col('mean_grade'), 'On mean') \
            .otherwise('Under mean')
        )
    ) \
    .select("age", "name", "surname", "performance") \
    .show(1)

### 1.4 Storage levels

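The following storage levels are available, roughly ordered from fastest access with the least capacity to slowest access with the most capacity:

- MEMORY_ONLY
- MEMORY_AND_DISK
- MEMORY_ONLY_SER (serialized; Scala/Java API)
- MEMORY_AND_DISK_SER (serialized; Scala/Java API)
- DISK_ONLY

The `_SER` levels belong to the Scala/Java API. According to the Spark documentation, Python objects are always serialized when stored, so choosing a serialized level makes no difference in PySpark.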

The DataFrame operation `.cache()` does not receive a storage level because, under the hood, it performs a `.persist()` operation with the storage level `MEMORY_AND_DISK` (since version 1.3.0). This is usually the most common choice, as it uses memory until it is full and then spills to disk storage.

```python
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
```

`NOTE`: Caching is most useful for heavy computations whose result is not very big (no more than around 50% - 60% of the total executor memory). If the result is too big, Spark will start storing it on disk, which is noticeably slower to access.

## 2. Broadcast Variables and Accumulators

### 2.1 Broadcast Variables

When working with Spark, you might need to send a read-only variable to all the executors. Broadcast variables allow the programmer to keep a read-only variable stored on each executor rather than shipping a copy of it with tasks. This process helps to reduce the amount of data that needs to be transferred over the network, which can improve performance.

<img src="images/broadcast_variables.webp" title="Broadcast Variables" width="700px"/>

Here are some scenarios where using broadcast variables in Spark is beneficial:

1. **Small DataFrame Joins**: When joining a large DataFrame with a small DataFrame, you can broadcast the smaller DataFrame to all worker nodes. This helps avoid the shuffling of the larger DataFrame and can significantly improve join performance.

2. **Frequently Used Lookup Tables**: If tasks often refer to a small lookup table (e.g., mapping of codes to descriptions), it's efficient to broadcast this table so that it's available locally on each worker, rather than fetching it multiple times.

3. **Machine Learning Models**: When scoring data using a machine learning model, if the model is small enough, you can broadcast it to all worker nodes. This way, each node can score data locally without needing to fetch the model repeatedly.

4. **Configuration or Parameters**: If tasks need to reference certain configuration settings or parameters, broadcasting these can prevent the need to send them with every task.

5. **Accumulative Data Structures**: For some algorithms, you might need to refer to a data structure that gets built incrementally (e.g., a prefix tree or histogram). If this structure is small, broadcasting it can improve efficiency.

6. **Avoiding Repetitive Reads**: If tasks on each node need to read the same part of a dataset or file, broadcasting the relevant data can help save on I/O operations.

7. **Static Data Across Tasks**: In some algorithms or computations, there might be static data or state that doesn't change and is used across tasks. Broadcasting this data ensures it's available locally on each worker.

While broadcasting can be highly efficient in these scenarios, there are a few things to keep in mind:

- **Size Limitations**: Broadcasting very large variables can be counterproductive. It might use a significant amount of memory on each worker node and also take time to send across the network initially. As a rule of thumb, only broadcast data that is comfortably small relative to the available memory on worker nodes.

- **Read-only**: Broadcast variables are meant to be read-only. They should not be modified by tasks.

- **Broadcast Cost**: There is an initial cost to broadcast a variable as it needs to be sent to all nodes. It's essential to ensure the benefits of broadcasting (usually in reduced network data transfer in subsequent operations) outweigh this initial cost.

#### **Example**

Following the previous cache example, let's broadcast the heavy computation to the executors as it is a small DataFrame.

In [None]:
%%time
from pyspark.sql.functions import broadcast

big_df.join(broadcast(cached_df), on="age", how="left") \
    .withColumn("performance", 
        when(col('grade') > col('mean_grade'), 'Over mean') \
        .otherwise(
            when(col('grade') == col('mean_grade'), 'On mean') \
            .otherwise('Under mean')
        )
    ) \
    .select("age", "name", "surname", "performance") \
    .show(1)

### 2.2 Accumulators

Accumulators are variables that can only be "added" to. They can be used to implement counters and sums efficiently in parallel. Spark natively supports accumulations of numeric types, and programmers can add support for new types.

Here are some scenarios where using accumulators in Spark is beneficial:

1. **Global Counters**: Accumulators can be used to maintain global counters across tasks. For example, while processing a large dataset, you might want to keep track of records that meet a certain condition or the total number of errors encountered.

2. **Summations**: If you're performing operations that involve summation (e.g., calculating the total sum of a particular field across all records), accumulators can be helpful.

3. **Monitoring and Logging**: Accumulators can be used for monitoring purposes. For instance, you might want to track the number of times a particular code path is executed or track the number of missing values for certain fields.

4. **Validation and Quality Checks**: While processing data, you might want to perform certain validation checks. Accumulators can be used to count the number of records that fail these validations.

5. **Histograms**: They can be employed to build histograms in parallel tasks, where each task updates the histogram based on the portion of data it processes.

6. **Set Accumulation**: Though typically accumulators are used for numerical operations like count and sum, they can also be used to accumulate sets. For example, if you want to build a set of unique user agents from web logs.

7. **Advanced Algorithms**: In more advanced algorithms, especially those that require global information or intermediate results from different partitions, accumulators can be leveraged to gather this information efficiently.

While accumulators are powerful, there are essential caveats and best practices:

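- **Idempotency**: Accumulator updates made inside transformations are not guaranteed to be applied exactly once. If a task or stage is re-executed by Spark, the accumulator might be updated multiple times for that task. Therefore, use them inside transformations only where occasional duplicates are not a concern. For updates performed inside actions, Spark guarantees that each task's update is applied only once. In either case, the accumulated operation should be both associative and commutative.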

- **Write-only on Worker Nodes**: Accumulator variables should only be updated on worker nodes (inside tasks). They should not be read there. The value of an accumulator should only be read in the driver program after all tasks have completed.

- **Custom Accumulators**: While Spark provides built-in accumulators for simple types like integer and double, one can also develop custom accumulators for more complex types. However, it's important to ensure that the `add` and `merge` operations are correctly implemented for custom types to ensure accurate and efficient aggregation.

In conclusion, accumulators provide an efficient way to gather global information across tasks in Spark.

#### **Example**

Let's create an accumulator to count how many times any student named "Santiago" is processed in a UDF.

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Initialize accumulator
santiago_name_accumulator = spark.sparkContext.accumulator(0)

# Define UDF function
def add_suffix(name):
    global santiago_name_accumulator
    # Increment if name is Santiago
    if name == 'Santiago':
        santiago_name_accumulator += 1
    return name + "_UDF"

# Register UDF
suffix_udf = udf(add_suffix, StringType())

# Apply UDF to DataFrame
df.withColumn("name_with_suffix", suffix_udf(df["name"])) \
    .select("name", "name_with_suffix") \
    .collect() # Use collect to force the evaluation as Spark has lazy evaluation

print(f"There have been {santiago_name_accumulator.value} students named Santiago processed.")

## 3. Partitioning

Partitioning in Spark is a way to distribute the data across the distributed storage or computing nodes. Proper partitioning is essential for distributing computation and for optimizing data shuffling and I/O operations. Here's a detailed explanation of when and how to partition data in Spark, along with the types of partitioning strategies:

### 3.1 When is it useful to partition data and why?

1. **Optimizing Data Locality**: When the data and the computation are close to each other, the time to fetch the data is reduced, leading to faster task execution.

2. **Optimizing Shuffles**: In operations like joins or groupBy, if the data is properly partitioned, the need to shuffle data across nodes can be minimized, leading to performance improvements.

3. **Load Balancing**: Proper partitioning ensures that data is evenly distributed across nodes, preventing situations where some nodes are idle while others are overloaded.

4. **Optimizing Joins**: When two DataFrames/RDDs are being joined on a particular column and they are partitioned using the same key, the join operation can be optimized since data is already co-located.

5. **Optimizing Data Storage**: When saving data back to distributed storage systems like HDFS or cloud storage, partitioning can optimize storage and subsequent read operations. For instance, data can be partitioned by date, so each day's data is in a separate directory.

### 3.2 How to Partition Data in Spark?

1. **Repartitioning**: You can use the `repartition()` method on a DataFrame or RDD to increase or decrease the number of partitions. You can optionally specify a column on which to partition.
```python
df.repartition(100)  # Repartition DataFrame into 100 partitions
df.repartition(50, "date")  # Repartition based on the 'date' column
```

2. **Coalesce**: To reduce the number of partitions, `coalesce()` can be more efficient than `repartition()` because it avoids a full shuffle.
```python
df.coalesce(10)  # Reduce the number of partitions to 10
```

### 3.3 Types of Partitioning and When to Use Them

1. **Hash Partitioning**: Spark distributes data based on the value's hash code. It's useful when you want to ensure a balanced distribution of data but don't necessarily care about which specific keys go to which partition.
- **Use Case**: When performing a join operation on a key, using hash partitioning on that key for both DataFrames can optimize the join.

2. **Range Partitioning**: The data is partitioned based on a range of values. This ensures that a continuous range of data values resides in a single partition.
- **Use Case**: When you have ordered data and operations (like sorting), range partitioning can be beneficial.

3. **Custom Partitioning**: Spark allows you to define a custom partitioning strategy by extending `Partitioner` in RDD operations. This way, you can define any specific logic to determine which keys go to which partition.
- **Use Case**: When the built-in hash or range partitioning doesn't suit your needs, you might require custom logic.

4. **Bucketing**: Specific to DataFrames and used primarily when saving data to storage, bucketing is a form of partitioning where data is divided into a fixed number of "buckets" based on the hash of a column's values.
- **Use Case**: Useful when you have a large dataset that you'll be querying frequently with filters on a specific column. For instance, saving data in buckets based on user ID for a large user dataset.
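
The difference between hash and range partitioning can be illustrated with a small pure-Python sketch. It does not call Spark: `hash_partition` and `range_partition` are hypothetical helpers, `zlib.crc32` is only a stand-in for Spark's internal hash function, and the range boundaries are picked by hand, whereas Spark derives them by sampling the data.

```python
import zlib
from bisect import bisect_left
from collections import defaultdict

def hash_partition(key, num_partitions):
    """Pick a partition from the key's hash (crc32 is a stand-in hash)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions

def range_partition(key, upper_bounds):
    """Pick the first range whose upper bound is >= key."""
    return bisect_left(upper_bounds, key)

ages = [5, 10, 11, 20, 21, 35]

# Hash partitioning: placement depends only on the hash of each key
hash_layout = defaultdict(list)
for age in ages:
    hash_layout[hash_partition(age, 3)].append(age)

# Range partitioning: contiguous key ranges end up together
range_layout = defaultdict(list)
for age in ages:
    range_layout[range_partition(age, [10, 20])].append(age)

print("hash: ", dict(hash_layout))
print("range:", dict(range_layout))
```

With range partitioning, neighbouring keys stay in the same partition, which is what makes it attractive for sorting. With hash partitioning, equal keys always land in the same partition, but neighbouring keys may be scattered.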

### 3.4 Example
In the `datasets/` folder we have a [Kaggle dataset](https://www.kaggle.com/datasets/mvieira101/global-cost-of-living?select=cost-of-living_v2.csv) named `cost-of-living.csv` which contains the following information:

- `city`: Name of the city
- `country`: Name of the country
- `x1`: Meal, Inexpensive Restaurant (USD)
- `x2`: Meal for 2 People, Mid-range Restaurant, Three-course (USD)
- `x3`: McMeal at McDonalds (or Equivalent Combo Meal) (USD)
- `x4`: Domestic Beer (0.5 liter draught, in restaurants) (USD)
- `x5`: Imported Beer (0.33 liter bottle, in restaurants) (USD)
- `x6`: Cappuccino (regular, in restaurants) (USD)
- `x7`: Coke/Pepsi (0.33 liter bottle, in restaurants) (USD)
- `x8`: Water (0.33 liter bottle, in restaurants) (USD)
- `x9`: Milk (regular), (1 liter) (USD)
- `x10`: Loaf of Fresh White Bread (500g) (USD)
- `x11`: Rice (white), (1kg) (USD)
- `x12`: Eggs (regular) (12) (USD)
- `x13`: Local Cheese (1kg) (USD)
- `x14`: Chicken Fillets (1kg) (USD)
- `x15`: Beef Round (1kg) (or Equivalent Back Leg Red Meat) (USD)
- `x16`: Apples (1kg) (USD)
- `x17`: Banana (1kg) (USD)
- `x18`: Oranges (1kg) (USD)
- `x19`: Tomato (1kg) (USD)
- `x20`: Potato (1kg) (USD)
- `x21`: Onion (1kg) (USD)
- `x22`: Lettuce (1 head) (USD)
- `x23`: Water (1.5 liter bottle, at the market) (USD)
- `x24`: Bottle of Wine (Mid-Range, at the market) (USD)
- `x25`: Domestic Beer (0.5 liter bottle, at the market) (USD)
- `x26`: Imported Beer (0.33 liter bottle, at the market) (USD)
- `x27`: Cigarettes 20 Pack (Marlboro) (USD)
- `x28`: One-way Ticket (Local Transport) (USD)
- `x29`: Monthly Pass (Regular Price) (USD)
- `x30`: Taxi Start (Normal Tariff) (USD)
- `x31`: Taxi 1km (Normal Tariff) (USD)
- `x32`: Taxi 1hour Waiting (Normal Tariff) (USD)
- `x33`: Gasoline (1 liter) (USD)
- `x34`: Volkswagen Golf 1.4 90 KW Trendline (Or Equivalent New Car) (USD)
- `x35`: Toyota Corolla Sedan 1.6l 97kW Comfort (Or Equivalent New Car) (USD)
- `x36`: Basic (Electricity, Heating, Cooling, Water, Garbage) for 85m2 Apartment (USD)
- `x37`: 1 min. of Prepaid Mobile Tariff Local (No Discounts or Plans) (USD)
- `x38`: Internet (60 Mbps or More, Unlimited Data, Cable/ADSL) (USD)
- `x39`: Fitness Club, Monthly Fee for 1 Adult (USD)
- `x40`: Tennis Court Rent (1 Hour on Weekend) (USD)
- `x41`: Cinema, International Release, 1 Seat (USD)
- `x42`: Preschool (or Kindergarten), Full Day, Private, Monthly for 1 Child (USD)
- `x43`: International Primary School, Yearly for 1 Child (USD)
- `x44`: 1 Pair of Jeans (Levis 501 Or Similar) (USD)
- `x45`: 1 Summer Dress in a Chain Store (Zara, H&M, …) (USD)
- `x46`: 1 Pair of Nike Running Shoes (Mid-Range) (USD)
- `x47`: 1 Pair of Men Leather Business Shoes (USD)
- `x48`: Apartment (1 bedroom) in City Centre (USD)
- `x49`: Apartment (1 bedroom) Outside of Centre (USD)
- `x50`: Apartment (3 bedrooms) in City Centre (USD)
- `x51`: Apartment (3 bedrooms) Outside of Centre (USD)
- `x52`: Price per Square Meter to Buy Apartment in City Centre (USD)
- `x53`: Price per Square Meter to Buy Apartment Outside of Centre (USD)
- `x54`: Average Monthly Net Salary (After Tax) (USD)
- `x55`: Mortgage Interest Rate in Percentages (%), Yearly, for 20 Years Fixed-Rate
- `data_quality`: 0 if Numbeo considers that more contributors are needed to increase data quality, else 1

We want to know the countries with the highest average cost for a Cappuccino (`x6`); you know, we are Spark developers...

In [None]:
df = spark.read.option("header",True).csv("datasets/cost-of-living.csv").select("city", "country", "x6")
df.printSchema()

As we can see by printing the schema the column `x6` is a `string`, so we need to cast it into a `float`

In [None]:
from pyspark.sql.functions import col

df = df.withColumn("capuccino_price", col("x6").cast("float")).drop("x6")
df.printSchema()

Now, the column `capuccino_price` is a `float`, but it is nullable, which means some prices can be missing. Note that in Spark a missing value (`null`) and `NaN` are different things: `isnan` only matches `NaN` and does not match `null`. We will check for both and remove them, as we only want valid prices when computing the mean.

In [None]:
from pyspark.sql.functions import isnan

# A price is invalid if it is either null or NaN
invalid_price = col("capuccino_price").isNull() | isnan("capuccino_price")

print("#### Unfiltered DataFrame ####")
df.filter(invalid_price).show(5)
df = df.filter(~invalid_price)
print("#### Filtered DataFrame ####")
df.filter(invalid_price).show(5)

Now we can start computing the countries with the highest average cost for a Cappuccino. To make the time difference noticeable, we will create a bigger DataFrame like before by unioning it with itself.

In [None]:
big_df = df

for _ in range(99):
    big_df = big_df.union(df)

print(f"The new DataFrame has {big_df.count()} rows")

**Without Proper Partitioning:**

If we don't consider partitioning, the default partitioning strategy might distribute the data across nodes without any specific ordering:

In [None]:
%%time
from pyspark.sql.functions import mean, format_number, desc

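# Sort on the numeric mean first and format it afterwards: format_number
# returns a string, and sorting strings would order prices lexicographically
big_df.groupBy("country") \
    .agg(mean("capuccino_price").alias("avg_capuccino_price")) \
    .orderBy(desc("avg_capuccino_price")) \
    .withColumn(
        "avg_capuccino_price",
        format_number("avg_capuccino_price", 2)
    ) \
    .show(10)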

In this case, since the data isn't partitioned by `country`, Spark has to shuffle rows across partitions to bring each country's data together when grouping by `country`. This can result in higher network overhead and slower performance.

`Side Note`: Wow! Didn't see that one coming, but seems legit according to the page [Coffeestics](https://coffeestics.com/countries/turkmenistan) 🤷‍♂️.

**With Proper Partitioning:**

By repartitioning our data by the `country` column, we can ensure that all data for a given `country` will reside on the same partition (and thus the same node):

In [None]:
partitioned_df = big_df.repartition("country")
partitioned_df.write.mode("overwrite").parquet("datasets/partitioned_cost_of_living")

In [None]:
df = spark.read.parquet("datasets/partitioned_cost_of_living")
df.count()

In [None]:
%%time
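# Sort on the numeric mean first and format it afterwards: format_number
# returns a string, and sorting strings would order prices lexicographically
df.groupBy("country") \
    .agg(mean("capuccino_price").alias("avg_capuccino_price")) \
    .orderBy(desc("avg_capuccino_price")) \
    .withColumn(
        "avg_capuccino_price",
        format_number("avg_capuccino_price", 2)
    ) \
    .show(10)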

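Now, when we perform the `groupBy` operation, the rows for a particular country were written together, so the operation is expected to run faster. Keep in mind, however, that Spark does not retain a DataFrame's in-memory partitioning once it has been written to Parquet and read back, so the physical plan may still contain a shuffle (`Exchange`) step. Part of any speed-up measured here can also come from reading a compact Parquet file instead of re-evaluating 100 unions of the CSV source. Inspecting the plan with `df.explain()` is the reliable way to confirm whether a shuffle is avoided.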

**Conclusion:**

This example illustrates the importance of good partitioning in PySpark. By being aware of how our data is distributed and how it will be accessed, we can leverage partitioning to optimize performance. Proper partitioning reduces data shuffling and ensures that related data is co-located, leading to more efficient computations.