**Cache:**

In PySpark, caching is a mechanism that allows you to store intermediate DataFrames or RDDs in memory to speed up subsequent actions on them. This is especially useful when you're performing multiple operations on the same dataset and want to avoid recomputing expensive transformations.

**Key Concepts of Caching in PySpark:**

**1. What is Caching?**

-   Caching is the process of storing a DataFrame or RDD in memory after the first computation so that it can be reused in subsequent operations without needing to be recomputed.

-   This can drastically improve performance for iterative algorithms, like machine learning models or graph processing tasks, where the same data is processed multiple times.

**2. Why Use Cache?**

-   Without caching, each action re-executes the full lineage of transformations, which can mean re-reading the source data from disk and recomputing every intermediate result.
-   Caching stores the results of a DataFrame or RDD in memory (RAM), so subsequent transformations or actions on the cached data are faster.

**3. When to Cache?**

-   **Repeated access to the same dataset:** If you plan to use the same dataset multiple times, caching is beneficial.

-   **Expensive operations:** If a particular transformation is costly (e.g., shuffling, sorting, or aggregation), caching intermediate results can speed up future operations.

**4. How to Cache in PySpark?**

-   You can use the cache() method to cache a DataFrame or RDD. By default, a cached DataFrame is kept in memory and spilled to disk when memory runs out, while a cached RDD is kept in memory only.

Example:

```
df = spark.read.csv("large_file.csv")
df.cache()
```

Alternatively, if you want to control the storage level (e.g., store the data on disk if there is not enough memory), you can use the `persist()` method with a specific storage level.

```
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
```

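**5. Storage Levels in PySpark:** PySpark provides several storage levels that control where and how cached data is kept. The default depends on the API: `DataFrame.cache()` uses a memory-and-disk level, while `RDD.cache()` uses MEMORY_ONLY. Here are the most common storage levels:

-   MEMORY_ONLY: Stores data only in memory (RAM). Partitions that do not fit are not cached and are recomputed from the lineage each time they are needed; the job does not fail for that reason alone.

-   MEMORY_AND_DISK: Stores data in memory and spills the partitions that do not fit to disk. This is the default level for DataFrames.

-   DISK_ONLY: Stores data only on disk.

-   MEMORY_ONLY_SER: Stores data in memory in a serialized format. This may reduce memory usage but can be slower to access. This level belongs to the Scala/Java API; in Python, stored objects are always serialized (pickled), and recent PySpark versions do not expose the `_SER` constants.

-   MEMORY_AND_DISK_SER: Stores data in a serialized format in memory and on disk if needed. The same Scala/Java caveat applies.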

Example:

```
df.persist(StorageLevel.MEMORY_ONLY)
```

**6. Unpersisting:**

After you no longer need the cached data, you should explicitly release it using the `unpersist()` method. This helps free up memory and prevents memory pressure from building up.

```
df.unpersist()
```

**7. Cache and Action/Transformation Behavior:**

-   cache() applies to the result of the transformations that come before the call. Transformations applied afterward produce new DataFrames or RDDs that read from the cached data but are not cached themselves.
-   **Note:** Caching is not a persistent mechanism across different Spark sessions; the cached data is cleared once the Spark session ends.

**8. Lazy Evaluation:**

-   Spark uses lazy evaluation, so cached data is not loaded into memory until an action (such as collect(), show(), or count()) is called. Calling cache() alone does not trigger any computation; it only marks the DataFrame or RDD for caching.

**Caching vs Persisting:**

-   cache() is shorthand for persist() with the default storage level: a memory-and-disk level for DataFrames and StorageLevel.MEMORY_ONLY for RDDs. If you don't need fine-grained control over how and where the data is stored, cache() is sufficient.
-   persist() gives more flexibility by allowing you to choose different storage levels depending on your memory/disk constraints.

**Key Considerations:**

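-   **Memory Constraints:** If the dataset is too large to fit into memory, a memory-only cache leaves some partitions uncached and recomputes them on demand, and heavy caching adds memory pressure for the rest of the job. In such cases, you might want to use the MEMORY_AND_DISK storage level.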

-   **Multiple Caches:** Multiple DataFrames can be cached in the same session, but it's important to unpersist() when you're done to avoid unnecessary memory consumption.

-   **Not Always Beneficial:** Caching is not always beneficial. For small datasets or when you're performing only a few operations, caching can add unnecessary overhead.

**When Caching Might Not Help:**

-   If you only perform one action on the dataset and don't reuse it, caching might not provide any performance benefit.
-   If the dataset is small and cheap to recompute, caching it may not be necessary.

**Performance Tuning and Best Practices:**

-   **Cache only what’s necessary:** Cache only large DataFrames or RDDs that are accessed multiple times.

-   **Cache intermediate results:** In iterative algorithms (e.g., machine learning), caching intermediate results can significantly speed up the process.

-   **Check your cluster resources:** Ensure your cluster has sufficient memory to hold the cached data; otherwise, use a more disk-friendly storage level like MEMORY_AND_DISK.


**Example:**

```
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySparkExample") \
    .getOrCreate()
```


```
# Load the orders file as an RDD of text lines
orders_base = spark.sparkContext.textFile("/Users/sugumarsrinivasan/Documents/data/sample_orders_1GB.csv")

# Drop the header row
header = orders_base.first()
orders_without_header = orders_base.filter(lambda line: line != header)

# Exclude rows whose fourth column (index 3) is PENDING_PAYMENT
orders_filtered = orders_without_header.filter(lambda x: x.split(",")[3] != "PENDING_PAYMENT")

# Count rows per value of the third column (index 2)
orders_mapped = orders_filtered.map(lambda x: (x.split(",")[2], 1))
orders_reduced = orders_mapped.reduceByKey(lambda x, y: x + y)

# Keep keys below 501 (note: this reuses the name orders_filtered)
orders_filtered = orders_reduced.filter(lambda x: int(x[0]) < 501)

# Mark the final RDD for caching; nothing is computed yet
orders_filtered.cache()
```

```
PythonRDD[7] at RDD at PythonRDD.scala:53
```

```
# The first action computes the pipeline and fills the cache
orders_filtered.collect()
```

```
[('191', 463),
 ('365', 470),
 ('193', 464),
 ('24', 438),
 ('6', 436),
 ('245', 419),
 ('29', 459),
 ('477', 433),
 ('419', 470),
 ('57', 432),
 ('410', 453),
 ('318', 442),
 ('324', 402),
 ('416', 453),
 ('428', 422),
 ('72', 458),
 ('118', 469),
 ('92', 448),
 ('32', 414),
 ('54', 457),
 ('139', 449),
 ('277', 429),
 ('239', 462),
 ('394', 448),
 ('396', 430),
 ('414', 469),
 ('307', 413),
 ('443', 465),
 ('371', 443),
 ('198', 435),
 ('400', 474),
 ('316', 455),
 ('51', 449),
 ('300', 476),
 ('490', 461),
 ('114', 469),
 ('280', 456),
 ('244', 459),
 ('48', 446),
 ('376', 421),
 ('187', 477),
 ('124', 490),
 ('470', 473),
 ('71', 446),
 ('142', 475),
 ('431', 434),
 ('242', 455),
 ('115', 425),
 ('143', 459),
 ('112', 455),
 ('306', 414),
 ('487', 458),
 ('240', 415),
 ('154', 437),
 ('185', 437),
 ('284', 429),
 ('93', 427),
 ('270', 424),
 ('364', 445),
 ('12', 437),
 ('7', 415),
 ('268', 430),
 ('486', 454),
 ('298', 460),
 ('390', 471),
 ('149', 451),
 ('389', 499),
 ('50', 459
```

**Spark UI:**

![Spark UI: cache job](./screenshots/spark-chache-job.png)
![Spark UI: cache stage](./screenshots/spark-cache-stage.png)
![Spark UI: cache stage skipped](./screenshots/spark-cache-stage-skip.png)