### *Cache*

*Caching a DataFrame in PySpark is a performance optimization that stores the DataFrame's contents in memory or on disk so that subsequent operations can read them without recomputation.*

##### *Performance Improvement:* 
*Caching a DataFrame reduces the need to recompute the DataFrame's transformations, especially when the DataFrame is used multiple times in subsequent operations. This can significantly improve performance, especially for iterative algorithms or complex computations.*

##### *Caching Syntax:* 
*To cache a DataFrame, call its cache() method. The call is lazy: nothing is stored until an action such as count() or show() next runs on the DataFrame. For example:* ***df.cache()***
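Because `cache()` only marks a DataFrame for caching, a common pattern is to follow it with an action that touches every partition. The sketch below assumes `df` is a `pyspark.sql.DataFrame`; the helper name `cache_and_materialize` is hypothetical and not part of PySpark.

```python
def cache_and_materialize(df):
    """Mark a DataFrame for caching, then force the cache to be filled.

    `df` is assumed to be a pyspark.sql.DataFrame (any object with
    cache() and count() methods behaves the same way here).
    """
    df.cache()   # lazy: only marks the plan for caching, nothing is stored yet
    df.count()   # action that scans every partition, so the whole cache is populated
    return df
```

After `df1 = cache_and_materialize(df1)`, the properties `df1.is_cached` and `df1.storageLevel` can be used to confirm that the DataFrame is marked as cached and at which storage level. An action such as `show(5)` may compute only some partitions, so it does not necessarily fill the whole cache.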

In [0]:
spark.catalog.clearCache()

In [0]:
dbutils.fs.mkdirs('/FileStore/mithelesh/inbound')

True

# Without Cache

In [0]:

# Namespaced import avoids shadowing Python's built-in round() and sum()
from pyspark.sql import functions as F
from time import time

# Step 1: Load Data
try:
    df = spark.read.csv('/FileStore/mithelesh/inbound/sales126mb.csv', header=True, inferSchema=True)

    # Step 2: Derive order_date and keep only the columns used downstream
    df1 = (
        df.withColumn("order_date", F.date_format(F.col("event_time"), "yyyy-MM-dd"))
          .select("user_id", "product_id", "brand", "price", "order_date")
    )

    # Step 3: Measure execution time without caching
    start_time = time()
    df_without_cache = df1.groupBy("user_id").agg(F.round(F.sum("price")).alias("total_amount"))
    df_without_cache.show(5)  # Action: triggers the file scan and the aggregation
    time_without_cache = time() - start_time
    print(f"Execution time without cache: {time_without_cache} seconds")

except Exception as e:
    print("An error occurred:", e)

+---------+------------+
|  user_id|total_amount|
+---------+------------+
|555447570|      1137.0|
|524493281|      1432.0|
|543091275|      9047.0|
|509481306|       180.0|
|513992906|       165.0|
+---------+------------+
only showing top 5 rows
Execution time without cache: 14.392612218856812 seconds


### Read or scan data from disk

![](files/images/image_before_cache.png)

### No data cached in memory

![](files/images/image_before_cache_storage_stats.png)

# With Cache

In [0]:
# Namespaced import avoids shadowing Python's built-in round() and sum()
from pyspark.sql import functions as F
from time import time

# Step 1: Load Data
try:
    df = spark.read.csv("/FileStore/mithelesh/inbound/sales126mb.csv", header=True, inferSchema=True)

    # Step 2: Derive order_date and keep only the columns used downstream
    df1 = (
        df.withColumn("order_date", F.date_format(F.col("event_time"), "yyyy-MM-dd"))
          .select("user_id", "product_id", "brand", "price", "order_date")
    )

    # Step 3: Mark df1 for caching (lazy: the cache is filled by the first action)
    df1.cache()
    df_with_cache = df1.groupBy("user_id").agg(F.round(F.sum("price")).alias("total_amount"))

    # Step 4: Measure execution time with caching
    start_time = time()
    df_with_cache.show(5)  # Action on the cached DataFrame's aggregation
    time_with_cache = time() - start_time
    print(f"Execution time with cache: {time_with_cache} seconds")

except Exception as e:
    print("An error occurred:", e)


+---------+------------+
|  user_id|total_amount|
+---------+------------+
|555447570|      1137.0|
|524493281|      1432.0|
|543091275|      9047.0|
|509481306|       180.0|
|513992906|       165.0|
+---------+------------+
only showing top 5 rows
Execution time with cache: 3.6317877769470215 seconds
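Since `cache()` is lazy, the first action after it pays for both the computation and filling the cache, and only later actions read from the cache. A fairer comparison therefore times the same action more than once. The helper below is a minimal sketch for doing that; the name `time_action` is hypothetical, and it works with any zero-argument callable.

```python
from time import perf_counter

def time_action(action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = perf_counter()           # monotonic clock, suited to measuring intervals
    result = action()                # e.g. a bound method such as df.count
    return result, perf_counter() - start
```

For example, `_, first = time_action(df_with_cache.count)` followed by `_, second = time_action(df_with_cache.count)` would measure the cache-filling run and the cache-reading run separately, assuming `df_with_cache` is defined as in the cell above.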


### Read or scan data from memory instead of disk after cache

![](files/images/image_after_cache.png)

### Data cached in memory

![](files/images/image_after_cache_storage_stats.png)

## Spark Storage Level: Disk Memory Deserialized 1x Replicated

**Description:**
- **Disk Memory**: Data is stored in memory (RAM) if available, and spilled to disk if there is insufficient memory. This ensures data retention even if memory is full, though accessing disk-stored data is slower.
- **Deserialized**: Data is stored in its original object format in memory, which allows for faster access as it doesn't require deserialization. This typically uses more memory than serialized data.
- **1x Replicated**: Exactly one copy of each partition is kept, on a single node, with no extra replicas. This saves memory and disk space but provides no redundancy: if that node fails, the lost partitions must be recomputed.
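The three parts of this label correspond to flags on the storage level object. As an illustration, the sketch below rebuilds the label from the attributes that PySpark's `StorageLevel` exposes (`useDisk`, `useMemory`, `useOffHeap`, `deserialized`, `replication`). The function name `describe_storage_level` is hypothetical, and the exact wording Spark uses is an assumption based on the label shown above.

```python
def describe_storage_level(level):
    """Build a human-readable label from a storage level's flags.

    `level` is assumed to expose the same attributes as
    pyspark.StorageLevel: useDisk, useMemory, useOffHeap,
    deserialized and replication.
    """
    parts = []
    if level.useDisk:
        parts.append("Disk")
    if level.useMemory:
        parts.append("Memory (off heap)" if level.useOffHeap else "Memory")
    parts.append("Deserialized" if level.deserialized else "Serialized")
    parts.append(f"{level.replication}x Replicated")
    return " ".join(parts)
```

Calling `describe_storage_level(df1.storageLevel)` on a cached DataFrame is one way to check from code which of these flags are set, rather than reading them off the Storage tab.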

**Use Case:**
This storage level is useful when:
- You need to balance between memory usage and persistence.
- Fast access to data is required, and there is sufficient memory available.
- Redundancy is not a primary concern, and you're comfortable with recomputing data if a node fails.


### *Here's a summary of a few more DataFrame caching points*


##### *Unpersisting:* 
*When the cached DataFrame is no longer needed or when memory resources are required, you can unpersist it using the unpersist() method:*

###### *df.unpersist()*

In [0]:
# Clean up
df1.unpersist()  # Unpersist the cached DataFrame from memory

DataFrame[user_id: int, product_id: int, brand: string, price: double, order_date: string]

##### *Cache Management:* 
*It's essential to manage DataFrame caching efficiently to avoid memory issues. Caching too many DataFrames or caching large DataFrames can lead to memory pressure and potential out-of-memory errors. It's recommended to cache only the necessary DataFrames and to unpersist them when they are no longer needed.*
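One way to make sure a cache is always released is to tie it to a `with` block. The context manager below is a minimal sketch, assuming `df` is a `pyspark.sql.DataFrame`; the name `cached` is hypothetical and not part of PySpark.

```python
from contextlib import contextmanager

@contextmanager
def cached(df):
    """Cache a DataFrame for the duration of a with-block, then release it."""
    df.cache()            # lazy: the cache fills on the first action inside the block
    try:
        yield df
    finally:
        df.unpersist()    # runs even if the block raises, so the cache is not leaked
```

Usage would look like `with cached(df1) as d:` followed by the actions that reuse `d`. Once the block exits, the DataFrame is unpersisted and any later action recomputes it from source.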


##### *spark.catalog.clearCache()*

*The spark.catalog.clearCache() function in PySpark clears all cached tables and the associated in-memory DataFrame caches. This function is useful for releasing memory resources occupied by cached DataFrames, especially when memory usage needs to be optimized or when cached data becomes outdated.*

In [0]:
spark.catalog.clearCache()
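To see which registered tables or views are cached before clearing everything, the catalog can be queried. The sketch below assumes `spark` is an active `SparkSession` and uses `spark.catalog.listTables()` and `spark.catalog.isCached()`; the helper name `cached_table_names` is hypothetical.

```python
def cached_table_names(spark):
    """Return the names of tables and views in the current database that are cached.

    `spark` is assumed to be a SparkSession. DataFrames cached with
    df.cache() but never registered as a table or view are not listed.
    """
    catalog = spark.catalog
    return [t.name for t in catalog.listTables() if catalog.isCached(t.name)]
```

Calling `cached_table_names(spark)` before and after `spark.catalog.clearCache()` should show the list becoming empty, assuming nothing is cached again in between.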

### When to Use Cache and Persist in Spark

##### 1. Repeated Access to Data
*Use `cache()` or `persist()` when a DataFrame or RDD is used by more than one action in the same Spark application. Without caching, each action recomputes the full lineage, so caching saves time and resources.*

##### 2. Optimization for Iterative Algorithms
For iterative algorithms, such as K-means clustering or PageRank, caching or persisting the input data prevents it from being recomputed on every iteration.

##### 3. Performance Improvement for Expensive Operations
When transformations or computations are expensive (e.g., wide transformations like `groupBy`), caching or persisting results can significantly boost performance.

##### 4. Interactive and Exploratory Data Analysis
During interactive data analysis, caching data can make the experience faster and more responsive, reducing the latency of repeated queries.

##### 5. Joining Large Datasets
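When the same dataset takes part in several joins, caching it (if it fits into memory) avoids recomputing it for each join. Caching alone does not remove the shuffle; when one side of the join is small, a broadcast join is the more direct way to reduce shuffle overhead.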


### When Not to Use Cache and Persist in Spark

##### 1. Memory Constraints
If your cluster has limited memory, overusing `cache()` and `persist()` can lead to memory pressure, causing out-of-memory errors or forcing Spark to spill data to disk, which is slower.

##### 2. Data Used Only Once
If the DataFrame or RDD is accessed only once, caching or persisting it is unnecessary and wastes resources.

##### 3. Small or Lightweight Transformations
For small datasets or lightweight transformations, the overhead of caching may outweigh the benefits, because recomputing them is already cheap.

##### 4. Frequent Data Changes
If data is frequently updated or transformed, caching might require frequent invalidation and recomputation, negating performance gains and adding complexity.

##### 5. Temporary Data
Avoid caching or persisting temporary or intermediate datasets that are consumed only once by the next stage. Let Spark manage these automatically within the execution plan.
 