# Mod8L5 PySpark vs. pandas (Local Parquet) — Do in Google Colab
**Goal:** See the syntax and workflow differences between **pandas** and **PySpark** using a **local Parquet** file (no cloud).  
**Format:** INSTRUCTOR → YOU DO → REFLECTION

## INSTRUCTOR — Follow Along Together (20 mins)

### Why Parquet (local-first)
- **Columnar** → fast column scans & smaller I/O.
- **Compressed & splittable** → efficient storage & parallel reads.
- **Self-describing schema** → types are embedded in the file.
- **Predicate pushdown** (Spark) → filters are checked against row-group statistics, so non-matching row groups are skipped without being read.

**Docs:**  
- Spark SQL/DataFrames: https://spark.apache.org/docs/latest/sql-programming-guide.html  
- Spark Reader/Writer Parquet: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.parquet.html  
- pandas `read_parquet`/`to_parquet`: https://pandas.pydata.org/docs/reference/api/pandas.read_parquet.html

### Step 0 — (Optional) Install libraries (only if needed)


In [None]:
# Run these pip installs in a terminal (in a Colab cell, prefix each command with !)

#pip install pyspark==3.5.1
#pip install pyarrow==15.0.2


### Step 1 — Create a tiny local Parquet (once) and keep it on disk
We’ll generate a small sample dataset, save it to **`./customers.parquet/`** (a folder of Parquet part files).

In [None]:
# RUN THIS CELL WITHOUT CHANGES
import pandas as pd
import numpy as np

rng = np.random.default_rng(42)
n = 20
pdf = pd.DataFrame({
    "customer_id": np.arange(1, n+1),
    "age": rng.integers(18, 70, size=n),
    "country": rng.choice(["US", "UK", "FR", "DE"], size=n),
    "spend_usd": rng.normal(100, 30, size=n).round(2)
})

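# Save locally as Parquet (folder). pandas writes a single file by default,
# so create the folder first and write one part file into it.
import os
os.makedirs("customers.parquet", exist_ok=True)
pdf.to_parquet("customers.parquet/part-00000.parquet", index=False)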
print("Wrote local parquet folder: ./customers.parquet/")


### Step 2 — pandas: read the local Parquet, inspect, and aggregate

In [None]:
pdf2 = pd.read_parquet("customers.parquet")
display(pdf2.head())
display(pdf2.dtypes)

pdf_country = (
    pdf2.groupby("country", as_index=False)
        .agg(n_rows=("customer_id","count"),
             avg_spend=("spend_usd","mean"))
        .sort_values("avg_spend", ascending=False)
)
display(pdf_country)


### Step 3 — Start SparkSession (local) and read the same local Parquet

**Instructor:** You can hover over methods in Google Colab to see their documentation. Point out to students where this syntax resembles pandas syntax.


In [None]:
# RUN THIS CELL WITHOUT CHANGES
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

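# Start a local Spark session that uses all available cores
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("pandas-vs-pyspark")
    .getOrCreate()
)

# Read the same local Parquet folder that pandas read
df = spark.read.parquet("customers.parquet")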
df.printSchema()
df.show(5, truncate=False)




### Step 4 — Spark: do the same (select, filter, groupBy)


In [None]:
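# Select a few columns
df_sel = df.select("customer_id", "country", "spend_usd")

# Filter
df_us = df.filter(F.col("country") == "US")
print("US rows:", df_us.count())
df_us.show(truncate=False)

# Group & aggregate (Spark), mirroring the pandas aggregation in the earlier cell
df_country = (
    df.groupBy("country")
      .agg(F.count("customer_id").alias("n_rows"),
           F.avg("spend_usd").alias("avg_spend"))
      .orderBy(F.col("avg_spend").desc())
)
df_country.show(truncate=False)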


### Step 5 — Write a filtered subset to Parquet & round-trip


In [None]:
out_path = "customers_high_spend.parquet"
(
    df.filter(F.col("spend_usd") > 100)
      .write.mode("overwrite")
      .parquet(out_path)
)
rt = spark.read.parquet(out_path)
rt.show(truncate=False)


## YOU DO (30 mins)

Use the docs to complete the tasks in Spark **and** (where it helps you think) pandas.

**Helpful Docs:**  
- Spark `functions`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html  
- Spark `DataFrame` API: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html  
- pandas groupby: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.groupby.html

### Task A — Schema, sample, and simple transforms (Spark)
1. Print the schema and count rows.  
2. Add two columns:
   - `spend_eur = spend_usd * 0.92`
   - `is_senior = age >= 40` (boolean)
3. Show the top 8 rows sorted by `spend_usd` descending.
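
If it helps to think in pandas first, the same three steps might look like the sketch below. It uses a made-up three-row frame rather than the lesson data, so it shows the shape of the solution only; the Spark version is still yours to write.

```python
import pandas as pd

# Toy data (hypothetical values, for illustration only)
toy = pd.DataFrame({
    "customer_id": [1, 2, 3],
    "age": [25, 40, 63],
    "spend_usd": [50.0, 200.0, 100.0],
})

out = (
    toy.assign(
        spend_eur=lambda d: d["spend_usd"] * 0.92,   # derived numeric column
        is_senior=lambda d: d["age"] >= 40,          # derived boolean column
    )
    .sort_values("spend_usd", ascending=False)       # highest spenders first
    .head(8)                                         # at most 8 rows
)
print(out)
```

In Spark, `withColumn` plays the role of `assign`, and `orderBy(...).show(8)` plays the role of `sort_values(...).head(8)`.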

### Task B — Aggregations (Spark)
4. By `country`, compute:
   - `n_rows = count(*)`
   - `avg_age = avg(age)`
   - `p90_spend = percentile_approx(spend_usd, 0.90)` (use `F.expr(...)` or SQL expr)
   Sort by `p90_spend` descending, show all rows.

### Task C — (Compare) Do the same aggregation in pandas
5. Load the same `customers.parquet` into pandas and compute:
   - `n_rows`, `avg_age`, and the 90th percentile of `spend_usd` (use `quantile(0.90)`).
   - Sort to match Spark output.
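
The two percentile results may not agree to the last decimal. pandas `quantile` interpolates linearly between observations by default, while Spark's documentation describes `percentile_approx` as returning a value taken from the column. The sketch below, on made-up toy values, shows how the pandas `interpolation` argument changes the answer; which setting lands closest to Spark on the lesson data is something to check rather than assume.

```python
import pandas as pd

# Toy values (hypothetical), chosen so the 90th percentile falls between two observations
s = pd.Series([10.0, 20.0, 30.0, 40.0])

p90_linear = s.quantile(0.90)                          # default: linear interpolation
p90_lower = s.quantile(0.90, interpolation="lower")    # observed value just below
p90_higher = s.quantile(0.90, interpolation="higher")  # observed value just above

print(p90_linear, p90_lower, p90_higher)
```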

> Tip: Notice Spark’s **lazy** execution vs pandas’ **eager** execution.


### Answers (Instructor Only)

In [None]:
#Answer

# YOU DO — write your Spark code below

from pyspark.sql import functions as F

# A1) schema & count
# df.printSchema()
# print("Rows:", df.count())

# A2) new columns
# df2 = (df
#        .withColumn("spend_eur", F.col("spend_usd") * F.lit(0.92))
#        .withColumn("is_senior", F.col("age") >= F.lit(40)))
# df2.show(5, truncate=False)

# A3) sort
# df2.orderBy(F.col("spend_usd").desc()).show(8, truncate=False)

# B4) aggregations with percentile_approx
# country_stats = (df
#     .groupBy("country")
#     .agg(F.count("*").alias("n_rows"),
#          F.avg("age").alias("avg_age"),
#          F.expr("percentile_approx(spend_usd, 0.90)").alias("p90_spend"))
#     .orderBy(F.col("p90_spend").desc())
# )
# country_stats.show(truncate=False)

# YOU DO — pandas comparison for Task C

import pandas as pd

# Load the same local parquet into pandas
# pdf3 = pd.read_parquet("customers.parquet")

# agg with pandas
# pdf_stats = (pdf3
#              .groupby("country", as_index=False)
#              .agg(n_rows=("customer_id","count"),
#                   avg_age=("age","mean"),
#                   p90_spend=("spend_usd", lambda s: s.quantile(0.90)))
#              .sort_values("p90_spend", ascending=False)
#             )
# display(pdf_stats)



In [None]:
# RUN THIS CELL WITHOUT CHANGES -- IMPORTANT
spark.stop()
print("Spark session stopped.")

## REFLECTION — Short Answers


1) **Why Parquet locally?**
- If you used CSV yesterday and Parquet today for the same data, what changes in performance and why?

2) **Scaling Up**
- If this dataset were 100 GB on your laptop, which tool would likely still run and why?  
- What would you change first to handle that scale?

