# Apache Spark with Python (PySpark)
## Short Tutorial

This notebook is designed for a **short session** in the *Big Data & Business Intelligence* course.

## Learning goals

By the end of this notebook, you should be able to:

1. Explain what Apache Spark is and when to use it.
2. Start a local PySpark session (`SparkSession`).
3. Load data into Spark DataFrames.
4. Perform basic transformations and aggregations.
5. Use Spark SQL to query data.
6. Build a simple ETL pipeline that reads CSV, transforms, joins, and writes Parquet.


## 1. What is Apache Spark?

Apache Spark is a **distributed data processing framework**. It is designed for:

- Handling **large datasets** that do not fit into memory on one machine.
- Running computations **in parallel** across a cluster of machines.
- Providing a **high-level API** (DataFrames, SQL) that feels similar to Pandas or SQL.

### Understanding the Problem: Why Do We Need Spark?

Imagine you work for a company that has **terabytes** of customer data, such as transaction logs, clickstream data, and sensor readings. You need to analyze this data to generate reports or train machine learning models.

**The Challenge:**
- Your laptop has maybe 8-16 GB of RAM.
- Traditional tools like **Pandas** load all data into memory on a single machine.
- If your dataset is 500 GB, Pandas **cannot load it**: the process runs out of memory, and your computer may crash or freeze.

**The Solution: Distributed Computing**

This is where **Apache Spark** comes in. Instead of trying to process all the data on one machine, Spark:

1. **Splits** the data into smaller chunks.
2. **Distributes** these chunks across multiple machines (called a "cluster").
3. **Processes** each chunk in parallel on different machines.
4. **Combines** the results back together.

This approach allows you to process datasets that are **much larger than the memory of any single machine**.
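The four steps above can be sketched on a single machine in plain Python. This is only an analogy, not how Spark is implemented: threads stand in for the machines of a cluster, and the `split` helper and the sample numbers are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def split(values, n_chunks):
    """Split a list into at most n_chunks slices of roughly equal size."""
    size = -(-len(values) // n_chunks)  # ceiling division
    return [values[i:i + size] for i in range(0, len(values), size)]


# Hypothetical data: 100 transaction amounts
amounts = list(range(1, 101))

# 1. Split the data into chunks
chunks = split(amounts, 4)

# 2 + 3. Distribute the chunks to workers and process them in parallel
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_sums = list(pool.map(sum, chunks))

# 4. Combine the partial results
total = sum(partial_sums)
print(partial_sums, total)
```

Each worker only ever sees its own chunk, which is the property that lets a real cluster handle data larger than any single machine's memory.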

### Why not just use Pandas?

| **Pandas** | **Spark** |
|------------|-----------|
| Keeps all data in **memory on a single machine** | **Distributes** data across many machines |
| Fast for small-to-medium datasets (< 10 GB) | Designed for **large datasets** (GBs to PBs) |
| Single-threaded or limited parallelism | Runs computations **in parallel** across all cores/machines |
| Runs on a single machine | Can run locally **or** on a cloud cluster with hundreds of machines |

### When Should You Use Spark?

- **Large datasets** that don't fit in memory (> 10-100 GB)
- **Complex data pipelines** (ETL: Extract, Transform, Load)
- **Big data analytics** and reporting
- **Machine learning** on large datasets
- When you need to **scale** from your laptop to a cloud cluster

### When Should You Use Pandas Instead?

- Dataset fits comfortably in memory (as a rule of thumb, a few GB at most)
- Quick exploratory analysis
- Simple data manipulations

### Key Concepts in Spark

**1. Cluster Computing**
- A **cluster** is a group of computers (called "nodes") working together.
- Spark can run on a single machine (local mode) or on a cluster (distributed mode).
- The same Spark code runs in **both modes**; typically only the `master` setting and the data paths change.

**2. Lazy Evaluation**
- Spark doesn't execute operations immediately.
- It builds a **plan** of what to do, then executes everything when you ask for results.
- This allows Spark to **optimize** the entire workflow.

**3. DataFrames and SQL**
- In modern Spark, we work with **DataFrames** (similar to Pandas DataFrames or SQL tables).
- You can use **SQL queries** or **DataFrame API** methods—whichever you prefer.
- Under the hood, Spark converts everything to optimized execution plans.

### Real-World Example

**Scenario:** An e-commerce company analyzes 2 TB of daily clickstream data.

- **With Pandas:** Impossible—data doesn't fit in memory.
- **With Spark:** 
  - Data is split across 100 machines.
  - Each machine processes 20 GB in parallel.
  - Results are combined to generate insights in minutes.
  - The same code can run on your laptop with a sample dataset for testing!

---

In this tutorial, we will run Spark in **local mode** on your laptop, but remember: the same code can scale to process massive datasets in the cloud.

## 2. Environment Setup — Starting a SparkSession

> If you are running this locally and do not have PySpark installed yet, either run:

```bash
pip install pyspark
```

or follow the environment setup that installs PySpark from the `requirements.txt` file.

In this notebook, we create a **SparkSession**, which is the main entry point for using Spark with DataFrames and SQL.


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BD_BI_Spark_Tutorial") \
    .master("local[*]") \
    .getOrCreate()

# Suppress verbose logging output
spark.sparkContext.setLogLevel("ERROR")

spark, spark.version

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/24 23:53:59 WARN Utils: Your hostname, Aakashs-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.0.109 instead (on interface en0)
25/11/24 23:53:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/24 23:54:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


(<pyspark.sql.session.SparkSession at 0x11492a270>, '4.0.1')

### Understanding the SparkSession Configuration

Let's break down what each part of the SparkSession setup means:

- **`SparkSession.builder`**: Starts the configuration process for creating a Spark session.
- **`.appName("BD_BI_Spark_Tutorial")`**: Sets a name for your application. This name appears in Spark's monitoring UI and logs, making it easy to identify your job.
- **`.master("local[*]")`**: Tells Spark where to run:
  - `local` means run on your laptop (not a cluster)
  - `[*]` means use all available CPU cores for parallel processing
  - You could also use `local[2]` to use only 2 cores, or `local[4]` for 4 cores
- **`.getOrCreate()`**: Creates a new SparkSession, or reuses an existing one if it's already running (useful when re-running cells).
- **`.sparkContext.setLogLevel("ERROR")`**: Reduces verbose output by only showing error messages.

**Important:** Once created, you use the `spark` object to read data, create DataFrames, and run SQL queries throughout your notebook.

## 3. Creating a Sample Dataset

For this tutorial, we will create a small **housing dataset** directly in the notebook, instead of reading from disk.
In a real setting, you would typically read from CSV, Parquet, or a database.


In [3]:
from pyspark.sql import Row

data = [
    Row(house_id=1, neighborhood_id=10, price=300000, sqft=80,  bedrooms=2, year_built=1990, price_per_bedroom=100000),
    Row(house_id=2, neighborhood_id=10, price=450000, sqft=100, bedrooms=3, year_built=2005, price_per_bedroom=200000),
    Row(house_id=3, neighborhood_id=11, price=500000, sqft=120, bedrooms=4, year_built=2010, price_per_bedroom=190000),
    Row(house_id=4, neighborhood_id=11, price=200000, sqft=60,  bedrooms=2, year_built=1980, price_per_bedroom=180000),
    Row(house_id=5, neighborhood_id=12, price=800000, sqft=150, bedrooms=5, year_built=2018 , price_per_bedroom=300000),
]

houses_df = spark.createDataFrame(data)

In [4]:
# Inspect the schema and the data
houses_df.printSchema()
houses_df.show()

root
 |-- house_id: long (nullable = true)
 |-- neighborhood_id: long (nullable = true)
 |-- price: long (nullable = true)
 |-- sqft: long (nullable = true)
 |-- bedrooms: long (nullable = true)
 |-- year_built: long (nullable = true)
 |-- price_per_bedroom: long (nullable = true)

+--------+---------------+------+----+--------+----------+-----------------+
|house_id|neighborhood_id| price|sqft|bedrooms|year_built|price_per_bedroom|
+--------+---------------+------+----+--------+----------+-----------------+
|       1|             10|300000|  80|       2|      1990|           100000|
|       2|             10|450000| 100|       3|      2005|           200000|
|       3|             11|500000| 120|       4|      2010|           190000|
|       4|             11|200000|  60|       2|      1980|           180000|
|       5|             12|800000| 150|       5|      2018|           300000|
+--------+---------------+------+----+--------+----------+-----------------+

### DataFrame vs Pandas

- A **Spark DataFrame** looks similar to a Pandas DataFrame when you `.show()` it.
- But it is **not** actually held entirely in memory like in Pandas.
- Spark only runs computations **lazily**: it plans the query first and only executes when needed (e.g., on `.show()`, `.count()`, `.collect()`).

### Working with Large Datasets

When working with very large datasets (millions or billions of rows), you should **never** try to see all the data at once. Here are the safe ways to inspect your data:

**Preview the first few rows:**
```python
houses_df.show(5)          # Show first 5 rows (default is 20)
houses_df.show(10, False)  # Show 10 rows without truncating wide columns
```

**Similar to Pandas `.head()`:**
```python
houses_df.limit(5).show()  # Equivalent to df.head(5) in Pandas
```

**What's the difference between `.show(5)` and `.limit(5).show()`?**

- **`.show(5)`**: Only displays 5 rows but the DataFrame itself (`houses_df`) remains unchanged with all rows.
- **`.limit(5).show()`**: Creates a **new DataFrame** with only 5 rows, then displays it. This new DataFrame can be saved and reused.

```python
# Example:
top5 = houses_df.limit(5)  # Creates a new DataFrame with 5 rows
top5.count()               # Returns 5
houses_df.count()          # Still returns the original row count

# .show(5) is just for display - it doesn't create a new DataFrame
```

Use `.show(n)` when you just want to peek at the data. Use `.limit(n)` when you need a smaller DataFrame for testing or further processing.

**WARNING:** Never use `.collect()` on large datasets! It pulls ALL rows into the memory of the driver process (your laptop, in local mode) and can crash it.
```python
# DANGEROUS with big data:
# all_data = houses_df.collect()  # DON'T DO THIS with TB of data!
```

**Safe alternatives:**
- Use `.show(n)` to preview a few rows
- Use `.count()` to get the total number of rows
- Use `.describe().show()` to get summary statistics
- Use aggregations (`.groupBy()`, `.agg()`) to summarize data

### Exercise: Inspect the data

1. Use `houses_df.count()` to count the rows.
2. Use `houses_df.describe().show()` to see basic statistics.
3. Try `houses_df.select("price", "sqft").show()`.


In [5]:
houses_df.count()
houses_df.describe().show()
houses_df.select("price", "sqft", "bedrooms").show()

+-------+------------------+------------------+----------------+-----------------+------------------+------------------+-----------------+
|summary|          house_id|   neighborhood_id|           price|             sqft|          bedrooms|        year_built|price_per_bedroom|
+-------+------------------+------------------+----------------+-----------------+------------------+------------------+-----------------+
|  count|                 5|                 5|               5|                5|                 5|                 5|                5|
|   mean|               3.0|              10.8|        450000.0|            102.0|               3.2|            2000.6|         194000.0|
| stddev|1.5811388300841898|0.8366600265340753|229128.784747792|34.92849839314596|1.3038404810405297|15.388307249337096|71274.11872482185|
|    min|                 1|                10|          200000|               60|                 2|              1980|           100000|
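|    max|                 5|                12|          800000|              150|                 5|              2018|           300000|
+-------+------------------+------------------+----------------+-----------------+------------------+------------------+-----------------+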

## 4. Basic DataFrame Operations

In this section, we will:

- Select and filter columns.
- Create new columns with `withColumn`.
- Understand lazy evaluation briefly.


In [6]:
from pyspark.sql.functions import col

# Add a new column: price per square foot (price / sqft)
houses_df = houses_df.withColumn("price_per_sqft", col("price") / col("sqft"))
houses_df.select("house_id", "price", "sqft", "price_per_sqft").show()

+--------+------+----+------------------+
|house_id| price|sqft|    price_per_sqft|
+--------+------+----+------------------+
|       1|300000|  80|            3750.0|
|       2|450000| 100|            4500.0|
|       3|500000| 120| 4166.666666666667|
|       4|200000|  60|3333.3333333333335|
|       5|800000| 150| 5333.333333333333|
+--------+------+----+------------------+



We can also filter rows using `.filter()` or `.where()`.


In [7]:
houses_df.filter("sqft > 69").show()

+--------+---------------+------+----+--------+----------+-----------------+-----------------+
|house_id|neighborhood_id| price|sqft|bedrooms|year_built|price_per_bedroom|   price_per_sqft|
+--------+---------------+------+----+--------+----------+-----------------+-----------------+
|       1|             10|300000|  80|       2|      1990|           100000|           3750.0|
|       2|             10|450000| 100|       3|      2005|           200000|           4500.0|
|       3|             11|500000| 120|       4|      2010|           190000|4166.666666666667|
|       5|             12|800000| 150|       5|      2018|           300000|5333.333333333333|
+--------+---------------+------+----+--------+----------+-----------------+-----------------+



In [8]:
houses_df.where("price > 400000").show()

+--------+---------------+------+----+--------+----------+-----------------+-----------------+
|house_id|neighborhood_id| price|sqft|bedrooms|year_built|price_per_bedroom|   price_per_sqft|
+--------+---------------+------+----+--------+----------+-----------------+-----------------+
|       2|             10|450000| 100|       3|      2005|           200000|           4500.0|
|       3|             11|500000| 120|       4|      2010|           190000|4166.666666666667|
|       5|             12|800000| 150|       5|      2018|           300000|5333.333333333333|
+--------+---------------+------+----+--------+----------+-----------------+-----------------+



In [9]:
# Filter for houses with more than 3 bedrooms
large_houses = houses_df.filter(col("bedrooms") > 3)

large_houses.select("house_id", "bedrooms", "price", "sqft", "price_per_sqft").show()

+--------+--------+------+----+-----------------+
|house_id|bedrooms| price|sqft|   price_per_sqft|
+--------+--------+------+----+-----------------+
|       3|       4|500000| 120|4166.666666666667|
|       5|       5|800000| 150|5333.333333333333|
+--------+--------+------+----+-----------------+



### Exercise: Create a custom metric

1. Create a new column `price_per_bedroom` = `price / bedrooms`.
2. Filter for houses where `price_per_bedroom` is less than **200,000**.
3. Show the `house_id`, `bedrooms`, and `price_per_bedroom` columns.


In [10]:
houses_df.filter("price_per_bedroom < 200000").show()
houses_df.select("house_id", "bedrooms", "price_per_sqft").show()

+--------+---------------+------+----+--------+----------+-----------------+------------------+
|house_id|neighborhood_id| price|sqft|bedrooms|year_built|price_per_bedroom|    price_per_sqft|
+--------+---------------+------+----+--------+----------+-----------------+------------------+
|       1|             10|300000|  80|       2|      1990|           100000|            3750.0|
|       3|             11|500000| 120|       4|      2010|           190000| 4166.666666666667|
|       4|             11|200000|  60|       2|      1980|           180000|3333.3333333333335|
+--------+---------------+------+----+--------+----------+-----------------+------------------+

+--------+--------+------------------+
|house_id|bedrooms|    price_per_sqft|
+--------+--------+------------------+
|       1|       2|            3750.0|
|       2|       3|            4500.0|
|       3|       4| 4166.666666666667|
|       4|       2|3333.3333333333335|
|       5|       5| 5333.333333333333|
+--------+--------+------------------+

## 5. Aggregations with `groupBy`

Spark makes it easy to compute statistics over groups, similar to `GROUP BY` in SQL or `.groupby()` in Pandas.


In [11]:
from pyspark.sql import functions as F  # namespaced import avoids shadowing Python's built-in min/max

# Average, minimum, and maximum price per number of bedrooms
agg_df = houses_df.groupBy("bedrooms").agg(
    F.avg("price").alias("avg_price"),
    F.min("price").alias("min_price"),
    F.max("price").alias("max_price")
).orderBy("bedrooms")

agg_df.show()

+--------+---------+---------+---------+
|bedrooms|avg_price|min_price|max_price|
+--------+---------+---------+---------+
|       2| 250000.0|   200000|   300000|
|       3| 450000.0|   450000|   450000|
|       4| 500000.0|   500000|   500000|
|       5| 800000.0|   800000|   800000|
+--------+---------+---------+---------+



### Exercise: Aggregation practice

1. Compute the **average** `price_per_sqft` per number of bedrooms.
2. Sort the result by `price_per_sqft` in **descending** order.


In [12]:
from pyspark.sql import functions as F

# Average price per square foot per number of bedrooms,
# sorted by that average in descending order
agg_df = houses_df.groupBy("bedrooms").agg(
    F.avg("price_per_sqft").alias("avg_price"),
    F.min("price_per_sqft").alias("min_price"),
    F.max("price_per_sqft").alias("max_price")
).orderBy(F.desc("avg_price"))

agg_df.show()


+--------+-----------------+------------------+-----------------+
|bedrooms|        avg_price|         min_price|        max_price|
+--------+-----------------+------------------+-----------------+
|       5|5333.333333333333| 5333.333333333333|5333.333333333333|
|       3|           4500.0|            4500.0|           4500.0|
|       4|4166.666666666667| 4166.666666666667|4166.666666666667|
|       2|3541.666666666667|3333.3333333333335|           3750.0|
+--------+-----------------+------------------+-----------------+



## 6. Spark SQL

You can use **SQL queries** on Spark DataFrames. First, you need to register a DataFrame as a **temporary view**.


In [13]:
# Register the DataFrame as a temporary SQL view
houses_df.createOrReplaceTempView("houses")

# Use SQL to compute the average price and price per sqft for each bedroom count
sql_result = spark.sql("""
SELECT bedrooms,
       AVG(price) AS avg_price,
       AVG(price_per_sqft) AS avg_price_per_sqft
FROM houses
GROUP BY bedrooms
ORDER BY avg_price DESC
""")

sql_result.show()

+--------+---------+------------------+
|bedrooms|avg_price|avg_price_per_sqft|
+--------+---------+------------------+
|       5| 800000.0| 5333.333333333333|
|       4| 500000.0| 4166.666666666667|
|       3| 450000.0|            4500.0|
|       2| 250000.0| 3541.666666666667|
+--------+---------+------------------+



Spark SQL is useful because:

- Many people already know **SQL**, so they can work with data quickly.
- The same query can run on **very large** datasets in a cluster.
- Spark automatically optimizes SQL queries internally (through its Catalyst optimizer) before running them.

### Exercise: Your own SQL query

Write a SQL query that:

1. Selects `house_id`, `price`, `bedrooms`, `price_per_sqft`.
2. Only keeps rows where `bedrooms >= 3`.
3. Orders the results by `price_per_sqft` **ascending**.


In [14]:
sql_result = spark.sql("""
SELECT bedrooms,
       AVG(price) AS avg_price,
       AVG(price_per_sqft) AS avg_price_per_sqft
FROM houses
WHERE bedrooms >= 3
GROUP BY bedrooms
ORDER BY avg_price DESC
""")

sql_result.show()


+--------+---------+------------------+
|bedrooms|avg_price|avg_price_per_sqft|
+--------+---------+------------------+
|       5| 800000.0| 5333.333333333333|
|       4| 500000.0| 4166.666666666667|
|       3| 450000.0|            4500.0|
+--------+---------+------------------+



## 7. Joins: Combining Multiple Tables

Let's imagine we have a second table that contains information about neighborhoods.


In [15]:
# Create a small neighborhoods DataFrame
neighborhood_data = [
    Row(neighborhood_id=10, name="Central", city="Metropolis"),
    Row(neighborhood_id=11, name="Northside", city="Metropolis"),
    Row(neighborhood_id=12, name="Lakeside", city="Springfield"),
]


neigh_df = spark.createDataFrame(neighborhood_data)

neigh_df.printSchema()
neigh_df.show()

root
 |-- neighborhood_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)

+---------------+---------+-----------+
|neighborhood_id|     name|       city|
+---------------+---------+-----------+
|             10|  Central| Metropolis|
|             11|Northside| Metropolis|
|             12| Lakeside|Springfield|
+---------------+---------+-----------+



In [16]:
# Join houses with neighborhoods on neighborhood_id
joined_df = houses_df.join(neigh_df, on="neighborhood_id", how="left")

joined_df.show()

+---------------+--------+------+----+--------+----------+-----------------+------------------+---------+-----------+
|neighborhood_id|house_id| price|sqft|bedrooms|year_built|price_per_bedroom|    price_per_sqft|     name|       city|
+---------------+--------+------+----+--------+----------+-----------------+------------------+---------+-----------+
|             10|       1|300000|  80|       2|      1990|           100000|            3750.0|  Central| Metropolis|
|             10|       2|450000| 100|       3|      2005|           200000|            4500.0|  Central| Metropolis|
|             11|       3|500000| 120|       4|      2010|           190000| 4166.666666666667|Northside| Metropolis|
|             11|       4|200000|  60|       2|      1980|           180000|3333.3333333333335|Northside| Metropolis|
|             12|       5|800000| 150|       5|      2018|           300000| 5333.333333333333| Lakeside|Springfield|
+---------------+--------+------+----+--------+----------+-----------------+------------------+---------+-----------+

### Exercise: Neighborhood statistics

Using `joined_df`:

1. Compute the **average house price** per neighborhood `name`.
2. Compute the average `price_per_sqft` per **city**.


In [17]:
joined_df = houses_df.join(neigh_df, on="neighborhood_id", how="left")
joined_df.show()

+---------------+--------+------+----+--------+----------+-----------------+------------------+---------+-----------+
|neighborhood_id|house_id| price|sqft|bedrooms|year_built|price_per_bedroom|    price_per_sqft|     name|       city|
+---------------+--------+------+----+--------+----------+-----------------+------------------+---------+-----------+
|             10|       1|300000|  80|       2|      1990|           100000|            3750.0|  Central| Metropolis|
|             10|       2|450000| 100|       3|      2005|           200000|            4500.0|  Central| Metropolis|
|             11|       3|500000| 120|       4|      2010|           190000| 4166.666666666667|Northside| Metropolis|
|             11|       4|200000|  60|       2|      1980|           180000|3333.3333333333335|Northside| Metropolis|
|             12|       5|800000| 150|       5|      2018|           300000| 5333.333333333333| Lakeside|Springfield|
+---------------+--------+------+----+--------+----------+-----------------+------------------+---------+-----------+

## 8. A Mini ETL Pipeline in Spark

We now put the pieces together into a simple **ETL (Extract–Transform–Load)** function:

- **Extract:** Read data (in this tutorial we reuse the DataFrames created earlier).
- **Transform:** Clean data, add derived columns, join tables.
- **Load:** Write the result to disk (e.g., as Parquet).

**Note:** For simplicity, this tutorial uses the in-memory sample data (`houses_df` and `neigh_df`) we created earlier. In a real-world scenario, you would read data from files using `spark.read.csv()` or `spark.read.parquet()`, transform it, and write the results back to disk.

In [18]:
from pyspark.sql.functions import col


def transform_housing_data(spark):
    """Example ETL function for housing data.

    In a real project, you would:
    - Read houses from CSV or a database.
    - Read neighborhoods from another source.
    - Clean, transform, and join them.
    - Write out the final DataFrame to Parquet for downstream BI tools.
    """

    # Here we reuse houses_df and neigh_df from the notebook scope. Reading
    # module-level variables needs no `global` declaration.
    # In a standalone script, you would read them inside this function.

    df = houses_df.dropna(subset=["price", "sqft"])  # simple cleaning

    df = df.withColumn("price_per_sqft", col("price") / col("sqft"))
    df = df.withColumn("price_per_bedroom", col("price") / col("bedrooms"))

    result = df.join(neigh_df, on="neighborhood_id", how="left")

    # In a real setting, uncomment to write to disk:
    # result.write.mode("overwrite").parquet("output/cleaned_housing")

    return result


etl_df = transform_housing_data(spark)
etl_df.show()

+---------------+--------+------+----+--------+----------+-----------------+------------------+---------+-----------+
|neighborhood_id|house_id| price|sqft|bedrooms|year_built|price_per_bedroom|    price_per_sqft|     name|       city|
+---------------+--------+------+----+--------+----------+-----------------+------------------+---------+-----------+
|             10|       1|300000|  80|       2|      1990|         150000.0|            3750.0|  Central| Metropolis|
|             10|       2|450000| 100|       3|      2005|         150000.0|            4500.0|  Central| Metropolis|
|             11|       3|500000| 120|       4|      2010|         125000.0| 4166.666666666667|Northside| Metropolis|
|             11|       4|200000|  60|       2|      1980|         100000.0|3333.3333333333335|Northside| Metropolis|
|             12|       5|800000| 150|       5|      2018|         160000.0| 5333.333333333333| Lakeside|Springfield|
+---------------+--------+------+----+--------+----------+-----------------+------------------+---------+-----------+

### Exercise: Extend the ETL

1. Add a new column `is_new` that is `True` if `year_built >= 2010`, else `False`.
2. Aggregate `etl_df` to compute the average price for `is_new = True` vs `False`.

In [20]:
from pyspark.sql import functions as F

# Flag houses built in 2010 or later (>= matches the exercise statement)
extended_df = etl_df.withColumn(
    "is_new",
    F.col("year_built") >= 2010
)

# Preview
extended_df.select("house_id", "year_built", "is_new").show(10)

# Average price for new vs old
avg_price_df = (
    extended_df.groupBy("is_new")
               .agg(F.avg("price").alias("avg_price"))
               .orderBy("is_new", ascending=False)
)

avg_price_df.show()


+--------+----------+------+
|house_id|year_built|is_new|
+--------+----------+------+
|       1|      1990| false|
|       2|      2005| false|
|       3|      2010|  true|
|       4|      1980| false|
|       5|      2018|  true|
+--------+----------+------+

+------+-----------------+
|is_new|        avg_price|
+------+-----------------+
|  true|         650000.0|
| false|316666.6666666667|
+------+-----------------+



## 9. Wrap-Up

In this notebook, you:

- Started a **SparkSession** in local mode.
- Created Spark **DataFrames** and inspected their schema and contents.
- Performed **basic transformations** and **aggregations** using the DataFrame API.
- Used **Spark SQL** queries on DataFrames.
- Performed **joins** to combine multiple tables.
- Built a small **ETL function** that transforms and combines data.

### Where to go from here

- Try reading real CSV or Parquet files with `spark.read.csv(...)` or `spark.read.parquet(...)`.
- Experiment with **larger datasets** to see the benefit of Spark over Pandas.
- Look into **Spark MLlib** for scalable machine learning.

When you close your notebook, you can stop Spark with:


In [None]:
spark.stop()

---

## BONUS: Reading and Writing Files

This section demonstrates how to read from CSV and write to Parquet files. **These cells are optional and can be deleted if you don't need file I/O examples.**

### Writing to Parquet

Parquet is a columnar storage format that's efficient for big data. Here's how to save your results:

In [None]:
# Example: Writing to Parquet format
# etl_df.write.mode("overwrite").parquet("../data/tmp/cleaned_housing_parquet")

# To read it back:
# df_from_parquet = spark.read.parquet("../data/tmp/cleaned_housing_parquet")
# df_from_parquet.show(5)

### Reading from CSV

Here's how you would read data from a CSV file in a real project:

In [None]:
# Example: Reading a CSV file
# df_from_csv = spark.read.csv(
#     "../data/your_file.csv",
#     header=True,        # First row contains column names
#     inferSchema=True    # Automatically detect column types
# )
# df_from_csv.show(5)