# Big Data Processing: Pandas and PySpark

**What we'll cover:**
1. Small data → Does Pandas win?
2. Large data → PySpark required
3. Side-by-side: Same operations in both tools
4. Key concepts: Lazy evaluation, actions vs transformations, partitions
5. Joins: Combining datasets
6. Common beginner mistakes
7. Writing results
8. Your turn: Practice exercises

---

## Setup

In [0]:
import pandas as pd
import time
from pyspark.sql.functions import col, count, avg, sum as spark_sum, desc, when

# Verify Spark is ready (spark is pre-created in Databricks)
print(f"✅ Spark version: {spark.version}")
print(f"✅ Ready to go!")

✅ Spark version: 4.0.0
✅ Ready to go!


---
## Part 1: Small Data (100 MB) - Is Pandas Faster?

**Dataset:** Flight delays  
**Size:** ~100 MB, 1.4M rows  
**Task:** Find top 10 airports with most delays

**Hypothesis:** PySpark overhead makes it slower for small data.

### Pandas Approach

In [0]:
# PANDAS: Read and process
# Pandas needs a local file path, so go through the /dbfs FUSE mount
path = "/dbfs/databricks-datasets/flights/departuredelays.csv"

start = time.time()

df_pandas = pd.read_csv(path)
result = df_pandas[df_pandas['delay'] > 0] \
    .groupby('origin')['delay'] \
    .count() \
    .sort_values(ascending=False) \
    .head(10)

pandas_time = time.time() - start

print(f"⏱️  Pandas: {pandas_time:.2f} seconds")
print(f"\n📊 Top 10 airports:")
print(result)

⏱️  Pandas: 7.79 seconds

📊 Top 10 airports:
origin
ATL    41828
ORD    33812
DEN    30760
DFW    28706
LAX    22684
IAH    21009
PHX    17555
LAS    16938
SFO    16552
MCO    14189
Name: delay, dtype: int64


### PySpark Approach

In [0]:
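# PYSPARK: Same task
# Spark reads DBFS paths directly (no /dbfs prefix)
path = "/databricks-datasets/flights/departuredelays.csv"

start = time.time()

# inferSchema=True makes Spark scan the file an extra time to guess column types
df_spark = spark.read.csv(path, header=True, inferSchema=True)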
result = df_spark.filter(col('delay') > 0) \
    .groupBy('origin') \
    .count() \
    .orderBy(desc('count')) \
    .limit(10)

result.show()

pyspark_time = time.time() - start

print(f"\n⏱️  PySpark: {pyspark_time:.2f} seconds")

+------+-----+
|origin|count|
+------+-----+
|   ATL|41828|
|   ORD|33812|
|   DEN|30760|
|   DFW|28706|
|   LAX|22684|
|   IAH|21009|
|   PHX|17555|
|   LAS|16938|
|   SFO|16552|
|   MCO|14189|
+------+-----+


⏱️  PySpark: 2.45 seconds


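In [0]:
# COMPARISON
print("="*50)
print("Small Data (100 MB)")
print("="*50)
print(f"Pandas:  {pandas_time:.2f}s")
print(f"PySpark: {pyspark_time:.2f}s")

# Report whichever tool actually finished first in this run
if pandas_time < pyspark_time:
    print(f"\nPandas was {pyspark_time/pandas_time:.1f}x faster")
else:
    print(f"\nPySpark was {pandas_time/pyspark_time:.1f}x faster")

print("\n💡 PySpark adds fixed overhead to every job:")
print("   - Task scheduling")
print("   - Data serialization")
print("   - Cluster coordination")
print("\n   For small data, that overhead can exceed the processing time.")

Small Data (100 MB)
Pandas:  7.79s
PySpark: 2.45s

PySpark was 3.2x faster

💡 PySpark adds fixed overhead to every job:
   - Task scheduling
   - Data serialization
   - Cluster coordination

   For small data, that overhead can exceed the processing time.

**Result:** In this run PySpark finished first, so the hypothesis did not hold here. A likely factor is that the Pandas timing includes reading the CSV through the `/dbfs` FUSE mount, which is slow, while Spark reads the DBFS path directly. On a single machine with a local file, Pandas typically wins at this size because it skips Spark's job overhead.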
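
Single timings like these are noisy: cluster warm-up, caching, and file-system latency all vary from run to run. A minimal stdlib sketch of a fairer comparison, assuming each workload can be wrapped in a zero-argument function, is to run it several times and report the median. The helper `median_runtime` is hypothetical, not part of Pandas or PySpark.

```python
import statistics
import time


def median_runtime(fn, repeats=5):
    """Call fn() `repeats` times and return the median wall-clock time in seconds."""
    if repeats < 1:
        raise ValueError("repeats must be at least 1")
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()  # high-resolution clock intended for timing
        fn()
        timings.append(time.perf_counter() - start)
    return statistics.median(timings)


# Stand-in workload so the sketch runs anywhere. In the notebook you might pass
# lambda: df_spark.filter(col('delay') > 0).count()
elapsed = median_runtime(lambda: sum(range(100_000)), repeats=3)
print(f"median: {elapsed:.4f}s")
```

The median is not thrown off by one unusually slow run (for example, the first run while the cluster warms up), which a single measurement cannot tell apart from the typical cost.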


---
## Part 2: Large Data (1 GB+) - PySpark Required

**Dataset:** 10x duplicated flights  
**Size:** ~1 GB, 14M rows  
**Task:** Same analysis on bigger data

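**What to expect:** Pandas has to hold the whole dataset in one machine's memory, so as data grows it slows down and eventually runs out of memory. PySpark splits the work across partitions. (Pandas is not actually run on the large data below.)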

In [0]:
# Create large dataset by duplicating (union is lazy: the copies are combined only when an action runs)
print("📦 Creating large dataset (10x duplication)...")

df_base = spark.read.csv("/databricks-datasets/flights/departuredelays.csv",
                          header=True, inferSchema=True)

df_large = df_base
for _ in range(9):
    df_large = df_large.union(df_base)

print("✅ Defined ~1 GB dataset with 14M rows (built lazily)")

📦 Creating large dataset (10x duplication)...
✅ Defined ~1 GB dataset with 14M rows (built lazily)


In [0]:
# PYSPARK: Process large data
start = time.time()

result_large = df_large.filter(col('delay') > 0) \
    .groupBy('origin') \
    .count() \
    .orderBy(desc('count')) \
    .limit(10)

result_large.show()

large_time = time.time() - start

print(f"\n⏱️  PySpark on 1 GB: {large_time:.2f} seconds")
print("\n💡 Pandas was not run on this data. A rough linear estimate")
print(f"   from Part 1 is ~{pandas_time*10:.0f}s, with all 14M rows in memory at once.")
print(f"\n   PySpark: {large_time:.2f}s for 10x the data vs {pyspark_time:.2f}s in Part 1 ✅")

+------+------+
|origin| count|
+------+------+
|   ATL|418280|
|   ORD|338120|
|   DEN|307600|
|   DFW|287060|
|   LAX|226840|
|   IAH|210090|
|   PHX|175550|
|   LAS|169380|
|   SFO|165520|
|   MCO|141890|
+------+------+


⏱️  PySpark on 1 GB: 2.85 seconds

💡 Pandas was not run on this data. A rough linear estimate
   from Part 1 is ~78s, with all 14M rows in memory at once.

   PySpark: 2.85s for 10x the data vs 2.45s in Part 1 ✅


---
## Part 3: Side-by-Side Comparison

Let's see common operations in both tools.

### Operation 1: Select Columns

In [0]:
# PANDAS
df_pandas_select = df_pandas[['date', 'origin', 'destination', 'delay']]
print("Pandas:")
print(df_pandas_select.head())

print("\n" + "="*50 + "\n")

# PYSPARK  
df_spark_select = df_spark.select('date', 'origin', 'destination', 'delay')
print("PySpark:")
df_spark_select.show(5)

Pandas:
      date origin destination  delay
0  1011245    ABE         ATL      6
1  1020600    ABE         DTW     -8
2  1021245    ABE         ATL     -2
3  1020605    ABE         ATL     -4
4  1031245    ABE         ATL     -4


PySpark:
+-------+------+-----------+-----+
|   date|origin|destination|delay|
+-------+------+-----------+-----+
|1011245|   ABE|        ATL|    6|
|1020600|   ABE|        DTW|   -8|
|1021245|   ABE|        ATL|   -2|
|1020605|   ABE|        ATL|   -4|
|1031245|   ABE|        ATL|   -4|
+-------+------+-----------+-----+
only showing top 5 rows


### Operation 2: Filter Rows

In [0]:
# PANDAS
df_pandas_filter = df_pandas[
    (df_pandas['delay'] > 60) & 
    (df_pandas['distance'] > 1000)
]
print(f"Pandas: {len(df_pandas_filter):,} flights with >1hr delay and >1000 miles")

print("\n" + "="*50 + "\n")

# PYSPARK
df_spark_filter = df_spark.filter(
    (col('delay') > 60) & 
    (col('distance') > 1000)
)
print(f"PySpark: {df_spark_filter.count():,} flights with >1hr delay and >1000 miles")

Pandas: 17,218 flights with >1hr delay and >1000 miles


PySpark: 17,218 flights with >1hr delay and >1000 miles


### Operation 3: Add New Column

In [0]:
# PANDAS: Modify dataframe directly
df_pandas_copy = df_pandas.copy()
df_pandas_copy['delay_hours'] = df_pandas_copy['delay'] / 60
print("Pandas:")
print(df_pandas_copy[['delay', 'delay_hours']].head())

print("\n" + "="*50 + "\n")

# PYSPARK: Returns new dataframe (immutable)
df_spark_with_hours = df_spark.withColumn('delay_hours', col('delay') / 60)
print("PySpark:")
df_spark_with_hours.select('delay', 'delay_hours').show(5)

Pandas:
   delay  delay_hours
0      6     0.100000
1     -8    -0.133333
2     -2    -0.033333
3     -4    -0.066667
4     -4    -0.066667


PySpark:
+-----+--------------------+
|delay|         delay_hours|
+-----+--------------------+
|    6|                 0.1|
|   -8|-0.13333333333333333|
|   -2|-0.03333333333333333|
|   -4|-0.06666666666666667|
|   -4|-0.06666666666666667|
+-----+--------------------+
only showing top 5 rows


### Operation 4: Group By & Aggregate

In [0]:
# PANDAS
pandas_agg = df_pandas.groupby('origin').agg({
    'delay': ['mean', 'count']
}).round(2).head()
print("Pandas:")
print(pandas_agg)

print("\n" + "="*50 + "\n")

# PYSPARK
spark_agg = df_spark.groupBy('origin').agg(
    avg('delay').alias('avg_delay'),
    count('*').alias('count')
)
print("PySpark:")
spark_agg.show(5)

Pandas:
        delay      
         mean count
origin             
ABE     11.41   448
ABI      7.26   706
ABQ     11.23  5739
ABY      6.17   252
ACT      0.90   437


PySpark:
+------+-----------------+-----+
|origin|        avg_delay|count|
+------+-----------------+-----+
|   GNV|  8.4328165374677|  774|
|   CLT|8.681008379691571|28402|
|   DCA|8.012508036705828|17109|
|   FAT|8.853170189098998| 2697|
|   COD|2.374301675977654|  179|
+------+-----------------+-----+
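only showing top 5 rows

💡 The rows differ from the Pandas output because Pandas sorts group keys by default, while Spark does not guarantee row order after `groupBy`. Add `.orderBy('origin')` to the Spark query to line them up.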


### Operation 5: Sort

In [0]:
# PANDAS
pandas_sorted = df_pandas.sort_values('delay', ascending=False).head(5)
print("Pandas - Top 5 delays:")
print(pandas_sorted[['date', 'origin', 'destination', 'delay']])

print("\n" + "="*50 + "\n")

# PYSPARK
spark_sorted = df_spark.orderBy(desc('delay')).limit(5)
print("PySpark - Top 5 delays:")
spark_sorted.select('date', 'origin', 'destination', 'delay').show()

Pandas - Top 5 delays:
            date origin destination  delay
1378822  3090615    TPA         DFW   1642
844767   2190925    SFO         ORD   1638
620315   2021245    FLL         DFW   1636
1309315  3020700    RSW         ORD   1592
44799    1180805    BNA         DFW   1560


PySpark - Top 5 delays:
+-------+------+-----------+-----+
|   date|origin|destination|delay|
+-------+------+-----------+-----+
|3090615|   TPA|        DFW| 1642|
|2190925|   SFO|        ORD| 1638|
|2021245|   FLL|        DFW| 1636|
|3020700|   RSW|        ORD| 1592|
|1180805|   BNA|        DFW| 1560|
+-------+------+-----------+-----+



### Summary: Syntax Comparison

| Operation | Pandas | PySpark |
|-----------|--------|----------|
| **Select** | `df[['col1', 'col2']]` | `df.select('col1', 'col2')` |
| **Filter** | `df[df['col'] > 10]` | `df.filter(col('col') > 10)` |
| **Add column** | `df['new'] = df['old'] * 2` | `df.withColumn('new', col('old') * 2)` |
| **Group by** | `df.groupby('col').sum()` | `df.groupBy('col').sum()` |
| **Sort** | `df.sort_values('col')` | `df.orderBy('col')` |

💡 **Key difference:** PySpark builds column expressions with the `col()` function instead of bracket indexing

---
## Part 4: Key PySpark Concepts

Understanding these concepts is critical for working effectively with PySpark.

### Concept 1: Lazy Evaluation

**Pandas:** Executes immediately  
**PySpark:** Builds a plan, executes when needed

In [0]:
print("⚡ PySpark Lazy Evaluation Demo\n")

# These operations are INSTANT (just building a plan)
start = time.time()

df_lazy = df_large.filter(col('delay') > 0)
df_lazy = df_lazy.select('origin', 'delay')
df_lazy = df_lazy.filter(col('delay') > 30)

build_time = time.time() - start
print(f"Building plan: {build_time:.4f}s (basically instant!)")

# THIS triggers execution
start = time.time()
n_flights = df_lazy.count()  # ← ACTION (named n_flights so it doesn't shadow pyspark's count())
exec_time = time.time() - start

print(f"Executing: {exec_time:.2f}s (now it actually runs)")
print(f"\nResult: {n_flights:,} flights")
print(f"\n💡 All work happened in .count(), not before!")

⚡ PySpark Lazy Evaluation Demo

Building plan: 0.0003s (basically instant!)
Executing: 2.79s (now it actually runs)

Result: 1,897,660 flights

💡 All work happened in .count(), not before!
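
To make the idea concrete, here is a toy model in plain Python (not how Spark is implemented internally): transformations only append a step to a plan and return a new object, and the action `count()` is the only method that touches the data. The class name `LazyTable` and the sample rows are made up for this illustration.

```python
class LazyTable:
    """Toy stand-in for a Spark DataFrame: stores a plan, runs it only on an action."""

    def __init__(self, rows, plan=()):
        self._rows = rows      # source data (a list of dicts)
        self._plan = plan      # tuple of (kind, argument) steps, never mutated
        self.executions = 0    # how many times an action has run this plan

    # --- transformations: return a NEW LazyTable with one more step ---
    def filter(self, predicate):
        return LazyTable(self._rows, self._plan + (("filter", predicate),))

    def select(self, *columns):
        return LazyTable(self._rows, self._plan + (("select", columns),))

    # --- action: actually push every row through every step ---
    def count(self):
        self.executions += 1
        n = 0
        for row in self._rows:
            keep = True
            for kind, arg in self._plan:
                if kind == "filter" and not arg(row):
                    keep = False
                    break
                if kind == "select":
                    row = {c: row[c] for c in arg}
            if keep:
                n += 1
        return n


flights = [
    {"origin": "ATL", "delay": 45},
    {"origin": "ORD", "delay": -3},
    {"origin": "SFO", "delay": 12},
    {"origin": "DEN", "delay": 90},
]

lazy = LazyTable(flights).filter(lambda r: r["delay"] > 0)
lazy = lazy.select("origin", "delay").filter(lambda r: r["delay"] > 30)
print(lazy.executions)  # 0: building the plan ran nothing
print(lazy.count())     # 2: ATL (45) and DEN (90)
print(lazy.executions)  # 1
```

Because each transformation returns a new object, the original `LazyTable(flights)` is never modified, which mirrors how PySpark DataFrames are immutable.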


### Concept 2: Actions vs Transformations (CRITICAL!)

This is the most important concept in PySpark!

**Transformations** (lazy - just build the plan):
- `.filter()`, `.select()`, `.groupBy()`, `.join()`, `.withColumn()`
- Don't execute anything
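- Return a new DataFrame (`.groupBy()` returns a `GroupedData` object; aggregating it, as in `.groupBy('origin').count()`, yields a DataFrame and is still lazy)

**Actions** (trigger execution):
- `.count()`, `.show()`, `.collect()`, `.write.parquet()` (and the other `.write` save methods)
- Actually run the computation
- Return results to the driver, or write them to storage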

In [0]:
print("🔍 Actions vs Transformations Demo\n")

# TRANSFORMATIONS - instant
print("1. Building query with transformations...")
start = time.time()

query = df_large \
    .filter(col('delay') > 60) \
    .filter(col('distance') > 1000) \
    .groupBy('origin') \
    .count() \
    .filter(col('count') > 100)

transform_time = time.time() - start
print(f"   Time: {transform_time:.4f}s (instant!)\n")

# ACTION - triggers everything
print("2. Triggering with .count() action...")
start = time.time()
result = query.count()
action_time = time.time() - start

print(f"   Time: {action_time:.2f}s (now it runs!)")
print(f"   Result: {result} airports\n")

print("💡 Key takeaway:")
print("   - Chain transformations freely (they're cheap to define!)")
print("   - Minimize actions (they're expensive!)")
print("   - Don't call .count() unnecessarily!")

🔍 Actions vs Transformations Demo

1. Building query with transformations...
   Time: 0.0004s (instant!)

2. Triggering with .count() action...
   Time: 2.91s (now it runs!)
   Result: 78 airports

💡 Key takeaway:
   - Chain transformations freely (they're cheap to define!)
   - Minimize actions (they're expensive!)
   - Don't call .count() unnecessarily!


### Quick Reference: Actions vs Transformations

| Type | Operations | What They Do |
|------|-----------|-------------|
| **Transformations** | `.filter()`, `.select()`, `.groupBy()`, `.join()`, `.withColumn()`, `.orderBy()` | Build execution plan (lazy) |
| **Actions** | `.count()`, `.show()`, `.collect()`, `.take()`, `.first()`, `.write.parquet()` | Trigger execution |

**Rule of thumb:** If it returns a DataFrame (or the `GroupedData` from `.groupBy()`), it's a transformation. If it returns a Python value, prints, or writes data, it's an action.

### Concept 3: Partitions - How Spark Parallelizes

Data is split into **partitions** - think of them as chunks that can be processed in parallel.

- More partitions = more parallelism = faster (usually)
- But too many = overhead from coordination
- Rule of thumb: 2-4 partitions per CPU core
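
A stdlib sketch of the partition idea, under simplifying assumptions: threads on one machine stand in for executors on a cluster, and because of Python's GIL these threads will not actually speed up this CPU-bound loop, so the point is the structure rather than the speed. Rows are dealt round-robin into chunks (similar to what `repartition(n)` does when no columns are given), each chunk is counted independently, and the partial counts are added. The function names are hypothetical.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(rows, num_partitions):
    """Deal rows round-robin into num_partitions lists (some may be empty)."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


def count_delayed(partition):
    """Per-partition work: count rows with a positive delay."""
    return sum(1 for delay in partition if delay > 0)


def parallel_count_delayed(delays, num_partitions):
    partitions = split_into_partitions(delays, num_partitions)
    with ThreadPoolExecutor() as pool:
        partial_counts = list(pool.map(count_delayed, partitions))
    return sum(partial_counts)  # combine the per-partition results


cores = os.cpu_count() or 1   # cpu_count() can return None
suggested = 3 * cores         # middle of the 2-4 partitions-per-core range
delays = [6, -8, -2, 88, 69, 0, 151, -4]
print(parallel_count_delayed(delays, suggested))  # 4
```

The answer does not depend on the number of partitions; only how the work is divided changes, which is why Spark can pick the partition count for you.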

In [0]:
print("📦 Partitions Demo\n")
print("💡 Partitions split your data into chunks for parallel processing")
print("   - Databricks serverless automatically manages partitions")
print("   - You can still manually repartition if needed\n")

# Repartition to different sizes
print("Testing different partition strategies...\n")

# Fewer partitions (4)
df_few = df_large.repartition(4)
start = time.time()
df_few.filter(col('delay') > 0).count()
few_time = time.time() - start
print(f"✓ Fewer partitions (4): {few_time:.2f}s")

# More partitions (32)
df_many = df_large.repartition(32)
start = time.time()
df_many.filter(col('delay') > 0).count()
many_time = time.time() - start
print(f"✓ More partitions (32): {many_time:.2f}s")

# Default (let Spark decide)
start = time.time()
df_large.filter(col('delay') > 0).count()
default_time = time.time() - start
print(f"✓ Default (auto): {default_time:.2f}s")

print("\n💡 Key takeaways:")
print("   - repartition() itself triggers a full shuffle, included in the first two timings")
print("   - More partitions ≠ always faster")
print("   - Too few = not enough parallelism")
print("   - Too many = coordination overhead")
print("   - Usually best to let Spark decide!")
print(f"\n📝 Note: On standard clusters, you can check partition count with:")
print(f"   df.rdd.getNumPartitions() (not available on serverless)")

📦 Partitions Demo

💡 Partitions split your data into chunks for parallel processing
   - Databricks serverless automatically manages partitions
   - You can still manually repartition if needed

Testing different partition strategies...

✓ Fewer partitions (4): 3.50s
✓ More partitions (32): 3.39s
✓ Default (auto): 2.52s

💡 Key takeaways:
   - repartition() itself triggers a full shuffle, included in the first two timings
   - More partitions ≠ always faster
   - Too few = not enough parallelism
   - Too many = coordination overhead
   - Usually best to let Spark decide!

📝 Note: On standard clusters, you can check partition count with:
   df.rdd.getNumPartitions() (not available on serverless)


### Concept 4: Filter Early = Faster

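Filter BEFORE expensive operations like groupBy or join so that less data has to be shuffled. Only do this when the filter is part of your question: the two queries below are not equivalent, since the second counts only delayed flights.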

In [0]:
print("🎯 Optimization: Filter Early\n")

# ❌ BAD: Group all 14M rows first
start = time.time()
bad = df_large.groupBy('origin').count().filter(col('count') > 1000).count()
bad_time = time.time() - start
print(f"❌ Bad:  {bad_time:.2f}s (grouped all data)")

# ✅ GOOD: Filter to ~5.9M delayed flights first
start = time.time()
good = df_large.filter(col('delay') > 0).groupBy('origin').count().filter(col('count') > 1000).count()
good_time = time.time() - start
print(f"✅ Good: {good_time:.2f}s (filtered first)")

print(f"\n⚡ {bad_time/good_time:.1f}x speedup by filtering early!")

🎯 Optimization: Filter Early

❌ Bad:  3.36s (grouped all data)
✅ Good: 3.10s (filtered first)

⚡ 1.1x speedup by filtering early!


---
## Part 5: Joins - Combining DataFrames

Joins are one of the most common operations in real-world data work.

### Create Sample Airport Information Dataset

In [0]:
# Create a small airport info dataset
airport_data = [
    ('SFO', 'San Francisco', 'California'),
    ('LAX', 'Los Angeles', 'California'),
    ('ORD', 'Chicago', 'Illinois'),
    ('ATL', 'Atlanta', 'Georgia'),
    ('DFW', 'Dallas', 'Texas'),
    ('JFK', 'New York', 'New York'),
    ('SEA', 'Seattle', 'Washington'),
    ('DEN', 'Denver', 'Colorado')
]

airport_df = spark.createDataFrame(airport_data, ['code', 'city', 'state'])
print("Airport Information:")
airport_df.show()

Airport Information:
+----+-------------+----------+
|code|         city|     state|
+----+-------------+----------+
| SFO|San Francisco|California|
| LAX|  Los Angeles|California|
| ORD|      Chicago|  Illinois|
| ATL|      Atlanta|   Georgia|
| DFW|       Dallas|     Texas|
| JFK|     New York|  New York|
| SEA|      Seattle|Washington|
| DEN|       Denver|  Colorado|
+----+-------------+----------+



### Inner Join - Get Delays with Airport Info

In [0]:
print("🔗 INNER JOIN Example\n")

# Get top delayed flights with city names
delayed_flights = df_spark.filter(col('delay') > 60).select('origin', 'delay', 'destination')

# Join to get origin city name
result = delayed_flights.join(
    airport_df,
    delayed_flights.origin == airport_df.code,
    'inner'  # only keep matches
).select(
    col('origin'),
    col('city').alias('origin_city'),
    col('state'),
    col('destination'),
    col('delay')
).orderBy(desc('delay'))

print("Top delayed flights with city names:")
result.show(10)

print("\n💡 Inner join only keeps rows that match in BOTH DataFrames")

🔗 INNER JOIN Example

Top delayed flights with city names:
+------+-------------+----------+-----------+-----+
|origin|  origin_city|     state|destination|delay|
+------+-------------+----------+-----------+-----+
|   SFO|San Francisco|California|        ORD| 1638|
|   DEN|       Denver|  Colorado|        DFW| 1174|
|   ORD|      Chicago|  Illinois|        MIA| 1149|
|   JFK|     New York|  New York|        MCO| 1014|
|   JFK|     New York|  New York|        DEN| 1003|
|   DEN|       Denver|  Colorado|        MSP|  982|
|   LAX|  Los Angeles|California|        ATL|  926|
|   ATL|      Atlanta|   Georgia|        PHL|  925|
|   JFK|     New York|  New York|        HNL|  922|
|   JFK|     New York|  New York|        SEA|  920|
+------+-------------+----------+-----------+-----+
only showing top 10 rows

💡 Inner join only keeps rows that match in BOTH DataFrames


### Left Join - Keep All Flights, Add Info When Available

In [0]:
print("🔗 LEFT JOIN Example\n")

# Left join keeps all flights, adds city info when available
result_left = delayed_flights.join(
    airport_df,
    delayed_flights.origin == airport_df.code,
    'left'  # keep all from left (delayed_flights)
).select(
    col('origin'),
    col('city').alias('origin_city'),
    col('destination'),
    col('delay')
)

print("All flights (with city name when available):")
result_left.show(10)

# Count nulls (airports not in our lookup table)
null_count = result_left.filter(col('origin_city').isNull()).count()
print(f"\n💡 Left join kept all flights: {null_count:,} have no city info (NULL)")

🔗 LEFT JOIN Example

All flights (with city name when available):
+------+-----------+-----------+-----+
|origin|origin_city|destination|delay|
+------+-----------+-----------+-----+
|   ABE|       NULL|        ATL|   88|
|   ABE|       NULL|        ATL|   69|
|   ABE|       NULL|        DTW|  151|
|   ABE|       NULL|        ORD|   83|
|   ABE|       NULL|        ATL|  127|
|   ABE|       NULL|        ORD|   68|
|   ABE|       NULL|        DTW|   89|
|   ABE|       NULL|        DTW|   80|
|   ABE|       NULL|        ATL|  333|
|   ABE|       NULL|        ORD|  219|
+------+-----------+-----------+-----+
only showing top 10 rows

💡 Left join kept all flights: 63,918 have no city info (NULL)


### Join Types Quick Reference

| Join Type | Keeps | Use When |
|-----------|-------|----------|
| **inner** | Only matches | Need complete info for both sides |
| **left** | All from left + matches from right | Keep all primary data, enrich with lookup |
| **right** | All from right + matches from left | Rarely used (just flip to left) |
| **outer** | Everything from both | Need to see all data, matched or not |
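
To see what each join type keeps, here is a tiny hash join in plain Python over lists of dicts, with `None` standing in for Spark's NULL. It is a sketch of the idea (build a lookup from the small right side, then probe it with each left row), roughly what Spark's broadcast hash join does with a small lookup table like `airport_df`; the function name and sample rows are made up for illustration.

```python
def hash_join(left, right, left_key, right_key, how="inner"):
    """Join two lists of dicts where left[left_key] == right[right_key]."""
    if how not in ("inner", "left", "outer"):
        raise ValueError(f"unsupported join type: {how}")

    # Build phase: index the (small) right side by its join key
    index = {}
    for rrow in right:
        index.setdefault(rrow[right_key], []).append(rrow)

    left_cols = {c for row in left for c in row}
    right_cols = {c for row in right for c in row}
    matched_keys = set()
    out = []

    # Probe phase: look up each left row's key in the index
    for lrow in left:
        matches = index.get(lrow[left_key], [])
        if matches:
            matched_keys.add(lrow[left_key])
            out.extend({**lrow, **rrow} for rrow in matches)
        elif how in ("left", "outer"):
            # Unmatched left row: right-side columns become None (NULL)
            out.append({**{c: None for c in right_cols}, **lrow})

    # Outer join: also keep right rows whose key never matched
    if how == "outer":
        for key, rows in index.items():
            if key not in matched_keys:
                out.extend({**{c: None for c in left_cols}, **rrow} for rrow in rows)
    return out


flights = [{"origin": "SFO", "delay": 1638}, {"origin": "ABE", "delay": 88}]
airports = [{"code": "SFO", "city": "San Francisco"}, {"code": "JFK", "city": "New York"}]

print(len(hash_join(flights, airports, "origin", "code", "inner")))  # 1
print(len(hash_join(flights, airports, "origin", "code", "left")))   # 2
print(len(hash_join(flights, airports, "origin", "code", "outer")))  # 3
```

The ABE flight survives the left join with `city` set to `None`, just like the `NULL` rows in the left join output above.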

---
## Part 6: Common Beginner Mistakes (AVOID THESE!)

These mistakes can crash your code or make it much slower than it needs to be.

### Mistake 1: Using .collect() on Large Data

**THE CARDINAL SIN OF PYSPARK**

In [0]:
print("MISTAKE #1: .collect() on large data\n")

# ❌ NEVER DO THIS on large data
print("❌ BAD: df_large.collect()")
print("   → Brings ALL data to driver memory")
print("   → WILL CRASH with OutOfMemory error")
print("   → Use .show() or .limit() instead\n")

# ✅ GOOD alternatives
print("✅ GOOD: Peek at data")
df_large.show(20)  # Safe - only shows 20 rows

print("\n✅ GOOD: Sample for local analysis")
sample = df_large.limit(1000).toPandas()  # Safe - limited to 1000 rows
print(f"Sample size: {len(sample)} rows")

print("\n💡 RULE: Never .collect() unless you're sure data fits in memory!")

MISTAKE #1: .collect() on large data

❌ BAD: df_large.collect()
   → Brings ALL data to driver memory
   → WILL CRASH with OutOfMemory error
   → Use .show() or .limit() instead

✅ GOOD: Peek at data
+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
|1030605|    0|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
|1040605|   28|     602|   ABE|        ATL|
|1051245|   88|     602|   ABE|        ATL|
|1050605|    9|     602|   ABE|        ATL|
|1061215|   -6|     602|   ABE|        ATL|
|1061725|   69|     602|   ABE|        ATL|
|1061230|    0|     369|   ABE|        DTW|
|1060625|   -3|     602|   ABE|        ATL|
|1070600|    0|     369|   ABE|        DTW|
...
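
Why are `.show(20)` and `.limit()` safe while `.collect()` is not? A plain-Python analogy (not Spark code): a generator yields rows one at a time, `itertools.islice` stops after n rows, and `list()` materializes everything. Spark is more involved, but `.take(n)` likewise typically reads only as much data as it needs instead of pulling the full dataset to the driver. The generator `read_rows` is hypothetical.

```python
from itertools import islice


def read_rows(total, stats):
    """Hypothetical lazy source: yields rows on demand and counts how many it produced."""
    for i in range(total):
        stats["produced"] += 1
        yield {"row_id": i}


# Like .take(5) / .show(5): stop once 5 rows have been produced
take_stats = {"produced": 0}
first_five = list(islice(read_rows(10_000_000, take_stats), 5))
print(len(first_five), take_stats["produced"])  # 5 5

# Like .collect(): every row is materialized in memory at once
collect_stats = {"produced": 0}
everything = list(read_rows(1_000, collect_stats))
print(len(everything), collect_stats["produced"])  # 1000 1000
```

The 10-million-row source costs almost nothing when only five rows are requested; the same source passed to `list()` would have to build all ten million.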

### Mistake 2: Excessive .count() Calls

In [0]:
print("⚠️ MISTAKE #2: Too many .count() calls\n")

# ❌ BAD: Multiple counts
print("❌ BAD: Counting at every step")
start = time.time()

step1 = df_large.filter(col('delay') > 0)
print(f"   After filter: {step1.count():,}")  # Expensive!

step2 = step1.filter(col('distance') > 500)
print(f"   After filter 2: {step2.count():,}")  # Expensive!

bad_time = time.time() - start
print(f"   Time: {bad_time:.2f}s\n")

# ✅ GOOD: Count only at the end
print("✅ GOOD: Count once at the end")
start = time.time()

result = df_large \
    .filter(col('delay') > 0) \
    .filter(col('distance') > 500)

final_count = result.count()
good_time = time.time() - start

print(f"   Final count: {final_count:,}")
print(f"   Time: {good_time:.2f}s")
print(f"\n⚡ {bad_time/good_time:.1f}x faster!\n")

print("💡 RULE: .count() is expensive. Use it sparingly!")

⚠️ MISTAKE #2: Too many .count() calls

❌ BAD: Counting at every step
   After filter: 5,917,270
   After filter 2: 3,455,930
   Time: 5.15s

✅ GOOD: Count once at the end
   Final count: 3,455,930
   Time: 2.66s

⚡ 1.9x faster!

💡 RULE: .count() is expensive. Use it sparingly!
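
Why is each `.count()` expensive? Without caching, every action re-runs the whole lineage from the source. A plain-Python sketch of that behaviour, using a hypothetical `Pipeline` class that records how often its source is scanned:

```python
class Pipeline:
    """Hypothetical lineage: a source plus filters, re-run from scratch on every action."""

    def __init__(self, source, steps=(), scans=None):
        self.source = source
        self.steps = steps
        self.scans = scans if scans is not None else [0]  # counter shared along the lineage

    def filter(self, predicate):
        # Transformation: same source and counter, one more step
        return Pipeline(self.source, self.steps + (predicate,), self.scans)

    def count(self):
        # Action: scan the source again and apply every step
        self.scans[0] += 1
        return sum(1 for row in self.source if all(step(row) for step in self.steps))


rows = [{"delay": d, "distance": m} for d, m in [(10, 600), (-5, 900), (30, 300), (45, 1200)]]

step1 = Pipeline(rows).filter(lambda r: r["delay"] > 0)
step2 = step1.filter(lambda r: r["distance"] > 500)
print(step1.count(), step2.count())  # 3 2
print(step1.scans[0])                # 2: the source was read once per action
```

In Spark, `df.cache()` (or `.persist()`) keeps an intermediate result around so later actions can reuse it instead of re-scanning the source, on compute that supports caching.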


### Mistake 3: Not Viewing Execution Plans

In [0]:
print("🔍 Use .explain() to see what Spark is doing\n")

query = df_large \
    .filter(col('delay') > 60) \
    .groupBy('origin') \
    .count() \
    .orderBy(desc('count'))

# See the execution plan
print("Execution plan:")
query.explain()

print("\n💡 .explain() helps you understand:")
print("   - What operations Spark will do")
print("   - In what order")
print("   - Where optimizations happen")
print("   - Useful for debugging slow queries!")

🔍 Use .explain() to see what Spark is doing

Execution plan:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonSort [count#14938L DESC NULLS LAST]
         +- PhotonShuffleExchangeSource
            +- PhotonShuffleMapStage ENSURE_REQUIREMENTS, [id=#23338]
               +- PhotonShuffleExchangeSink rangepartitioning(count#14938L DESC NULLS LAST, 1024)
                  +- PhotonGroupingAgg(keys=[origin#13380], functions=[finalmerge_count(merge count#14986L) AS count(1)#14984L])
                     +- PhotonShuffleExchangeSource
                        +- PhotonShuffleMapStage ENSURE_REQUIREMENTS, [id=#23332]
                           +- PhotonShuffleExchangeSink hashpartitioning(origin#13380, 1024)
                              +- PhotonGroupingAgg(keys=[origin#13380], functions=[partial_count(1) AS count#14986L])
                                 +- PhotonUnion Generic
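
Reading the plan from the bottom up: each partition first computes a `partial_count` per origin, a shuffle (`hashpartitioning(origin, ...)`) sends all partial counts for the same origin to the same place, and `finalmerge_count` adds them up. A stdlib sketch of that two-phase aggregation, with made-up partitions and a hypothetical `num_reducers` parameter in place of the shuffle partition count:

```python
from collections import Counter


def two_phase_count(partitions, num_reducers):
    # Phase 1 (map side, like partial_count): count keys inside each partition
    partials = [Counter(partition) for partition in partitions]

    # Shuffle (like hashpartitioning): route each (key, partial count) by the
    # key's hash, so every partial count for one key lands in the same inbox
    inboxes = [[] for _ in range(num_reducers)]
    for partial in partials:
        for key, n in partial.items():
            inboxes[hash(key) % num_reducers].append((key, n))

    # Phase 2 (reduce side, like finalmerge_count): add up the partial counts
    result = {}
    for inbox in inboxes:
        for key, n in inbox:
            result[key] = result.get(key, 0) + n
    return result


partitions = [["ATL", "ORD", "ATL"], ["ATL", "DEN"], ["ORD", "ORD"]]
print(sorted(two_phase_count(partitions, num_reducers=2).items()))
# [('ATL', 3), ('DEN', 1), ('ORD', 3)]
```

Only one small (key, count) pair per key per partition crosses the shuffle, instead of every row. Note that Python's `hash()` for strings is only stable within one process; a real shuffle needs a hash that every machine agrees on.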
                               

### Mistake 4: Forgetting to Filter Before Joins

In [0]:
print("🎯 MISTAKE #4: Not filtering before joins\n")

# ❌ BAD: Join all data
print("❌ BAD: Join everything first")
start = time.time()
bad_result = df_large \
    .join(airport_df, df_large.origin == airport_df.code, 'inner') \
    .filter(col('delay') > 100) \
    .count()
bad_time = time.time() - start
print(f"   Time: {bad_time:.2f}s\n")

# ✅ GOOD: Filter first
print("✅ GOOD: Filter before join")
start = time.time()
good_result = df_large \
    .filter(col('delay') > 100) \
    .join(airport_df, df_large.origin == airport_df.code, 'inner') \
    .count()
good_time = time.time() - start
print(f"   Time: {good_time:.2f}s")
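print(f"\n⚡ Speedup: {bad_time/good_time:.1f}x\n")

print("💡 No real difference here: Spark's optimizer already moves the filter")
print("   below the join (predicate pushdown; check with .explain()).")
print("   Filtering first still matters when Spark can't reorder for you,")
print("   e.g. if you .cache() or save the joined data and filter it later.")

🎯 MISTAKE #4: Not filtering before joins

❌ BAD: Join everything first
   Time: 2.86s

✅ GOOD: Filter before join
   Time: 2.98s

⚡ Speedup: 1.0x

💡 No real difference here: Spark's optimizer already moves the filter
   below the join (predicate pushdown; check with .explain()).
   Filtering first still matters when Spark can't reorder for you,
   e.g. if you .cache() or save the joined data and filter it later.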
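
Spark may swap the filter and the join only because, for an inner join filtered on a left-side column, both orders produce the same rows. A plain-Python check of that equivalence that also counts how many left rows go into the join; the helper `inner_join` and the data are made up for illustration.

```python
def inner_join(left, right, left_key, right_key, stats):
    """Hash inner join over lists of dicts; records how many left rows were probed."""
    index = {}
    for rrow in right:
        index.setdefault(rrow[right_key], []).append(rrow)
    out = []
    for lrow in left:
        stats["probed"] += 1
        for rrow in index.get(lrow[left_key], []):
            out.append({**lrow, **rrow})
    return out


def is_long_delay(row):
    return row["delay"] > 100


flights = [
    {"origin": "SFO", "delay": 150},
    {"origin": "SFO", "delay": 20},
    {"origin": "ORD", "delay": 130},
    {"origin": "ABE", "delay": 101},
]
airports = [{"code": "SFO", "city": "San Francisco"}, {"code": "ORD", "city": "Chicago"}]

# Join first, then filter
late_stats = {"probed": 0}
joined = inner_join(flights, airports, "origin", "code", late_stats)
join_then_filter = [row for row in joined if is_long_delay(row)]

# Filter first, then join
early_stats = {"probed": 0}
filter_then_join = inner_join([f for f in flights if is_long_delay(f)],
                              airports, "origin", "code", early_stats)

print(join_then_filter == filter_then_join)         # True
print(late_stats["probed"], early_stats["probed"])  # 4 3
```

Same answer, less work going into the join: that is the trade-off the optimizer exploits automatically when it can see the whole query.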


### Common Mistakes Summary

| ❌ Don't Do This | ✅ Do This Instead |
|-----------------|-------------------|
| `.collect()` on large data | `.show()` or `.limit().toPandas()` |
| `.count()` after every step | `.count()` only when needed |
| Never check execution plan | Use `.explain()` to understand |
| Join then filter | Filter then join (Spark often reorders this for you, but being explicit is clearer) |
| Large `.show(10000)` | `.show(20)` is usually enough |
| Ignore errors | Read error messages (they're helpful!) |

**Remember: PySpark is lazy. Take advantage of it!**

---
## Part 7: Writing Results

You've processed your data - now save it!

### Write to Parquet (Recommended for big data)

In [0]:
print("💾 Writing Results\n")

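# Create a result to save
result_to_save = df_large \
    .filter(col('delay') > 60) \
    .select('date', 'origin', 'destination', 'delay', 'distance')

print(f"Preview of what would be saved: {result_to_save.count():,} rows")
result_to_save.show(5)

# To actually save it, uncomment and use a location you can write to
# (placeholder path, e.g. a Unity Catalog volume):
# result_to_save.write.mode("overwrite").parquet("/Volumes/<catalog>/<schema>/<volume>/delayed_flights")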

💾 Writing Results

Preview of what would be saved: 939,850 rows
+-------+------+-----------+-----+--------+
|   date|origin|destination|delay|distance|
+-------+------+-----------+-----+--------+
|1051245|   ABE|        ATL|   88|     602|
|1061725|   ABE|        ATL|   69|     602|
|1090600|   ABE|        DTW|  151|     369|
|1091219|   ABE|        ORD|   83|     569|
|1111215|   ABE|        ATL|  127|     602|
+-------+------+-----------+-----+--------+
only showing top 5 rows


### Write to CSV (For small results or Excel)

In [0]:
small_result = (
    df_large
    .filter(
        col('delay') > 60
    )
    .groupBy('origin')
    .agg(
        avg('delay').alias('avg_delay'),
        count('*').alias('flight_count')
    )
    .orderBy(
        desc('avg_delay')
    )
    .limit(100)
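)

display(small_result)

# To save it as CSV, uncomment and use a location you can write to (placeholder path).
# Spark writes a folder of part files; coalesce(1) makes it a single part file.
# small_result.coalesce(1).write.mode("overwrite").option("header", True) \
#     .csv("/Volumes/<catalog>/<schema>/<volume>/avg_delay_by_origin")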

origin,avg_delay,flight_count
GUM,302.0,90
ALO,222.23076923076923,130
YUM,191.77777777777777,180
BGR,191.25,160
CDV,176.85714285714286,70
MOT,176.76,250
EGE,175.71428571428572,980
ADQ,174.4,50
ACT,172.5,100
HNL,170.21596244131456,2130


### Write Modes

| Mode | What It Does | Use When |
|------|-------------|----------|
| `overwrite` | Replace existing data | Recreating results |
| `append` | Add to existing data | Incremental updates |
| `ignore` | Skip if exists | Don't want to overwrite |
| `error` (default, alias `errorifexists`) | Fail if exists | Safety - don't overwrite accidentally |

**Most common:** `overwrite` for one-off analyses
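
A dictionary-backed sketch of what the four modes decide when the target already exists. It only models the semantics; `save` and `store` are hypothetical stand-ins for `df.write.mode(...).save(path)` and a storage location.

```python
def save(store, path, rows, mode="errorifexists"):
    """Toy model of Spark save modes; `store` maps a path to a list of rows."""
    exists = path in store
    if mode in ("error", "errorifexists"):
        if exists:
            raise FileExistsError(f"{path} already exists")  # Spark raises its own error type
        store[path] = list(rows)
    elif mode == "overwrite":
        store[path] = list(rows)                 # replace whatever was there
    elif mode == "append":
        store.setdefault(path, []).extend(rows)  # add to existing rows (or create)
    elif mode == "ignore":
        if not exists:
            store[path] = list(rows)             # write only if nothing is there yet
    else:
        raise ValueError(f"unknown mode: {mode}")


store = {}
save(store, "delays", [1, 2])               # first write succeeds
save(store, "delays", [3], mode="append")
save(store, "delays", [9], mode="ignore")   # skipped: "delays" exists
print(store["delays"])  # [1, 2, 3]
save(store, "delays", [7], mode="overwrite")
print(store["delays"])  # [7]
```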

### File Formats Quick Reference

| Format | Best For | Pros | Cons |
|--------|----------|------|------|
| **Parquet** | Big data storage | Fast, compressed, columnar | Not human-readable |
| **CSV** | Small results, Excel | Readable, universal | Slow, no compression |
| **JSON** | Nested data, APIs | Flexible schema | Larger files |
| **Delta** | Production data lakes | ACID, time travel, fast | Needs Delta Lake (built into Databricks) |

**Default choice: Use Parquet for files unless you have a reason not to. On Databricks, tables default to Delta, which stores data as Parquet plus a transaction log.**