# 🧠 PySpark `cache()` and `persist()` – Notes

---

## ✅ Purpose of `.cache()` and `.persist()`

### ⚙️ 1. Avoid Recomputing Expensive Operations

- PySpark transformations are **lazy**: nothing is executed until an **action** is called.
- If you reuse the same DataFrame or RDD in several actions without caching it, Spark **re-runs its lineage (the DAG of transformations)** for each action.
- For large inputs or expensive transformations (joins, aggregations, shuffles), this repeated work can dominate the job's run time.

#### 🔁 Example (Without Caching):

```python
df_filtered = df.filter("age > 30")

# First action
df_filtered.groupBy("city").count().show()

# Second action – recomputes the filter again!
df_filtered.agg({"salary": "avg"}).show()
```
---

## 🔁 `cache()` in PySpark

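- For a **DataFrame**, `cache()` is shorthand for `persist()` with the default storage level: data is kept in **memory** and **spills to disk** when memory runs out (`MEMORY_AND_DISK`; recent PySpark versions report it as `MEMORY_AND_DISK_DESER`).
- For an **RDD**, `cache()` uses `MEMORY_ONLY`: partitions that do not fit in memory are not stored and are recomputed when needed.
- Best for datasets that are reused by several actions or by several downstream transformations.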

```python
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.cache()  # Lazy: only marks df for caching (DataFrame default = memory, spilling to disk)
df.count()  # First action materializes the cache
```

---

## 🧊 `persist()` in PySpark

- Lets you choose the **storage level** explicitly instead of relying on the default.
- You can keep data in memory, on disk, or both, and optionally replicate each partition on a second node (the `_2` levels).

```python
from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_ONLY)
df.count()  # Triggers caching
```

---

## ⚙️ Storage Levels for `.persist()`

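| Storage Level              | Description                                      |
|---------------------------|--------------------------------------------------|
| `MEMORY_ONLY`             | Memory only; partitions that do not fit are recomputed when needed |
| `MEMORY_AND_DISK`         | Tries memory; spills partitions to disk if they do not fit |
| `MEMORY_AND_DISK_DESER`   | Like `MEMORY_AND_DISK`, kept deserialized in memory; the DataFrame default in recent PySpark versions |
| `DISK_ONLY`               | Stores only on disk (slowest to read back)       |
| `MEMORY_ONLY_2`, `MEMORY_AND_DISK_2`, `DISK_ONLY_2`, `DISK_ONLY_3` | Same as the base level, with each partition replicated on 2 (or 3) nodes |
| `OFF_HEAP`                | Uses off-heap memory (advanced use)              |

`MEMORY_ONLY_SER` and `MEMORY_AND_DISK_SER` belong to the Scala/Java API and are not exposed by current PySpark (they are absent from the `dir(StorageLevel)` output at the end of these notes). In Python, stored objects are always serialized with pickle, so a separate serialized level makes no difference.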

---

## 🧹 `unpersist()` – Clear Cache

- Removes the cached data from memory and disk, so later actions recompute the dataset from its source:

```python
df.unpersist()
```

---

## 🧪 Example: Using `persist()` and `unpersist()`

```python
from pyspark import StorageLevel

df = spark.read.csv("employees.csv", header=True, inferSchema=True)

# Filter and persist intermediate result
filtered_df = df.filter("salary > 50000").persist(StorageLevel.MEMORY_AND_DISK)

# Reuse multiple times
filtered_df.groupBy("department").count().show()
filtered_df.agg({"salary": "avg"}).show()

# Free up memory
filtered_df.unpersist()
```

---

## 🚀 When Should You Use It?

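- Use `.cache()`:
  - When the default storage level is sufficient (memory plus disk for DataFrames, memory only for RDDs)
- Use `.persist(level)`:
  - When you need custom control (e.g., memory only, disk only, replicated)
- Both are lazy: the data is stored only when an **action** runs. `.count()` is a common choice because it touches every partition; `.show()` or `.take()` may compute, and therefore cache, only some partitions, and `.collect()` pulls the whole dataset to the driver.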

---


```python
from pyspark.sql import SparkSession
from pyspark import StorageLevel
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# 1. Start Spark session
spark = SparkSession.builder.appName("CachePersistExample").getOrCreate()

# 2. Create sample employee data
data = [
    ("e01", "Alice", "HR", 55000),
    ("e02", "Bob", "IT", 48000),
    ("e03", "Carol", "Finance", 62000),
    ("e04", "David", "IT", 72000),
    ("e05", "Eva", "HR", 51000),
    ("e06", "Frank", "Finance", 45000),
    ("e07", "Grace", "IT", 53000)
]

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", IntegerType(), True)
])

# 3. Create original DataFrame
df = spark.createDataFrame(data, schema)

# 4. Filter high-salary employees
high_salary_df = df.filter("salary > 50000")

# Rename columns in df to avoid ambiguity during join
df = df.withColumnRenamed('name', 'namedf') \
       .withColumnRenamed('department', 'departmentdf') \
       .withColumnRenamed('salary', 'salarydf')

# 5. Cache the filtered DataFrame
high_salary_df.cache()

# 6. Trigger cache with an action
print("🔄 Counting high-salary employees:")
print("Total:", high_salary_df.count())

# 7. Reuse cached DataFrame
print("\n📊 Department-wise count of high earners:")
high_salary_df.groupBy("department").count().show()

print("💰 Average salary of high earners:")
high_salary_df.agg({"salary": "avg"}).show()

# 8. Unpersist cached DataFrame
high_salary_df.unpersist()

# 9. Join original (renamed) df with high_salary_df on "id"
joined_df = df.join(high_salary_df, on="id", how="inner")

# 10. Persist the joined DataFrame with available storage level
joined_df.persist(StorageLevel.MEMORY_AND_DISK_DESER)

# 11. Trigger persist with an action and show joined data
print("\n🔁 Joined high salary employees (with disambiguated columns):")
joined_df.select(
    "id",
    "namedf",         # from df
    "departmentdf",   # from df
    "salarydf",       # from df
    "name",           # from high_salary_df
    "department",     # from high_salary_df
    "salary"          # from high_salary_df
).show()

# 12. Unpersist the joined DataFrame
joined_df.unpersist()

# 13. Stop Spark session
spark.stop()
```


```python
print(dir(StorageLevel))
```

```
['DISK_ONLY', 'DISK_ONLY_2', 'DISK_ONLY_3', 'MEMORY_AND_DISK', 'MEMORY_AND_DISK_2', 'MEMORY_AND_DISK_DESER', 'MEMORY_ONLY', 'MEMORY_ONLY_2', 'OFF_HEAP', '__annotations__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__']
```
