<a href="https://colab.research.google.com/github/Denuka1993/DataScienceExercise/blob/main/learning_lab_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Learning Lab: Getting Started with Big Data Tools

**Course:** GIK2Q3 Applied Big Data and Cloud Computing  
**Duration:** ~3 hours  
**Week:** 4

---

## 📘 This is a Learning Lab: Not Graded!

Use this session to explore the tools and get comfortable with the environment. There's nothing to submit: just run the cells, experiment, and ask questions.

**Lab 1** (your first graded assignment) will be published next week and will build on what you learn here.

---

**Objectives:**
- Verify your environment is working
- Run your first PySpark code
- Explore the differences between Pandas and PySpark
- Understand why Spark matters for big data
- Explore the Spark UI to see what's happening behind the scenes
- Get comfortable with Jupyter notebooks for big data work

---

## ⚠️ Important: Google Colab Fallback

If you have trouble running PySpark locally (especially on Windows), you can use **Google Colab** as a backup:

1. Upload this notebook to [Google Colab](https://colab.research.google.com/)
2. Colab has Java pre-installed, so PySpark will work out of the box
3. **Note:** Spark UI links (localhost:4040) won't work in Colab. That's expected; skip those exploration steps if you're using Colab.

The cells below will auto-detect your environment and install PySpark if needed.

---

## 🐍 Note: BrokenPipeError Messages

You may see `BrokenPipeError: [Errno 32] Broken pipe` messages after some cells complete. **These are harmless**, and your code worked fine! This is a known PySpark quirk: worker subprocesses try to flush output after the parent process has already closed the connection.

**How to tell if your code worked:** If you see the expected output *before* the error message, everything ran correctly. Just ignore the traceback.

---

## Section 1: Environment Check

Let's make sure everything is installed and working properly.

In [None]:
# Check Python version
import sys
print(f"Python version: {sys.version}")

In [None]:
# Install PySpark if needed (mainly for Google Colab)
try:
    import pyspark
    print(f"✓ PySpark already installed: version {pyspark.__version__}")
except ImportError:
    print("Installing PySpark...")
    %pip install pyspark -q
    import pyspark
    print(f"✓ PySpark installed: version {pyspark.__version__}")

In [None]:
# Import other libraries we'll use
import pandas as pd
import numpy as np
import time

print(f"✓ Pandas version: {pd.__version__}")
print(f"✓ NumPy version: {np.__version__}")
print("\n🎉 All libraries loaded successfully!")

In [None]:
# Environment configuration for local installations
import os
import shutil

# Set JAVA_HOME for local installations (macOS/Linux)
# This is only needed for local development - Colab has Java pre-installed
java_paths = [
    "/opt/homebrew/opt/openjdk@17",         # macOS Homebrew (Apple Silicon)
    "/usr/local/opt/openjdk@17",            # macOS Homebrew (Intel)
    "/usr/lib/jvm/java-17-openjdk-amd64",   # Ubuntu/Debian
]

java_home = os.environ.get("JAVA_HOME")

if not java_home:
    for path in java_paths:
        if os.path.exists(path):
            os.environ["JAVA_HOME"] = path
            java_home = path
            break

if java_home:
    print(f"✓ JAVA_HOME set to: {java_home}")
elif shutil.which("java"):
    # Java is on the PATH (e.g. a system-wide install) even though JAVA_HOME is unset
    print(f"✓ Java found on PATH: {shutil.which('java')}")
else:
    print("⚠️ Java not found. This notebook expects Java 17 for PySpark.")
    print("  On macOS: brew install openjdk@17")
    print("  On Ubuntu: sudo apt install openjdk-17-jdk")

---

## Section 2: Understanding MapReduce

Before we dive into Spark, let's understand the paradigm that started it all: **MapReduce**.

MapReduce was introduced by Google in 2004 and became the foundation for processing massive datasets across clusters of computers. The key insight is simple:

1. **Map**: Transform each piece of data independently (can run in parallel)
2. **Reduce**: Combine the results together

This pattern is powerful because the Map step can run on thousands of machines simultaneously!

### The Classic Example: Word Count

Let's count words in a text: first with pure Python to understand the concept, then with Spark to see how much easier it gets.

In [None]:
# Sample text data - imagine this is a huge file distributed across many machines
text_data = """
Big data is transforming how we understand the world
Data science and machine learning rely on big data
The world of data is growing every day
Machine learning models need data to learn
Big models require big data and big compute
"""

# Split into lines (simulating data distributed across machines)
lines = text_data.strip().split('\n')
print(f"We have {len(lines)} 'documents' to process:")
for i, line in enumerate(lines):
    print(f"  Doc {i}: {line[:50]}...")

### Step 1: The MAP Phase

The **map** function processes each document independently and emits (key, value) pairs.

For word count: `"hello world"` → `[("hello", 1), ("world", 1)]`

In [None]:
def map_function(document):
    """
    MAP: Takes a document, emits (word, 1) for each word.
    This function can run independently on each machine!
    """
    words = document.lower().split()
    # Emit (key, value) pairs
    return [(word, 1) for word in words]

# Apply map to each document
mapped_results = []
for doc in lines:
    result = map_function(doc)
    mapped_results.extend(result)
    print(f"Map output: {result[:3]}...")  # Show first 3 pairs

print(f"\n📤 Total (word, 1) pairs emitted: {len(mapped_results)}")

### Step 2: The SHUFFLE Phase

Before reducing, we need to group all values by key. This is the **shuffle** ‚Äî moving data between machines so all pairs with the same key end up together.

`[("big", 1), ("data", 1), ("big", 1)]` → `{"big": [1, 1], "data": [1]}`
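On a real cluster, the shuffle also has to decide *which machine* receives each key. A common approach is to hash the key and take the result modulo the number of reducers; Spark's default partitioner for `reduceByKey` follows the same hash-and-modulo idea. The sketch below is a simplified, single-process illustration. `partition_for_key` and `shuffle_to_reducers` are hypothetical helpers, and `zlib.crc32` is a stand-in hash chosen because Python's built-in `hash()` for strings changes between runs.

```python
import zlib
from collections import defaultdict

def partition_for_key(key, num_reducers):
    """Hypothetical helper: map a key to a reducer index via hash-and-modulo.

    zlib.crc32 is deterministic across Python runs, unlike hash() on strings.
    """
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def shuffle_to_reducers(mapped_pairs, num_reducers):
    """Route every (key, value) pair to one reducer, grouping values by key there."""
    reducers = [defaultdict(list) for _ in range(num_reducers)]
    for key, value in mapped_pairs:
        reducers[partition_for_key(key, num_reducers)][key].append(value)
    return reducers

pairs = [("big", 1), ("data", 1), ("big", 1), ("models", 1), ("data", 1)]
for i, bucket in enumerate(shuffle_to_reducers(pairs, num_reducers=3)):
    print(f"Reducer {i}: {dict(bucket)}")
```

Because the same key always hashes to the same index, all of its values meet at a single reducer, which is exactly the grouping the single-machine `shuffle()` function in the next cell produces.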

In [None]:
from collections import defaultdict

def shuffle(mapped_pairs):
    """
    SHUFFLE: Group all values by key.
    In a real cluster, this moves data across the network!
    """
    grouped = defaultdict(list)
    for key, value in mapped_pairs:
        grouped[key].append(value)
    return grouped

shuffled = shuffle(mapped_results)
print("After shuffle - values grouped by key:")
for word in list(shuffled.keys())[:5]:  # Show first 5 words
    print(f"  '{word}': {shuffled[word]}")

### Step 3: The REDUCE Phase

The **reduce** function takes each key and its list of values, and combines them into a final result.

`("big", [1, 1, 1])` → `("big", 3)`

In [None]:
def reduce_function(key, values):
    """
    REDUCE: Combine all values for a key.
    This can also run in parallel: one reducer per key (or group of keys)!
    """
    return (key, sum(values))

# Apply reduce to each key
final_counts = {}
for word, counts in shuffled.items():
    key, total = reduce_function(word, counts)
    final_counts[key] = total

# Sort by count and display
sorted_counts = sorted(final_counts.items(), key=lambda x: x[1], reverse=True)
print("🎯 Final word counts (MapReduce result):")
print("-" * 30)
for word, count in sorted_counts[:10]:
    bar = '█' * count
    print(f"  {word:12} : {count:2} {bar}")

### Visualizing the MapReduce Flow

```
    Document 1         Document 2         Document 3
        │                  │                  │
        ▼                  ▼                  ▼
    ┌───────┐          ┌───────┐          ┌───────┐
    │  MAP  │          │  MAP  │          │  MAP  │   ← Parallel!
    └───┬───┘          └───┬───┘          └───┬───┘
        │                  │                  │
        ▼                  ▼                  ▼
    (big,1)            (data,1)           (big,1)
    (data,1)           (is,1)             (models,1)
       ...                ...                ...
        │                  │                  │
        └──────────────────┼──────────────────┘
                           │
                    ┌──────▼──────┐
                    │   SHUFFLE   │   ← Network transfer
                    └──────┬──────┘
                           │
            ┌──────────────┼──────────────┐
            ▼              ▼              ▼
    big: [1,1,1]    data: [1,1,1,1]  models: [1,1]
            │              │              │
            ▼              ▼              ▼
        ┌───────┐      ┌───────┐      ┌───────┐
        │REDUCE │      │REDUCE │      │REDUCE │   ← Parallel!
        └───┬───┘      └───┬───┘      └───┬───┘
            │              │              │
            ▼              ▼              ▼
        (big, 3)       (data, 4)     (models, 2)
```

### Why MapReduce Changed Everything

**Before MapReduce (2004):**
- Processing large datasets required specialized supercomputers
- Scaling was expensive and complex

**After MapReduce:**
- Use thousands of cheap commodity machines
- Fault tolerance built-in (if a machine fails, just re-run that map task)
- Google used this to build their search index!

**But MapReduce has limitations:**
- Writes intermediate data to disk between each step (slow!)
- Not great for iterative algorithms (machine learning, graph processing)
- Verbose to program

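**Enter Spark (started at UC Berkeley in 2009; Apache Spark 1.0 released in 2014):**
- Keeps data in memory between operations where possible
- Up to 10-100x faster than Hadoop MapReduce for some workloads, especially iterative ones
- Much easier API (as we'll see in Section 3!)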

Before switching to Spark, try the MapReduce pattern yourself.

---

## 🏋️ Exercise A: MapReduce Challenge

Now that you understand the MapReduce pattern, it's time to apply it! Work in your group to solve these challenges.

### Challenge 1: Filter Before Counting
Modify the word count to **only count words with more than 3 characters**.

*Hint: Add a filter in the map function or between map and reduce.*

### Challenge 2: Count Capital Words
Count only words that **start with a capital letter** in the original (non-lowercased) text.

*Hint: You'll need to modify the map function to check `word[0].isupper()`*

### Challenge 3: Find the Longest Word
Find the single longest word in the text.

*Discussion: Can you parallelize this? What would the map and reduce functions look like?*

In [None]:
# Challenge 1: Count only words with more than 3 characters
# Your solution here:

def map_function_filtered(document):
    """MAP: Emit (word, 1) only for words with more than 3 characters."""
    words = document.lower().split()
    # TODO: Add a filter condition
    return [(word, 1) for word in words]  # Modify this line

# Test your solution
mapped_filtered = []
for doc in lines:
    mapped_filtered.extend(map_function_filtered(doc))

# Shuffle and reduce (same as before)
shuffled_filtered = shuffle(mapped_filtered)
filtered_counts = {k: sum(v) for k, v in shuffled_filtered.items()}
print("Words with >3 characters:")
for word, count in sorted(filtered_counts.items(), key=lambda x: -x[1])[:8]:
    print(f"  {word}: {count}")

In [None]:
# Challenge 2: Count words starting with capital letters
# Your solution here:

# Use the original text (not lowercased)
original_text = """
Big data is transforming how we understand the world
Data science and machine learning rely on big data
The world of data is growing every day
Machine learning models need data to learn
Big models require big data and big compute
"""

def map_capitals(document):
    """MAP: Emit (word, 1) only for words starting with a capital letter."""
    words = document.split()
    # TODO: Check if word starts with capital
    return [(word, 1) for word in words]  # Modify this line

# Test your solution
original_lines = original_text.strip().split('\n')
mapped_capitals = []
for doc in original_lines:
    mapped_capitals.extend(map_capitals(doc))

shuffled_capitals = shuffle(mapped_capitals)
capital_counts = {k: sum(v) for k, v in shuffled_capitals.items()}
print("Capital words:")
for word, count in sorted(capital_counts.items(), key=lambda x: -x[1])[:8]:
    print(f"  {word}: {count}")

In [None]:
# Challenge 3: Find the longest word using MapReduce pattern
# Your solution here:

# Hint: What should map emit? What should reduce do?
# Think about it: map could emit (1, word) where 1 is a dummy key
# reduce could compare two words and keep the longer one

def map_longest(document):
    """MAP: Emit the longest word from this document."""
    words = document.split()
    if not words:
        return []
    # This already picks the longest word in this document.
    # Question: which key should every document use so ONE reducer sees all candidates?
    return [("longest", max(words, key=len))]

def reduce_longest(word1, word2):
    """REDUCE: Compare two words, keep the longer one."""
    # TODO: Return the longer word
    pass

# Test your solution - discuss with your group:
# 1. Is this truly parallelizable?
# 2. What happens if two words have the same length?
# 3. How would you handle ties (return all longest words)?

---

## Section 3: Your First SparkSession

Now that you understand MapReduce, let's see how **Spark** makes distributed computing much easier.

The **SparkSession** is your entry point to all Spark functionality. Think of it as opening the door to distributed computing.

In a real cluster, SparkSession connects to multiple worker machines. Here, we'll run locally, but the API is exactly the same!

> ⚠️ **Google Colab Users: Important Reminder!**  
> The Spark UI link (`localhost:4040`) that appears after running the next cell **will not work in Colab**. This is expected: Colab runs on a remote server, so you can't access localhost URLs. When the notebook asks you to explore the Spark UI, just skip those steps. We'll provide alternative cells using `.explain()` for Colab users later in the notebook (see Exercise C).

### Key Configuration Options:
- `appName`: A name for your application (shows up in the Spark UI)
- `master`: Where Spark runs; `local[*]` means use all available CPU cores locally

In [None]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("LearningLab-GettingStarted") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

print(f"✓ Spark version: {spark.version}")
print(f"✓ Using {spark.sparkContext.defaultParallelism} CPU cores")
print(f"\n🌐 Spark UI available at: {spark.sparkContext.uiWebUrl}")
print("\n   ↑ Click this link to see what Spark is doing behind the scenes!")
print("\n🔥 Spark is ready to go!")
print("\n   (Colab users: The Spark UI link won't work, and that's expected. Skip UI steps.)")

### üëÜ Take a Moment: Open the Spark UI

Click the link above to open the Spark UI in your browser. Keep it open ‚Äî we'll explore it as we run code!

The Spark UI shows:
- **Jobs**: Operations that have run
- **Stages**: How jobs are broken into stages
- **Storage**: Cached data
- **Executors**: Worker processes (locally, just one, running inside the driver)

### Word Count in Spark: So Much Simpler!

Remember our 30+ lines of MapReduce code? Here's the same thing in Spark:

In [None]:
# Word count in Spark - the same operation in just a few lines!
from pyspark.sql import functions as F

# Use the same text data
text_rdd = spark.sparkContext.parallelize(lines)

# MapReduce in Spark: flatMap (map) → map → reduceByKey (shuffle + reduce)
word_counts_spark = (
    text_rdd
    .flatMap(lambda line: line.lower().split())  # MAP: split into words
    .map(lambda word: (word, 1))                  # MAP: create (word, 1) pairs
    .reduceByKey(lambda a, b: a + b)              # SHUFFLE + REDUCE: sum counts
)

# Collect and display results
print("üöÄ Word count in Spark (same result, much less code!):")
print("-" * 30)
for word, count in sorted(word_counts_spark.collect(), key=lambda x: -x[1])[:10]:
    bar = '█' * count
    print(f"  {word:12} : {count:2} {bar}")

### üîç Your First Job in the Spark UI!

You just ran your first Spark job! Open the Spark UI (link from Section 3) and explore what happened:

1. **Go to the Jobs tab** ‚Äî This shows all completed jobs. You should see at least one.
2. **Click on a specific job** (e.g., "Job 0") to see its details:
   - **Note:** You're still in the Jobs tab, but now viewing that job's completed stages. The tab name doesn't change, which can be confusing!
   - **Duration**: How long did it take?
   - **Stages**: The job was broken into stages (likely 2: one for the map, one for the reduce)
3. **Click on a stage** to drill down further:
   - **Tasks**: Individual units of work that ran in parallel
   - **Shuffle Read/Write**: Data movement between stages (this is the "shuffle" from MapReduce!)

### 🤔 Wait: Why Are There 10 Tasks for 5 Lines of Text?

You might see around 10 completed tasks per stage even though we only have 5 lines of text. What's going on?

When you call `parallelize(lines)`, Spark splits the data into **partitions**. By default (`defaultParallelism` in local mode), it creates one partition per CPU core; check the "Using X CPU cores" message from earlier. So with 10 cores you get 10 partitions, and therefore 10 tasks in each stage.

With only 5 lines, at least half of the partitions are **empty**! The tasks still run, but they have nothing to process. This is pure overhead: for tiny data, Spark spends effort coordinating partitions that contain no work.

**Takeaway:** Spark is designed for data much larger than your CPU count. With millions of rows, each partition would have meaningful work to do.
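To see why most partitions end up empty, here is a rough pure-Python sketch of slicing a small list into `num_partitions` contiguous chunks of near-equal size. It is modeled on the even slicing Spark applies to local collections, but treat it as an illustration: `slice_into_partitions` is a hypothetical helper, and Spark's exact split may differ between versions.

```python
def slice_into_partitions(items, num_partitions):
    """Split items into num_partitions contiguous, near-equal chunks.

    Chunk i covers positions [i * n // p, (i + 1) * n // p), so when there are
    fewer items than partitions, some chunks are necessarily empty.
    """
    n = len(items)
    return [
        items[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]

docs = ["doc 0", "doc 1", "doc 2", "doc 3", "doc 4"]
partitions = slice_into_partitions(docs, num_partitions=10)
for i, part in enumerate(partitions):
    print(f"Partition {i}: {part if part else '(empty)'}")
print(f"Empty partitions: {sum(1 for p in partitions if not p)} of {len(partitions)}")
```

With 1,000,000 items and 10 partitions, every chunk would instead hold 100,000 items, which is the regime Spark is built for.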

### üìä Key Visualizations to Explore

**Event Timeline** (in the stage details view):
- Shows tasks as horizontal bars on a timeline
- Overlapping bars = parallel execution
- Look for gaps ‚Äî they indicate waiting (for shuffle, scheduling, etc.)

**DAG Visualization** (click "DAG Visualization" in the job or stage view):
- DAG = Directed Acyclic Graph ‚Äî the execution plan
- Shows how stages connect and depend on each other
- Arrows between stages represent shuffles (data movement)
- This is the visual version of what `.explain()` prints as text!

### ‚ö° Did We Actually Run in Parallel?

**Yes!** Look at the **Event Timeline**: you'll see multiple tasks running at the same time on different CPU cores.

But here's the honest truth: **with only 5 lines of text, parallelism didn't help much.** The overhead of coordinating parallel work is larger than the work itself! You might see tasks completing almost instantly, possibly even sequentially.

This is similar to the Pandas vs Spark comparison we'll do later. Spark's power shows up with **large data**, not small examples. We're using tiny data here to learn the concepts, but remember: in the real world, you'd have millions of documents, and those tasks would run on hundreds of machines simultaneously.

**What you're seeing:**
- Each **task** processed one partition of data
- The **shuffle** happened between the map and reduce stages (grouping data by word)
- All tasks in a stage *can* run **in parallel**, and with bigger data, they will!

> üí° These visualizations become essential when debugging slow jobs on real clusters!

*(Colab users: Skip this and use `.explain()` later in Exercise C to see query plans instead.)*

### üß† A Deeper Question: Can Everything Be Parallelized?

We've seen that Spark can split work across many cores (or machines). But this raises a question: **If parallelism is so powerful, why not always use Spark and forget about RAM limits?**

The answer reveals fundamental truths about distributed computing:

**1. Not all problems are "embarrassingly parallel"**

Word count is ideal for parallelism ‚Äî each document can be processed independently. But consider:
- **Sorting**: You can't know the final position of an element without seeing all elements
- **Iterative algorithms**: Each step depends on the previous (e.g., training neural networks)
- **Sequential dependencies**: "Read line 5, but only if line 4 says X"

These require **coordination**, which means communication, which means waiting.
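Sorting shows where that coordination comes in. One common distributed approach (Spark's sort operations use range partitioning, for example) first samples the data to choose split points, then shuffles every element to the range it belongs to, and finally sorts each range independently. The sketch below simulates this on one machine; `range_partition_sort` and its simple sampling strategy are hypothetical simplifications, not Spark's implementation.

```python
import bisect
import random

def range_partition_sort(data, num_partitions, sample_size=20, seed=0):
    """Simulate a distributed sort on one machine: sample, split, shuffle, sort locally.

    Returns the sorted list and the split points chosen from the sample.
    """
    if not data:
        return [], []
    rng = random.Random(seed)

    # 1. Coordination: inspect a sample of ALL the data to choose split points
    sample = sorted(rng.sample(data, min(sample_size, len(data))))
    step = len(sample) / num_partitions
    boundaries = [sample[int(step * i)] for i in range(1, num_partitions)]

    # 2. Shuffle: send every element to the partition whose range contains it
    partitions = [[] for _ in range(num_partitions)]
    for x in data:
        partitions[bisect.bisect_right(boundaries, x)].append(x)

    # 3. Each partition is sorted independently (in parallel on a real cluster)
    sorted_parts = [sorted(p) for p in partitions]

    # 4. Concatenating the partitions in order gives a globally sorted result
    return [x for part in sorted_parts for x in part], boundaries

data_rng = random.Random(42)
numbers = [data_rng.randint(0, 1000) for _ in range(50)]
result, split_points = range_partition_sort(numbers, num_partitions=4)
print("Split points chosen from the sample:", split_points)
print("Globally sorted:", result == sorted(numbers))
```

Step 1 cannot be done one element at a time, because the split points depend on the whole dataset (or a sample of it), and step 2 moves nearly every element to a different partition.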

**2. The Shuffle is Expensive**

Remember the shuffle phase? In a real cluster, that means sending data **across the network** to other machines. Networks are much slower than RAM (often by orders of magnitude, especially in latency), and shuffle data is typically serialized and written to disk along the way. Every shuffle is a potential bottleneck.

**3. Amdahl's Law: The Sequential Part Limits You**

If 10% of your task must run sequentially, you can never get more than 10x speedup ‚Äî even with infinite parallel resources. That sequential portion becomes the bottleneck.
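Amdahl's Law can be written as speedup(N) = 1 / (s + (1 − s) / N), where s is the fraction of the work that must run sequentially and N is the number of parallel workers. As N grows, the (1 − s) / N term vanishes and the speedup approaches 1 / s. A short sketch to check the 10% example (`amdahl_speedup` is just an illustrative helper):

```python
def amdahl_speedup(serial_fraction, workers):
    """Maximum speedup predicted by Amdahl's Law for a given number of workers."""
    parallel_fraction = 1.0 - serial_fraction
    return 1.0 / (serial_fraction + parallel_fraction / workers)

for workers in [1, 10, 100, 1000, 1_000_000]:
    print(f"{workers:>9,} workers -> {amdahl_speedup(0.10, workers):5.2f}x speedup")
```

With 10% sequential work, 10 workers give only about a 5.3x speedup, and even a million workers stay just below 10x.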

**4. Coordination Overhead**

With 1000 machines, someone has to:
- Track which machine has which data
- Detect failures and reassign work
- Collect and combine results

This overhead can exceed the actual work for small tasks!

**The Practical Wisdom** (rough rules of thumb; the right cutoffs depend on your hardware and workload):

| Data Size / Scenario | Best Tool | Why |
|----------------------|-----------|-----|
| < 1 GB | Pandas | No overhead, all in RAM |
| 1-100 GB | Spark (single machine) | Spill to disk when needed |
| 100+ GB | Spark (cluster) | Distribute across machines |
| Real-time | Streaming (e.g., Kafka with Flink or Spark Structured Streaming) | Can't wait to batch process |

**Bottom line:** Parallelism is a tool, not magic. Use the right tool for the job, and often that's still good old single-machine code!

**What happened in the Spark word count?**

| Our MapReduce | Spark Equivalent |
|---------------|------------------|
| `map_function()` | `.flatMap()` + `.map()` |
| `shuffle()` | Automatic! |
| `reduce_function()` | `.reduceByKey()` |
| ~30 lines of code | ~5 lines of code |

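And more importantly: **Spark keeps intermediate data in memory** where it can, instead of writing every step to disk as MapReduce does, which makes it much faster for multi-step pipelines! (Shuffles still write data to local disk, which is one reason they are expensive.)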
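One detail hidden behind `reduceByKey` is that Spark merges values *within each partition* before the shuffle (the equivalent of a MapReduce "combiner"), so fewer pairs cross the network than with a naive group-then-sum. The pure-Python sketch below imitates that two-step behavior on hypothetical in-memory partitions; the helper names are illustrative, not Spark's code.

```python
from collections import Counter
from functools import reduce

def combine_partition(words):
    """Map side: count words locally inside one partition (the 'combiner')."""
    return Counter(words)

def merge_counts(a, b):
    """Reduce side: merge two partial counts. Addition is associative, so order doesn't matter."""
    return a + b

# Hypothetical partitions, each holding some of the words from our documents
partitions = [
    ["big", "data", "big", "data"],
    ["data", "models", "big"],
    ["models", "data"],
]

partial_counts = [combine_partition(p) for p in partitions]
pairs_sent = sum(len(c) for c in partial_counts)   # one (word, count) pair per distinct word per partition
pairs_naive = sum(len(p) for p in partitions)      # one (word, 1) pair per word without combining

final_counts = reduce(merge_counts, partial_counts, Counter())
print(dict(final_counts))
print(f"Pairs shuffled with combining: {pairs_sent}, without: {pairs_naive}")
```

The saving is small here, but with millions of words per partition and only a few thousand distinct words, combining first shrinks the shuffle dramatically.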

Now let's explore Spark's DataFrame API, which is even easier to use...

---

## Section 4: Quick Pandas Refresher

Before diving deeper into Spark DataFrames, let's create some sample data with Pandas ‚Äî a tool you're probably already familiar with.

In [None]:
# Create a simple sample dataset
np.random.seed(42)

data = {
    'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'age': [25, 30, 35, 28, 22],
    'city': ['Stockholm', 'Gothenburg', 'Malmö', 'Uppsala', 'Lund'],
    'salary': [45000, 52000, 48000, 55000, 42000]
}

df_pandas = pd.DataFrame(data)
print("Our sample dataset:")
df_pandas

In [None]:
# Basic Pandas operations
print("Shape:", df_pandas.shape)
print("\nDescribe:")
df_pandas.describe()

In [None]:
# Filtering in Pandas
high_earners = df_pandas[df_pandas['salary'] > 45000]
print("People earning more than 45,000:")
high_earners

---

## Section 5: Your First Spark DataFrame

Now let's do the same thing with Spark! We'll convert our Pandas DataFrame to a Spark DataFrame and see how similar (and different) the operations are.

In [None]:
# Convert Pandas DataFrame to Spark DataFrame
df_spark = spark.createDataFrame(df_pandas)

# View the schema (data types)
print("Spark DataFrame Schema:")
df_spark.printSchema()

In [None]:
# Show the data (like .head() in Pandas, but for Spark)
df_spark.show()

In [None]:
# Select specific columns
df_spark.select('name', 'salary').show()

In [None]:
# Filter rows (same as Pandas, slightly different syntax)
high_earners_spark = df_spark.filter(df_spark['salary'] > 45000)
print("People earning more than 45,000 (Spark):")
high_earners_spark.show()

### Pandas vs Spark: Quick Comparison

| Operation | Pandas | PySpark |
|-----------|--------|---------|
| View data | `df.head()` | `df.show()` |
| Select columns | `df[['col1', 'col2']]` | `df.select('col1', 'col2')` |
| Filter rows | `df[df['col'] > 5]` | `df.filter(df['col'] > 5)` |
| Data types | `df.dtypes` | `df.printSchema()` |
| Row count | `len(df)` | `df.count()` |
| Add column | `df['new'] = ...` | `df.withColumn('new', ...)` |

The syntax is intentionally similar, making it easier to transition from Pandas to Spark!

---

## Section 6: Why Spark? A Scaling Demo

With 5 rows, Pandas is actually faster than Spark (overhead matters!). Let's see what happens when we scale up to 100,000 rows.

### The Key Insight
Spark's power isn't for small data. It's for data that doesn't fit in memory, or when you need to distribute work across a cluster.

In [None]:
# Generate a larger dataset
np.random.seed(42)
n_rows = 100_000

cities = ['Stockholm', 'Gothenburg', 'Malmö', 'Uppsala', 'Lund',
          'Västerås', 'Örebro', 'Linköping', 'Helsingborg', 'Jönköping']

large_data = {
    'id': range(n_rows),
    'age': np.random.randint(18, 70, n_rows),
    'city': np.random.choice(cities, n_rows),
    'salary': np.random.randint(30000, 100000, n_rows),
    'years_experience': np.random.randint(0, 40, n_rows)
}

df_large_pandas = pd.DataFrame(large_data)
print(f"Created dataset with {len(df_large_pandas):,} rows")
print(f"Memory usage: {df_large_pandas.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
df_large_pandas.head()

In [None]:
# Convert to Spark DataFrame
df_large_spark = spark.createDataFrame(df_large_pandas)

# Check how Spark partitions the data
print(f"Number of partitions: {df_large_spark.rdd.getNumPartitions()}")
print("\nFirst 5 rows:")
df_large_spark.show(5)

In [None]:
# Compare: Pandas aggregation
start = time.time()
result_pandas = df_large_pandas.groupby('city')['salary'].agg(['mean', 'max', 'min', 'count'])
pandas_time = time.time() - start

print(f"Pandas aggregation took: {pandas_time*1000:.2f} ms")
result_pandas

In [None]:
# Compare: Spark aggregation
from pyspark.sql import functions as F

start = time.time()
result_spark = df_large_spark.groupBy('city').agg(
    F.avg('salary').alias('mean'),
    F.max('salary').alias('max'),
    F.min('salary').alias('min'),
    F.count('salary').alias('count')
).collect()  # .collect() triggers actual computation
spark_time = time.time() - start

print(f"Spark aggregation took: {spark_time*1000:.2f} ms")
print("\n(Spark is slower here because of overhead, but it scales to billions of rows!)")

In [None]:
# Side-by-side comparison: 100,000 rows
print("=" * 50)
print("📊 SPEED COMPARISON: 100,000 rows")
print("=" * 50)
print(f"  Pandas:  {pandas_time*1000:>8.2f} ms")
print(f"  Spark:   {spark_time*1000:>8.2f} ms")
winner = 'Pandas' if pandas_time < spark_time else 'Spark'
speedup = max(pandas_time, spark_time) / min(pandas_time, spark_time)
print(f"  Winner:  {winner} ({speedup:.1f}x faster)")
print("=" * 50)
print("\n💡 Lesson: At this scale, Pandas usually wins! Spark has startup")
print("   overhead that only pays off with much larger data.")
print("\n   Spark shines when:")
print("   • Data doesn't fit in memory (100+ GB)")
print("   • You have a cluster with many nodes")
print("   • You need fault tolerance for long-running jobs")

### ‚è≥ Optional: Big Data Showdown (10 million rows)

The cell below creates a **10 million row** dataset, 100x larger than before. It takes **1-2 minutes** to run, and it lets you check whether Spark's overhead finally starts to pay off on a single machine.

> **Skip this if short on time** ‚Äî the concept is what matters!

In [None]:
# ⚠️ This cell takes 1-2 minutes to run!
# 10 million rows: now we're talking big data

n_rows_big = 10_000_000  # 10 million rows

print(f"Generating {n_rows_big:,} rows... (this takes a moment)")
start_gen = time.time()

big_data = {
    'id': range(n_rows_big),
    'age': np.random.randint(18, 70, n_rows_big),
    'city': np.random.choice(cities, n_rows_big),
    'salary': np.random.randint(30000, 100000, n_rows_big),
    'years_experience': np.random.randint(0, 40, n_rows_big)
}

df_big_pandas = pd.DataFrame(big_data)
print(f"✓ Created Pandas DataFrame in {time.time() - start_gen:.1f}s")
print(f"  Memory usage: {df_big_pandas.memory_usage(deep=True).sum() / 1024**2:.0f} MB")

# Convert to Spark
start_convert = time.time()
df_big_spark = spark.createDataFrame(df_big_pandas)
df_big_spark.cache()  # Cache in memory for fair comparison
df_big_spark.count()  # Force materialization
convert_time = time.time() - start_convert
print(f"✓ Created Spark DataFrame in {convert_time:.1f}s")

# Pandas timing
start = time.time()
result_pandas_big = df_big_pandas.groupby('city')['salary'].agg(['mean', 'max', 'min', 'count'])
pandas_time_big = time.time() - start

# Spark timing
start = time.time()
result_spark_big = df_big_spark.groupBy('city').agg(
    F.avg('salary').alias('mean'),
    F.max('salary').alias('max'),
    F.min('salary').alias('min'),
    F.count('salary').alias('count')
).collect()
spark_time_big = time.time() - start

# Results
print("\n" + "=" * 50)
print("📊 SPEED COMPARISON: 10,000,000 rows")
print("=" * 50)
print(f"  Pandas:  {pandas_time_big*1000:>8.0f} ms")
print(f"  Spark:   {spark_time_big*1000:>8.0f} ms")
winner = 'Pandas' if pandas_time_big < spark_time_big else 'Spark'
print(f"  Winner:  {winner}")
print("=" * 50)

if pandas_time_big <= spark_time_big:
    print("\n🤔 Wait, Pandas still wins? Yes! Here's the lesson:")
    print("   • On a SINGLE machine, Pandas is hard to beat")
    print("   • Spark's overhead (coordination, serialization) hurts locally")
    print("   • Spark shines when data is ALREADY distributed across a cluster")
    print("   • Or when data is too big for RAM (try 100 GB: Pandas runs out of memory!)")
else:
    print("\n🔥 Spark is catching up! With more data or a real cluster,")
    print("   Spark would pull ahead significantly.")

print(f"\n⚠️ Note: Converting Pandas → Spark took {convert_time:.0f}s. In real pipelines,")
print("   you'd read directly into Spark from disk/cloud, not via Pandas.")

# Clean up to free memory
df_big_spark.unpersist()
del df_big_pandas, big_data

### üî• The Real Spark Advantage: Data Larger Than RAM

The demos above converted Pandas → Spark, which isn't realistic. In the real world, you'd **read data directly into Spark** from disk, cloud storage, or databases.

The cell below generates **100 million rows directly in Spark**, with no Pandas involved. This would require roughly 9 GB of RAM in Pandas, but Spark handles it by:
- Generating data lazily (doesn't load everything into RAM at once)
- Spilling to disk when memory is tight
- Processing in partitions, not all at once

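> 💡 This runs in **1-30 seconds** depending on your hardware, much faster than you'd expect for 100M rows! Because Spark evaluates lazily, it plans the whole pipeline first and then generates and aggregates the rows partition by partition, without ever storing all 100M of them.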
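The partition-by-partition idea can be sketched in plain Python with generators: each "partition" produces its rows on the fly, computes a small partial aggregate (a running sum and count per city), and only those partial results are merged at the end. `generate_partition`, `aggregate_partition`, `merge_partials`, and the sizes used are hypothetical choices to keep the demo fast; Spark does the same kind of partial-then-final aggregation in parallel across executors, with far more machinery.

```python
import random
from collections import defaultdict

CITIES = ["Stockholm", "Gothenburg", "Malmö", "Uppsala", "Lund"]

def generate_partition(partition_id, rows_per_partition):
    """Yield rows one at a time; nothing is stored in a list."""
    rng = random.Random(partition_id)  # per-partition seed, so each partition is reproducible
    for _ in range(rows_per_partition):
        yield rng.choice(CITIES), rng.randint(30_000, 99_999)

def aggregate_partition(partition_id, rows_per_partition):
    """Partial aggregate for one partition: running [sum, count] per city."""
    partial = defaultdict(lambda: [0, 0])
    for city, salary in generate_partition(partition_id, rows_per_partition):
        partial[city][0] += salary
        partial[city][1] += 1
    return partial

def merge_partials(partials):
    """Combine per-partition (sum, count) pairs, then compute (average, count) per city."""
    totals = defaultdict(lambda: [0, 0])
    for partial in partials:
        for city, (s, c) in partial.items():
            totals[city][0] += s
            totals[city][1] += c
    return {city: (s / c, c) for city, (s, c) in totals.items()}

result = merge_partials(aggregate_partition(pid, 25_000) for pid in range(8))
for city, (avg, count) in sorted(result.items()):
    print(f"{city:10} avg salary: {avg:,.0f} ({count:,} rows)")
```

At any moment, memory holds one row plus a handful of sums and counts, no matter how many rows flow through.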

In [None]:
# Generate 100 million rows DIRECTLY in Spark: no Pandas, and no need to hold them all in RAM!
from pyspark.sql.functions import rand, floor, array, lit, element_at, when

n_rows_massive = 100_000_000  # 100 million rows

print(f"üöÄ Generating {n_rows_massive:,} rows directly in Spark...")
print("   (This would require ~9 GB in Pandas; Spark handles it lazily)\n")

start = time.time()

# Create cities array for random selection
cities_array = array([lit(c) for c in cities])

# Generate data directly in Spark; it never touches Pandas!
df_massive = (
    spark.range(n_rows_massive)
    .withColumn("age", (floor(rand() * 52) + 18).cast("int"))
    .withColumn("city", element_at(cities_array, (floor(rand() * 10) + 1).cast("int")))
    .withColumn("years_experience", (floor(rand() * 40)).cast("int"))
)

# Add salary with made-up, illustrative ranges that differ by city.
# Each entry is (minimum, width): salaries are drawn from [minimum, minimum + width).
# The entries follow the order of the `cities` list defined earlier, so the
# city names used in the conditions always match the generated data exactly.
salary_ranges = [
    (55000, 40000),  # Stockholm:   55-95k
    (50000, 35000),  # Gothenburg:  50-85k
    (48000, 30000),  # Malmö:       48-78k
    (47000, 30000),  # Uppsala:     47-77k
    (38000, 22000),  # Lund:        38-60k
    (43000, 25000),  # Västerås:    43-68k
    (42000, 25000),  # Örebro:      42-67k
    (45000, 28000),  # Linköping:   45-73k
    (41000, 25000),  # Helsingborg: 41-66k
    (40000, 23000),  # Jönköping:   40-63k
]

# Chain when(...).when(...)... just as if each city were written out by hand
salary_expr = None
for city, (low, width) in zip(cities, salary_ranges):
    condition = df_massive["city"] == city
    value = (floor(rand() * width) + low).cast("int")
    salary_expr = when(condition, value) if salary_expr is None else salary_expr.when(condition, value)

df_massive = df_massive.withColumn("salary", salary_expr)

# Force execution with an aggregation
result = df_massive.groupBy("city").agg(
    F.avg("salary").alias("avg_salary"),
    F.count("*").alias("count")
).collect()

total_time = time.time() - start

print(f"✓ Processed {n_rows_massive:,} rows in {total_time:.1f}s")
print(f"‚úì Partitions used: {df_massive.rdd.getNumPartitions()}")
print(f"\nResults (note the salary differences by city!):")
for row in sorted(result, key=lambda x: -x["avg_salary"]):
    print(f"   {row['city']:12} avg salary: {row['avg_salary']:,.0f} SEK ({row['count']:,} rows)")

print(f"\nüí° Key insight: Spark never loaded 100M rows into RAM at once!")
print(f"   It processed them in partitions, streaming through memory.")
print(f"   This same code works for 1B or 10B rows on a cluster.")

### 🔍 Check the Spark UI Now!

Go back to the Spark UI (link from Section 2) and open the **Jobs** tab. Then:

1. Find the jobs that were created when we ran the aggregation
2. Click on a job to see its **stages**
3. Click on a stage to see the **tasks** that ran on each partition

This visualization becomes essential when optimizing real big data jobs!

---

## Section 7: Understanding Partitions

**Partitions** are how Spark divides data for parallel processing. Understanding partitions is crucial for performance.

Think of it like dividing a pizza: more slices means more people can eat at once!
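To make the pizza analogy concrete, here is a toy model in plain Python of two ways rows can be assigned to partitions. It is similar in spirit to `repartition(n)` versus `repartition(n, "city")`, but it is not Spark's code: round-robin deals rows out like cards, while hash partitioning sends every row with the same key to the same partition. `zlib.crc32` stands in for Spark's own hash function here (an assumption for illustration only).

```python
import zlib


def round_robin_sizes(rows, num_partitions):
    """Row i goes to partition i % num_partitions, so sizes differ by at most 1."""
    sizes = [0] * num_partitions
    for i, _ in enumerate(rows):
        sizes[i % num_partitions] += 1
    return sizes


def hash_partition_sizes(keys, num_partitions):
    """Each key goes to crc32(key) % num_partitions, so equal keys share a partition."""
    sizes = [0] * num_partitions
    for key in keys:
        sizes[zlib.crc32(key.encode("utf-8")) % num_partitions] += 1
    return sizes


# Skewed data: one city accounts for 70% of the rows.
keys = ["Stockholm"] * 7_000 + ["Lund"] * 1_000 + ["Uppsala"] * 1_000 + ["Örebro"] * 1_000
print("round-robin:", round_robin_sizes(keys, 4))
print("hash(city): ", hash_partition_sizes(keys, 4))
```

Round-robin yields four equal slices, while hashing on `city` must put all 7,000 Stockholm rows into one partition, however many partitions you ask for.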

In [None]:
# See how data is distributed across partitions
partition_counts = df_large_spark.rdd.mapPartitions(
    lambda x: [sum(1 for _ in x)]
).collect()

print(f"Data distribution across {len(partition_counts)} partitions:")
for i, count in enumerate(partition_counts):
    bar = '█' * (count // 500)  # Visual representation
    print(f"  Partition {i}: {count:,} rows {bar}")

In [None]:
# Repartition to change the number of partitions
df_repartitioned = df_large_spark.repartition(4)
print(f"Original partitions: {df_large_spark.rdd.getNumPartitions()}")
print(f"After repartition(4): {df_repartitioned.rdd.getNumPartitions()}")

# See new distribution
new_counts = df_repartitioned.rdd.mapPartitions(
    lambda x: [sum(1 for _ in x)]
).collect()

print("\nNew distribution:")
for i, count in enumerate(new_counts):
    bar = '█' * (count // 1000)
    print(f"  Partition {i}: {count:,} rows {bar}")

### Why Partitions Matter

- **Too few partitions**: Not enough parallelism, some cores sit idle
- **Too many partitions**: Overhead of managing many small tasks
- **Uneven partitions**: Some tasks finish early, others take forever ("data skew")

Rule of thumb: 2-4 partitions per CPU core for local mode.
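The three bullet points above can be checked with a small scheduling model. This is a toy sketch with made-up numbers, assuming each task's runtime is a fixed per-task overhead plus time proportional to its partition's row count, and that tasks are handed to whichever core frees up first. The names `job_wall_time` and `even_split` and the `rows_per_sec`/`task_overhead` values are hypothetical, not Spark settings.

```python
import heapq


def job_wall_time(partition_sizes, cores, rows_per_sec=1_000_000, task_overhead=0.01):
    """Estimate wall-clock seconds for one stage under a toy list-scheduling model.

    Each task takes task_overhead + size / rows_per_sec seconds, and tasks are
    started in order on whichever core becomes free first.
    """
    free_at = [0.0] * cores  # min-heap: time at which each core becomes free
    heapq.heapify(free_at)
    for size in partition_sizes:
        start = heapq.heappop(free_at)
        heapq.heappush(free_at, start + task_overhead + size / rows_per_sec)
    return max(free_at)


def even_split(total_rows, n):
    """Split total_rows into n partitions whose sizes differ by at most 1."""
    base, extra = divmod(total_rows, n)
    return [base + (1 if i < extra else 0) for i in range(n)]


cores, rows = 8, 8_000_000
for n in (2, 8, 16, 32, 10_000):
    print(f"{n:>6} even partitions:   {job_wall_time(even_split(rows, n), cores):.2f} s")

skewed = [7_000_000] + even_split(1_000_000, 31)  # one huge partition plus 31 small ones
print(f"    32 skewed partitions: {job_wall_time(skewed, cores):.2f} s")
```

In this model, 2 partitions leave six cores idle, 10,000 partitions are dominated by per-task overhead, and the single 7M-row partition holds up the whole stage. Exactly one partition per core looks best here only because every simulated task runs at precisely its predicted speed; real tasks vary, which is one reason the rule of thumb favors a few partitions per core.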

---

## 🏋️ Exercise B: Partition Experiment

Partitions have a real impact on performance. In this exercise, you'll measure it!

### Your Task

1. Create a DataFrame with **1 million rows** (modify the code below)
2. Time an aggregation with the **default number of partitions**
3. Repartition to **2 partitions** and time the same aggregation
4. Repartition to **100 partitions** and time again
5. Discuss with your group: What's the optimal number? Why?

### Guiding Questions
- What happens when you have too few partitions?
- What happens when you have too many?
- How does this relate to the number of CPU cores on your machine?
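Single timings are noisy, and the first job in a fresh session can also pay one-off warm-up costs, so you may want to repeat each measurement and compare medians. Here is a minimal, framework-agnostic helper; the name `median_runtime` and its defaults are hypothetical. It uses `time.perf_counter`, a monotonic clock that is better suited to measuring short intervals than `time.time`.

```python
import statistics
import time


def median_runtime(fn, repeats=5, warmup=1):
    """Call fn() `warmup` times untimed, then `repeats` times; return the median in seconds."""
    for _ in range(warmup):
        fn()
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        durations.append(time.perf_counter() - start)
    return statistics.median(durations)


# Demo with a plain-Python workload. In the exercise you could pass, for example,
# lambda: df_2_partitions.groupBy('category').count().collect()
print(f"{median_runtime(lambda: sum(range(1_000_000))) * 1000:.1f} ms")
```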

In [None]:
# Exercise B: Partition Experiment
# Step 1: Create a larger dataset (1 million rows)

n_rows_experiment = 1_000_000  # 1 million rows

experiment_data = {
    'id': range(n_rows_experiment),
    'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n_rows_experiment),
    'value': np.random.randint(1, 1000, n_rows_experiment),
}

df_experiment_pandas = pd.DataFrame(experiment_data)
df_experiment = spark.createDataFrame(df_experiment_pandas)

print(f"Created dataset with {n_rows_experiment:,} rows")
print(f"Default partitions: {df_experiment.rdd.getNumPartitions()}")

In [None]:
# Step 2: Define the aggregation we'll test
def time_aggregation(df, label):
    """Time a groupBy aggregation and return the duration."""
    start = time.time()
    result = df.groupBy('category').agg(
        F.avg('value').alias('avg'),
        F.sum('value').alias('sum'),
        F.count('value').alias('count')
    ).collect()
    duration = time.time() - start
    print(f"{label}: {duration*1000:.0f} ms ({df.rdd.getNumPartitions()} partitions)")
    return duration

# Test with default partitions
time_default = time_aggregation(df_experiment, "Default")

In [None]:
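# Step 3: Test with 2 partitions (very few)
# repartition(2) shuffles, like repartition(100) below, so the comparison is like-for-like.
# coalesce(2) would merge partitions without a shuffle: cheaper, but a different operation.
df_2_partitions = df_experiment.repartition(2)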
time_2 = time_aggregation(df_2_partitions, "2 partitions")

In [None]:
# Step 4: Test with 100 partitions (many)
df_100_partitions = df_experiment.repartition(100)
time_100 = time_aggregation(df_100_partitions, "100 partitions")

In [None]:
# Step 5: Compare results and discuss
print("\nüìä Performance Summary:")
print("-" * 40)
print(f"Default ({df_experiment.rdd.getNumPartitions()} partitions): {time_default*1000:.0f} ms")
print(f"2 partitions:   {time_2*1000:.0f} ms")
print(f"100 partitions: {time_100*1000:.0f} ms")
print("-" * 40)
print(f"\nü§î Discussion questions for your group:")
print("1. Which configuration was fastest? Why?")
print("2. How many CPU cores does your machine have? (Check above)")
print("3. What would happen with 1000 partitions?")
print("4. When might you want MORE partitions than cores?")

---

## Section 8: Lazy Evaluation Preview

One of Spark's superpowers is **lazy evaluation**. Transformations aren't executed until you need the result. This allows Spark to optimize the entire pipeline!

We'll explore this more deeply in Lab 1, but here's a preview:

In [None]:
# This is INSTANT - nothing is computed yet!
start = time.time()

# Chain multiple transformations
result = df_large_spark \
    .filter(df_large_spark['age'] > 30) \
    .filter(df_large_spark['salary'] > 50000) \
    .select('city', 'salary') \
    .groupBy('city') \
    .agg(F.avg('salary').alias('avg_salary'))

planning_time = time.time() - start
print(f"⚡ Planning took: {planning_time*1000:.2f} ms")
print("\nNothing has been computed yet! Spark just built a plan.")
print(f"Type of result: {type(result)}")

In [None]:
# See the query plan
print("Query plan (what Spark will do):")
result.explain()

In [None]:
# NOW it executes - when we ask for results
start = time.time()
actual_result = result.collect()  # This triggers computation!
execution_time = time.time() - start

print(f"⏱️ Execution took: {execution_time*1000:.2f} ms")
print("\nResults:")
result.show()  # note: show() is also an action, so Spark runs the query again here

### Actions vs Transformations

| Transformations (Lazy) | Actions (Trigger Execution) |
|----------------------|---------------------------|
| `filter()` | `show()` |
| `select()` | `count()` |
| `groupBy()` | `collect()` |
| `join()` | `write.parquet(path)` |
| `withColumn()` | `take(n)` |

Transformations build a plan. Actions execute it.
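The distinction can be mimicked in a few lines of plain Python. The `LazyFrame` class below is a hypothetical toy, far simpler than Spark's Catalyst optimizer: transformations only record steps and return a new object, and nothing reads the data until an action such as `collect()` or `count()` runs every recorded step in a single pass over the source.

```python
def _apply_step(rows, kind, arg):
    """Wrap an iterator of rows with one recorded step (still lazy: returns a generator)."""
    if kind == "filter":
        return (row for row in rows if arg(row))
    # "select": keep only the named columns
    return ({col: row[col] for col in arg} for row in rows)


class LazyFrame:
    """Toy model of lazy evaluation: transformations record steps, actions run them."""

    def __init__(self, source, steps=()):
        self._source = source       # callable returning an iterable of row dicts
        self._steps = tuple(steps)  # recorded transformations, not yet executed

    # Transformations: return a new LazyFrame and do no work.
    def filter(self, predicate):
        return LazyFrame(self._source, self._steps + (("filter", predicate),))

    def select(self, *columns):
        return LazyFrame(self._source, self._steps + (("select", columns),))

    def explain(self):
        """Describe the recorded plan without running it."""
        return " -> ".join(kind for kind, _ in self._steps) or "(scan only)"

    # Actions: run the whole recorded pipeline in one pass over the source.
    def collect(self):
        rows = iter(self._source())
        for kind, arg in self._steps:
            rows = _apply_step(rows, kind, arg)
        return list(rows)

    def count(self):
        return len(self.collect())  # a real engine would avoid materializing rows


scanned = []  # records which source rows were actually read


def source():
    for i in range(10):
        scanned.append(i)
        yield {"age": 20 + 3 * i, "salary": 40_000 + 5_000 * i, "city": "Lund"}


plan = LazyFrame(source).filter(lambda r: r["age"] > 30).select("city", "salary")
print(plan.explain())  # filter -> select
print(len(scanned))    # 0: building the plan read no rows
print(plan.collect())  # the action triggers the scan
print(len(scanned))    # 10
```

Passing each step through `_apply_step` binds that step's predicate or column list at the time it is applied; building the generators directly inside the loop would make every step see only the last loop value.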

---

## Section 9: Exercises & Exploration

The final section contains two exercises and a free exploration sandbox. Work through these with your group!

---

## 🏋️ Exercise C: Spark UI Scavenger Hunt

The Spark UI is your best friend for understanding and debugging Spark jobs. Let's explore it!

> ⚠️ **Colab Users:** Skip this exercise, since the Spark UI isn't accessible in Colab. Instead, use `explain()` to examine query plans (see the alternative below).

### Your Mission

Run the aggregation cell below, then open the Spark UI and find the answers to these questions:

1. **Jobs Tab:** How many jobs were created?
2. **Stages:** Click on a job. How many stages does it have?
3. **Tasks:** Click on a stage. How many tasks ran in parallel?
4. **Timeline:** Look at the Event Timeline. Did tasks run concurrently?
5. **Shuffle:** Find the shuffle read/write metrics. How much data was shuffled?

### For Colab Users (Alternative)
Use `.explain(True)` to see the logical and physical query plans instead.

In [None]:
# Run this aggregation, then explore the Spark UI
scavenger_result = df_large_spark.groupBy('city').agg(
    F.avg('salary').alias('avg_salary'),
    F.max('years_experience').alias('max_experience'),
    F.count('*').alias('employee_count')
).orderBy(F.desc('avg_salary'))

# Trigger execution
scavenger_result.show()

# Print the Spark UI URL again for convenience
print(f"\nüåê Spark UI: {spark.sparkContext.uiWebUrl}")
print("   Go to the Jobs tab and find the job that just ran!")

In [None]:
# Alternative for Colab users: Examine the query plan
print("Query Plan (what Spark will execute):")
print("=" * 50)
scavenger_result.explain(True)

### üìñ How to Read the Query Plan

The output above shows **4 different views** of the same query, from abstract to concrete:

**1. Parsed Logical Plan**: What you wrote, parsed into a tree structure
- Just translates your code, no optimizations yet

**2. Analyzed Logical Plan**: Same as above, but with resolved column types
- Now Spark knows `avg_salary` is a `double`, `employee_count` is a `bigint`, etc.

**3. Optimized Logical Plan**: Spark's optimizer has improved your query!
- Notice the `Project` step: Spark realized it only needs 3 columns (`city`, `salary`, `years_experience`), so it drops `id` and `age` early to save memory

**4. Physical Plan**: The actual execution strategy (this is what runs!)

### 🔍 Reading the Physical Plan (Bottom to Top!)

Read from the **bottom up** ‚Äî that's the order of execution:

```
Scan ExistingRDD           ← 1. Read the data
   ↓
Project [city, salary, years_experience]  ← 2. Keep only needed columns
   ↓
HashAggregate (partial)    ← 3. Partial aggregation BEFORE shuffle (optimization!)
   ↓
Exchange hashpartitioning  ← 4. SHUFFLE! Send data to reducers by city
   ↓
HashAggregate (final)      ← 5. Final aggregation after shuffle
   ↓
Exchange rangepartitioning ← 6. Another shuffle for sorting
   ↓
Sort                       ← 7. Sort by avg_salary descending
```

### 🎯 Key Insights

- **Two shuffles!** (`Exchange` = shuffle). One for grouping by city, one for sorting.
- **Partial aggregation**: Spark computes partial aggregates *before* shuffling (see `partial_avg`, `partial_max`, `partial_count`). This cuts down the amount of data sent over the network. Smart!
- **AdaptiveSparkPlan**: Spark can adjust the plan at runtime based on actual data sizes.
- **200 partitions**: The default shuffle partition count (you can tune this with `spark.sql.shuffle.partitions`).
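The partial-then-final aggregation pattern can be sketched in plain Python. This is a toy model, not Spark's code: each input partition is first reduced to one `(sum, count)` pair per city (the role `partial_avg` plays), those pairs are routed to reducer partitions by hashing the city (`zlib.crc32` stands in for Spark's hash function, an assumption), and each reducer merges its pairs into the final average. The function names are hypothetical.

```python
import zlib
from collections import defaultdict


def partial_aggregate(partition):
    """Map side: reduce one partition of (city, salary) rows to {city: [sum, count]}."""
    acc = defaultdict(lambda: [0, 0])
    for city, salary in partition:
        acc[city][0] += salary
        acc[city][1] += 1
    return acc


def hash_shuffle(partials, num_reducers):
    """Route each (city, sum, count) record to a reducer chosen by hashing the city."""
    buckets = [[] for _ in range(num_reducers)]
    for acc in partials:
        for city, (s, c) in acc.items():
            buckets[zlib.crc32(city.encode("utf-8")) % num_reducers].append((city, s, c))
    return buckets


def final_aggregate(bucket):
    """Reduce side: merge partial (sum, count) pairs, then compute the average."""
    merged = defaultdict(lambda: [0, 0])
    for city, s, c in bucket:
        merged[city][0] += s
        merged[city][1] += c
    return {city: s / c for city, (s, c) in merged.items()}


partitions = [
    [("Stockholm", 60_000), ("Lund", 40_000), ("Stockholm", 80_000)],
    [("Lund", 50_000), ("Stockholm", 70_000)],
    [("Lund", 45_000), ("Lund", 55_000)],
]
partials = [partial_aggregate(p) for p in partitions]
buckets = hash_shuffle(partials, num_reducers=2)
shuffled = sum(len(b) for b in buckets)

averages = {}
for bucket in buckets:
    averages.update(final_aggregate(bucket))  # each city lands in exactly one bucket
print(f"rows in: {sum(map(len, partitions))}, records shuffled: {shuffled}")
print(averages)
```

Here 7 input rows become only 5 shuffled records (one per city per input partition). With millions of rows and a handful of cities, that gap is what makes the partial step worthwhile.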

### Scavenger Hunt Answers (Fill in with your group)

| Question | Your Answer |
|----------|-------------|
| How many jobs? | ___ |
| Stages per job? | ___ |
| Tasks per stage? | ___ |
| Concurrent execution? | Yes / No |
| Shuffle data size? | ___ MB |

**Discussion:** Why are there multiple stages? (Hint: What operation requires data movement?)

---

## 🏋️ Exercise D: Distributed Thinking

This is a **group discussion exercise**, so no code is required! Work through these scenarios with your group.

### Scenario 1: Web Log Analysis

You work at a streaming company and need to analyze **500 GB of web server logs** to find:
- The most popular pages
- Peak usage hours
- Users with unusual behavior

**Discussion Questions:**
1. Would this fit in Pandas on your laptop? (Hint: How much RAM do you have?)
2. If you have a cluster with 10 nodes, how many partitions would you choose?
3. What if one log file is 400 GB and the others are 10 GB each? What problem might occur?

### Scenario 2: Data Pipeline Failure

Your nightly Spark job that processes customer orders has been running for 8 hours (usually takes 1 hour). You open the Spark UI and see:
- 199 out of 200 tasks completed
- 1 task has been running for 7.5 hours

**Discussion Questions:**
1. What is this symptom called?
2. What might cause one task to take so long?
3. How would you investigate and fix this?

### Scenario 3: Real-time vs Batch

Your team debates whether to process data in real-time or in nightly batches.

**Discussion Questions:**
1. What are the trade-offs between batch and streaming?
2. Give an example where real-time processing is essential
3. Give an example where batch processing is sufficient

### 📝 Group Notes: Record Your Answers

Use this space to record your group's answers and insights:

**Scenario 1 (Web Logs):**
- Would it fit in Pandas?
- Suggested partition count?
- The 400 GB file problem is called...

**Scenario 2 (Pipeline Failure):**
- The symptom is called...
- Possible causes...
- Investigation steps...

**Scenario 3 (Real-time vs Batch):**
- Trade-offs...
- Real-time example...
- Batch example...

### 🧪 Exploration Sandbox

Use these cells to experiment with Spark! Try different operations, break things, and learn by doing.

In [None]:
# Try: Summary statistics
df_large_spark.describe().show()

In [None]:
# Try: Top 10 highest salaries
df_large_spark.orderBy(df_large_spark['salary'].desc()).show(10)

In [None]:
# Try: Add a calculated column
df_with_tax = df_large_spark.withColumn('estimated_tax', df_large_spark['salary'] * 0.3)
df_with_tax.show(5)

In [None]:
# Your experiments here...


In [None]:
# More space for exploration...


---

## Wrap-up

### What You Learned Today

1. **Environment setup**: How to verify and configure PySpark
2. **MapReduce**: The foundational paradigm (Map → Shuffle → Reduce)
3. **SparkSession**: The entry point to Spark functionality
4. **MapReduce vs Spark**: The same word count in 5 lines instead of 30!
5. **Pandas → Spark**: Converting DataFrames, and how similar the syntax is
6. **Scaling**: Why Spark matters for large datasets
7. **Partitions**: How Spark distributes data for parallel processing
8. **Lazy evaluation**: Transformations vs Actions
9. **Spark UI**: Monitoring what Spark is doing
10. **Distributed thinking**: Reasoning about data at scale

### Key Concepts to Remember

- **MapReduce** is the foundational pattern: Map (transform) → Shuffle (group) → Reduce (combine)
- Spark is designed for **distributed** computing; it shines with big data
- Spark keeps intermediate data **in memory** where possible, instead of writing it to disk between steps like Hadoop MapReduce
- **Lazy evaluation** lets Spark optimize your entire pipeline
- **Partitions** determine parallelism ‚Äî balance is key
- **Data skew** (uneven partitions) is a common performance killer
- The **Spark UI** is your best friend for debugging and optimization

### What's Next?

- **Next week (Week 5):** Lab 1, your first graded assignment, will be published!
- Lab 1 dives deeper into transformations, actions, and the execution model
- You'll work with larger datasets and submit your completed notebook

### Great work today! üéâ

You've taken your first steps into distributed computing. The concepts you learned today (MapReduce thinking, partitioning, lazy evaluation) are foundational to everything else in this course.

In [None]:
# Clean up - stop the Spark session when you're done
spark.stop()
print("✓ Spark session stopped. Learning lab complete!")