# PySpark Fundamentals - Jupyter Notebook

This notebook explains PySpark fundamentals for beginners.

## 0. Getting Started

### 0.1 Acknowledgments

**Original Source:** Professor Dr. Ali Safari  
**Modified by:** Benjamin Gao (Enhanced structure and explanations for better learning experience)  
**Email:** g103200@gmail.com 

---


### 0.2 How to Use This Notebook

#### 📖 Folding Feature
- Click **▼** to collapse sections
- Click **▶** to expand content

#### 🎯 Learning Path
1. Getting Started → Setup environment
2. Introduction → PySpark basics
3. Creating DataFrames → Three creation methods
4. Basic Operations → Common operations
5. Class Activity → Hands-on practice

---

### 0.3 System Information

**Local Environment (This Notebook):**

| Component | Version |
|-----------|---------|
| System | MacBook Pro (M4 Pro), macOS 26 |
| Python | 3.14.0 |
| PySpark | 4.0.1 |
| Java | OpenJDK 17 |
| Environment | `~/.venvs/pyspark-latest` |

**💡 Alternative: Google Colab**

If you find local environment setup too complicated, consider using **Google Colab**:
- ✅ **Free** - No cost to use
- ✅ **Easy** - No installation needed, runs in browser
- ✅ **Powerful** - Free GPU/TPU access
- ✅ **Pre-configured** - PySpark ready to use with minimal setup

**To use Colab:** Visit [colab.research.google.com](https://colab.research.google.com)

---

### 0.4 Kernel Selection

**Current Setup: Python 3.14 (can work, but Python 3.12 is the more reliable choice for PySpark 4.0.1)**

**Steps:**
1. Click "Select Kernel" (top-right corner)
2. Currently using **"Python 3.14 (pyspark-latest)"** ✅
3. Alternative: **"Python 3.12 (pyspark-py312)"** is also available
4. Run the setup cells below to configure JAVA_HOME

In [None]:
# ⚙️ Minimal Setup: JAVA_HOME + Imports + Spark (Run this FIRST!)
import os
import sys
import subprocess
import traceback
from time import sleep

# Soft guard: PySpark 4.0.1 is known-stable on Python 3.12. Python 3.13+ can be flaky but may still work.
if sys.version_info >= (3, 13) and os.environ.get("ALLOW_UNSUPPORTED_PYTHON") != "1":
    print(
        f"⚠️ Running on Python {sys.version.split()[0]}. PySpark 4.0.1 is best on Python 3.12.\n"
        "If the Java gateway fails to start intermittently, switch kernel to 'Python 3.12 (pyspark-py312)'.\n"
        "To silence this warning, set env ALLOW_UNSUPPORTED_PYTHON=1."
    )

# Set JAVA_HOME to the OpenJDK 17 installed via Homebrew
java_home = "/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home"
os.environ["JAVA_HOME"] = java_home

# Force IPv4 bindings at the OS/env level (helps on macOS where localhost may resolve to ::1)
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
os.environ["SPARK_LOCAL_HOSTNAME"] = "127.0.0.1"
# Ensure PySpark uses the same Python interpreter as this kernel
os.environ["PYSPARK_PYTHON"] = sys.executable

# Allow overriding the local master via env (default local[1])
SPARK_MASTER = os.environ.get("SPARK_MASTER", "local[1]")

# Import PySpark and other libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *  # All Spark SQL functions (note: this shadows builtins such as sum, min, max, round)
from pyspark.sql.types import *
import pandas as pd

print(f"✅ JAVA_HOME: {java_home}")
print(f"✅ Python: {sys.executable}")
print(f"✅ Version: {sys.version.split()[0]}")
print("✅ Libraries imported successfully!")
print(f"✅ SPARK_LOCAL_IP: {os.environ.get('SPARK_LOCAL_IP')}")
print(f"✅ SPARK_MASTER: {SPARK_MASTER}")

# Quick Java sanity check (prints to stderr normally)
try:
    java_bin = os.path.join(java_home, "bin", "java")
    proc = subprocess.run([java_bin, "-version"], capture_output=True, text=True)
    print("✅ java -version (stderr):")
    print(proc.stderr.strip())
except Exception as e:
    print("❌ Failed to run java -version:", repr(e))

# Helper: create/get a SparkSession with safe IPv4 binding on macOS and retry on intermittent gateway errors
def get_spark(app_name: str = "PySparkFundamentals", master: str = SPARK_MASTER, retries: int = 5, delay: float = 2.0) -> SparkSession:
    last_err = None
    for attempt in range(1, retries + 1):
        try:
            spark = (
                SparkSession.builder
                    .master(master)
                    .appName(app_name)
                    # Quality of life and IPv4 safety
                    .config("spark.sql.repl.eagerEval.enabled", True)
                    .config("spark.ui.enabled", "false")
                    .config("spark.driver.host", "127.0.0.1")
                    .config("spark.driver.bindAddress", "127.0.0.1")
                    # Prefer IPv4 on JVM side (driver and executor)
                    .config("spark.driver.extraJavaOptions", "-Djava.net.preferIPv4Stack=true -Djava.net.preferIPv6Addresses=false")
                    .config("spark.executor.extraJavaOptions", "-Djava.net.preferIPv4Stack=true -Djava.net.preferIPv6Addresses=false")
                    # Use ephemeral ports to avoid conflicts with firewalls/defenders
                    .config("spark.driver.port", "0")
                    .config("spark.blockManager.port", "0")
                    # Conservative defaults for local notebook runs
                    .config("spark.executor.instances", "1")
                    .config("spark.network.timeout", "120s")
                    .getOrCreate()
            )
            spark.sparkContext.setLogLevel("ERROR")
            return spark
        except Exception as e:
            last_err = e
            # Intermittent: gateway not yet listening; backoff and retry
            if attempt < retries:
                print(f"⏳ Spark start failed (attempt {attempt}/{retries}): {e.__class__.__name__}: {e}")
                print(f"   Retrying in {delay:.1f}s...")
                sleep(delay)
            else:
                print(f"❌ Spark failed after {retries} attempts.")
                raise

# Create (or get) the SparkSession once for the whole notebook
spark = get_spark()
print(f"✅ Spark version: {spark.version}")
print(f"✅ Spark is running on: {spark.sparkContext.master}")

# 🔍 Optional: Quick Diagnostic (set to True to run)
RUN_DIAGNOSTICS = False

if RUN_DIAGNOSTICS:
    print("\n" + "=" * 60)
    print("DIAGNOSTIC REPORT")
    print("=" * 60)
    try:
        test_spark = (
            SparkSession.builder
            .master(SPARK_MASTER)
            .appName("DiagnosticTest")
            .config("spark.ui.enabled", "false")
            .config("spark.driver.host", "127.0.0.1")
            .config("spark.driver.bindAddress", "127.0.0.1")
            .config("spark.driver.extraJavaOptions", "-Djava.net.preferIPv4Stack=true -Djava.net.preferIPv6Addresses=false")
            .config("spark.executor.extraJavaOptions", "-Djava.net.preferIPv4Stack=true -Djava.net.preferIPv6Addresses=false")
            .config("spark.driver.port", "0")
            .config("spark.blockManager.port", "0")
            .getOrCreate()
        )
        print(f"   ✅ Spark {test_spark.version} started!")
        print(f"   ✅ Master: {test_spark.sparkContext.master}")
        test_spark.stop()
        print("   ✅ Stopped successfully")
    except Exception:
        print("   ❌ Failed to start")
        print("\n   Error Details:")
        print("   " + "─" * 56)
        traceback.print_exc(file=sys.stdout)
        print("   " + "─" * 56)
        print("\n   💡 Fix: Ensure network config uses 127.0.0.1 and no firewall is blocking local ports")
    print("\n" + "=" * 60)

⚠️ Running on Python 3.14.0. PySpark 4.0.1 is best on Python 3.12.
If the Java gateway fails to start intermittently, switch kernel to 'Python 3.12 (pyspark-py312)'.
✅ JAVA_HOME: /opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home
✅ Python: /Users/benjamingao/.venvs/pyspark-latest/bin/python
✅ Version: 3.14.0
✅ Libraries imported successfully!
✅ SPARK_LOCAL_IP: 127.0.0.1
✅ SPARK_MASTER: local[1]
✅ java -version (stderr):
openjdk version "17.0.17" 2025-10-21
OpenJDK Runtime Environment Homebrew (build 17.0.17+0)
OpenJDK 64-Bit Server VM Homebrew (build 17.0.17+0, mixed mode, sharing)
⏳ Spark start failed (attempt 1/3): ConnectionRefusedError: [Errno 61] Connection refused
   Retrying in 1.0s...
⏳ Spark start failed (attempt 2/3): ConnectionRefusedError: [Errno 61] Connection refused
   Retrying in 1.0s...
❌ Spark failed after 3 attempts.

ConnectionRefusedError: [Errno 61] Connection refused

### 📋 Quick Start

**Run the setup cell above FIRST** - it will:
- ✅ Configure JAVA_HOME for Spark
- ✅ Import all required libraries (PySpark, pandas, etc.)
- ✅ Verify your setup (optional diagnostic available)

**If you encounter errors:** Change `RUN_DIAGNOSTICS = False` to `True` in the setup cell to get a detailed diagnostic report.

### ⚠️ Troubleshooting: Connection Refused Error

**Problem:** `ConnectionRefusedError` when starting Spark

**Possible Causes:** (1) macOS resolves `localhost` to IPv6 (`::1`) while Spark expects IPv4 (`127.0.0.1`); (2) the JVM gateway cannot bind because of a port or IP conflict; (3) the Python version is not supported by the installed PySpark.

**Solution (already applied in this notebook):** Force IPv4 binding
```python
.config("spark.driver.host", "127.0.0.1")
.config("spark.driver.bindAddress", "127.0.0.1")
.config("spark.driver.port", "0")              # use an ephemeral port
.config("spark.blockManager.port", "0")        # use an ephemeral port
```

**Important:** PySpark 4.0.1 is currently best supported on Python 3.12. If you're on Python 3.13+ (e.g., 3.14), the Java gateway may fail to start. Switch the kernel to Python 3.12:
- VS Code (top-right): Select Kernel → choose "Python 3.12 (pyspark-py312)".
- Then re-run the first setup cell.

**If you still see ConnectionRefused:**
- Ensure JAVA works: the setup cell prints `java -version` automatically.
- Confirm firewall or VPN isn’t blocking localhost connections.
- Optionally set `RUN_DIAGNOSTICS = True` in the setup cell to run a minimal start/stop test.
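
To check the IPv4/IPv6 cause on your own machine, the sketch below lists the address families a hostname resolves to. It uses only the standard library; the helper name `address_families` is hypothetical and not part of PySpark.

```python
import socket


def address_families(host: str = "localhost") -> list[str]:
    """Return the sorted IP families (IPv4/IPv6) that `host` resolves to."""
    names = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6"}
    try:
        infos = socket.getaddrinfo(host, None)
    except socket.gaierror:
        return []  # the name could not be resolved at all
    # Each entry starts with the address family; keep only IPv4/IPv6
    return sorted({names[info[0]] for info in infos if info[0] in names})


print("localhost ->", address_families("localhost"))
print("127.0.0.1 ->", address_families("127.0.0.1"))
```

If `localhost` lists IPv6, pinning Spark to `127.0.0.1` with the settings shown earlier in this section avoids the mismatch.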

---

## 1. Introduction to PySpark

### 1.1 What is PySpark?

PySpark is the Python API for Apache Spark, a distributed computing engine for big data. It supports:
- Distributed data processing
- Fault tolerance
- In-memory computations
- Integration with many data sources

### 1.2 Creating a SparkSession

#### 💡 Code Explanation

**SparkSession** (case sensitive)

#### Why do you need a session?
A `SparkSession` is the entry point to Spark. Without one, your code has no connection to the Spark engine and cannot create or read DataFrames.

#### 🏦 Analogy: Bank Account

| Code Part | Bank Analogy | Explanation |
|-----------|--------------|-------------|
| `SparkSession` | Opening a bank account | Your identity in the Spark system |
| `.builder` | Walking into the bank | Starting the process |
| `.appName("PySparkFundamentals")` | Naming your account | Give your app a unique name |
| `.config(...)` | Setting up special features | Configure how Spark behaves |
| `.getOrCreate()` | Get existing OR create new | Smart! Reuses if exists, creates if not |

#### 📋 Step-by-Step Breakdown

```python
from pyspark.sql import SparkSession

spark = (
    # Step 1: Start building a SparkSession
    SparkSession.builder
    # Step 2: Name your application
    .appName("PySparkFundamentals")
    # Step 3: Add configuration (optional)
    # This one makes DataFrames auto-display in Jupyter
    .config("spark.sql.repl.eagerEval.enabled", True)
    # Step 4: Create a session, or get the existing one
    .getOrCreate()
)
```

#### 🎯 Key Points

- ✅ **Case Sensitive**: Must write `SparkSession` (not `sparksession`)
- ✅ **One Session**: `getOrCreate()` returns the session that is already running instead of creating a second one
- ✅ **Required**: A SparkSession is the entry point for creating and reading DataFrames
- ✅ **Like Login**: It's your "login credentials" for Spark

**This is called "Method Chaining"**
- `Object.Method1().Method2().Method3()`: each method returns an object, so the next call can follow immediately
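
Method chaining works because each method returns an object that the next call can be made on. The toy class below is a plain-Python sketch of that idea; `ReportBuilder` and its methods are hypothetical and are not part of PySpark.

```python
class ReportBuilder:
    """Toy builder (hypothetical, not part of PySpark) used to show chaining."""

    def __init__(self):
        self.options = {}

    def name(self, value):
        self.options["name"] = value
        return self  # returning self is what allows the next call to be chained

    def config(self, key, value):
        self.options[key] = value
        return self

    def build(self):
        # Final step of the chain: hand back a copy of the collected options
        return dict(self.options)


settings = ReportBuilder().name("Demo").config("verbose", True).build()
print(settings)
```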

#### 🧪 Quick Practice

> **Task:** Create a SparkSession with the following requirements:
> - Application name: "StudentDataAnalysis"
> - No additional configuration needed
> - Store in variable: spark2

In [None]:
spark2 = SparkSession.builder.appName("StudentDataAnalysis").getOrCreate()  # a session already exists, so that one is returned
print(f"Spark2 version: {spark2.version}")
print("Spark2 is spark:", spark2 is spark)

Spark2 version: 4.0.1


### 1.3 Understanding `getOrCreate()`

#### 🎓 Why It's Called `getOrCreate()`

| Scenario | Behavior |
|----------|----------|
| **First Call** | **Create** - Creates a new SparkSession |
| **Subsequent Calls** | **Get** - Returns the existing SparkSession; settings fixed at startup, such as the app name, are not changed |

#### ⚠️ Important Discovery

Run the verification code above and you'll find:
- Both variables have the app name `"PySparkFundamentals"` (the first one created)
- `spark is spark2` returns `True` (they are the same object)
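
The get-or-create behaviour can be pictured with a small plain-Python sketch. This is only an illustration of the pattern under simplifying assumptions, not Spark's actual implementation; `get_or_create` and `_active_session` are hypothetical names.

```python
_active_session = None  # module-level cache, standing in for Spark's active session


def get_or_create(app_name):
    """Return the cached session if one exists, otherwise create and cache it."""
    global _active_session
    if _active_session is None:
        _active_session = {"app_name": app_name}
    return _active_session


first = get_or_create("PySparkFundamentals")
second = get_or_create("StudentDataAnalysis")  # the new name is not applied

print(first is second)
print(second["app_name"])
```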

#### 📝 What If You Really Want Multiple Sessions?

**Method 1: Stop the old one, then create a new one** (within the same program)

```python
# Stop the old session
spark.stop()

# Create a new session
spark2 = SparkSession.builder.appName("NewApp").getOrCreate()
```

**Method 2: Run different programs** (recommended)

Different Python scripts can each have their own SparkSession.

#### 🎯 Spark's Design Philosophy

**One Application = One SparkSession = Multiple DataFrames**

```python
# ✅ Correct approach: One Session, multiple datasets
spark = SparkSession.builder.appName("DataAnalysis").getOrCreate()

# Work with several datasets in the same session
# (student_data, sales_data, product_data are placeholders for your own rows)
students_df = spark.createDataFrame(student_data)
sales_df = spark.createDataFrame(sales_data)
products_df = spark.createDataFrame(product_data)

# Each DataFrame can be analyzed independently
students_df.show()
sales_df.show()
products_df.show()
```

**You don't need multiple Sessions, you need multiple DataFrames!**

## 2. Creating DataFrames

### 2.1 From a List

In [None]:
# 2.1 From a List

# SparkSession 'spark' is already initialized in the setup cell

data = [
    ("Alice", 34, "Engineer"),
    ("Bob", 45, "Data Scientist"),
    ("Catherine", 29, "Developer"),
    ("David", 52, "Manager")
]
columns = ["Name", "Age", "Occupation"]

df = spark.createDataFrame(data, columns)
df.show()

+---------+---+--------------+
|     Name|Age|    Occupation|
+---------+---+--------------+
|    Alice| 34|      Engineer|
|      Bob| 45|Data Scientist|
|Catherine| 29|     Developer|
|    David| 52|       Manager|
+---------+---+--------------+



                                                                                


#### 💡 Code Explanation: Creating DataFrame from a List

#### 🏦 Analogy: Opening a Savings Account with Customer Records

Imagine you work at a bank and need to digitize customer records:

| Code Part | Bank Analogy | What's Happening |
|-----------|--------------|------------------|
| `data = [(...), (...), ...]` | **Customer info cards** | Raw data: each tuple is like a customer card with details |
| `columns = ["Name", "Age", ...]` | **Form field labels** | Column headers: defining what each piece of data means |
| `spark.createDataFrame(data, columns)` | **Create digital database** | Convert paper records into a structured database table |
| `df` | **The customer database** | Your organized, searchable database |
| `df.show()` | **Print the database** | Display the records on screen |

#### 📋 Step-by-Step Breakdown

```python
# Step 1: Prepare raw data (like customer info cards)
data = [
    ("Alice", 34, "Engineer"),      # Card 1
    ("Bob", 45, "Data Scientist"),  # Card 2
    ("Catherine", 29, "Developer"), # Card 3
    ("David", 52, "Manager")        # Card 4
]
# ↑ This is a LIST of TUPLES (each tuple = one row/record)

# Step 2: Define column names (like form field labels)
columns = ["Name", "Age", "Occupation"]
# ↑ This is a LIST of STRINGS (column headers)

# Step 3: Create a DataFrame (digitize the records)
df = spark.createDataFrame(data, columns)
# ↑ spark: Your bank account (SparkSession)
#   .createDataFrame(): The "digitization machine"
#   data: What to digitize
#   columns: How to label each field

# Step 4: Display the database
df.show()
# ↑ Show the organized table on screen
```
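
Values are matched to column names by position: the first item of every tuple goes under the first name, and so on. The plain-Python sketch below only illustrates that pairing; it is not what Spark does internally.

```python
data = [
    ("Alice", 34, "Engineer"),
    ("Bob", 45, "Data Scientist"),
]
columns = ["Name", "Age", "Occupation"]

# Pair each value with the column name at the same position
records = [dict(zip(columns, row)) for row in data]
for record in records:
    print(record)
```

If a tuple has more or fewer items than there are column names, the pairing no longer lines up, so keep every row the same length as `columns`.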

### 2.2 From a Pandas DataFrame

#### 🐼 What is Pandas?

**Pandas** is a popular Python library for data analysis (like Excel for Python)

- **Full Name**: Python Data Analysis Library
- **Use Case**: Working with small-to-medium datasets (fits in your computer's memory)
- **Key Object**: `DataFrame` - a table with rows and columns (like an Excel spreadsheet)

#### 💡 Why Convert Pandas → PySpark?

You might have data in Pandas but want to:
- Scale up to larger datasets
- Use Spark's distributed processing
- Integrate with existing Spark pipelines

**Good News**: PySpark can easily convert Pandas DataFrames!

In [None]:
pandas_df = pd.DataFrame({
    "Product": ["Laptop", "Mouse", "Keyboard", "Monitor"],
    "Price": [1200, 25, 80, 300],
    "Quantity": [5, 20, 15, 8]
})

products_df = spark.createDataFrame(pandas_df)
products_df.show()

+--------+-----+--------+
| Product|Price|Quantity|
+--------+-----+--------+
|  Laptop| 1200|       5|
|   Mouse|   25|      20|
|Keyboard|   80|      15|
| Monitor|  300|       8|
+--------+-----+--------+



### 2.3 Reading from CSV

#### 📄 What is CSV?

**CSV = Comma-Separated Values**

- A simple text file format for storing tabular data
- Each line = one row
- Commas separate columns
- One of the **most common** formats for exchanging tabular data between systems

In [13]:
csv_data = """id,name,value
1,Alice,100
2,Bob,200
3,Charlie,150
4,Diana,300"""

with open("sample_data.csv", "w") as f:
    f.write(csv_data)

csv_df = spark.read.csv("sample_data.csv", header=True, inferSchema=True)
csv_df.show()

+---+-------+-----+
| id|   name|value|
+---+-------+-----+
|  1|  Alice|  100|
|  2|    Bob|  200|
|  3|Charlie|  150|
|  4|  Diana|  300|
+---+-------+-----+



#### 📝 Code Example: Creating and Reading CSV

#### 🔍 Detailed Code Walkthrough

Let me break down this code line by line:

##### **Step 1: Create CSV Data (Lines 1-5)**

```python
csv_data = """id,name,value
1,Alice,100
2,Bob,200
3,Charlie,150
4,Diana,300"""
```

**What's happening:**
- `csv_data` = a **variable** storing text
- `"""..."""` = **triple quotes** (allows multi-line strings)
- Content = CSV format data (comma-separated values)

**Structure:**
```
Line 1: id,name,value        ← Header row (column names)
Line 2: 1,Alice,100          ← Data row 1
Line 3: 2,Bob,200            ← Data row 2
Line 4: 3,Charlie,150        ← Data row 3
Line 5: 4,Diana,300          ← Data row 4
```

**Analogy:** Like writing customer records on a piece of paper

##### **Step 2: Create a Physical File (Lines 7-8)**

```python
with open("sample_data.csv", "w") as f:
    f.write(csv_data)
```

**Breaking it down:**

| Part | Meaning | Analogy |
|------|---------|---------|
| `with open(...)` | Context manager (auto-closes file) | "Use this file, then clean up automatically" |
| `"sample_data.csv"` | Filename to create | Name of the file on your computer |
| `"w"` | Write mode | "Create new file or overwrite existing one" |
| `as f:` | Give it a nickname `f` | Short name for the file object |
| `f.write(csv_data)` | Write the text to file | Copy the text into the file |

**What happens:**
1. Creates (or overwrites) a file named `sample_data.csv`
2. Writes the CSV text into it
3. Automatically closes the file when done

**Analogy:** Taking your paper records and putting them in a filing cabinet

**Result:** You now have a real CSV file on your computer:
```
📁 Your Computer
  └─ sample_data.csv  ← This file now exists!
```
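
To confirm that the write step really produced a file, you can read it back. The sketch below uses a temporary directory (an assumption made here so that nothing in your working directory is overwritten) and a shortened copy of the sample data.

```python
import tempfile
from pathlib import Path

csv_data = """id,name,value
1,Alice,100
2,Bob,200"""

# Write into a temporary directory so the working directory is left untouched
with tempfile.TemporaryDirectory() as tmp_dir:
    csv_path = Path(tmp_dir) / "sample_data.csv"
    with open(csv_path, "w") as f:
        f.write(csv_data)

    # Read the file back while the temporary directory still exists
    file_exists = csv_path.exists()
    lines = csv_path.read_text().splitlines()

print(file_exists)
print(lines)
```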

##### **Step 3: Read CSV into PySpark (Line 10)**

```python
csv_df = spark.read.csv("sample_data.csv", header=True, inferSchema=True)
```

**Breaking it down:**

| Part | What It Does |
|------|--------------|
| `csv_df =` | Store the result in variable `csv_df` |
| `spark` | Your SparkSession (created earlier) |
| `.read` | Access the DataFrameReader |
| `.csv(...)` | Read a CSV file |
| `"sample_data.csv"` | The file to read |
| `header=True` | First row is column names |
| `inferSchema=True` | Auto-detect data types |

**Process:**
```
Step 1: spark.read
        ↓
        Access Spark's file reader

Step 2: .csv("sample_data.csv")
        ↓
        Read the CSV file

Step 3: header=True
        ↓
        Use first row as column names
        (id, name, value)

Step 4: inferSchema=True
        ↓
        Figure out data types automatically
        - id: integer
        - name: string
        - value: integer

Step 5: Return DataFrame → csv_df
```

**Analogy:** Scanning paper documents and creating a digital database
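
The idea behind type inference can be sketched in plain Python: try the narrowest type first and fall back to string. This is a simplified illustration built on the standard `csv` module, not Spark's actual inference algorithm; `infer_type` is a hypothetical helper.

```python
import csv
import io

csv_data = """id,name,value
1,Alice,100
2,Bob,200
3,Charlie,150
4,Diana,300"""


def infer_type(values):
    """Pick the narrowest type name that fits every value (simplified)."""
    for type_name, cast in (("integer", int), ("double", float)):
        try:
            for value in values:
                cast(value)
            return type_name
        except ValueError:
            continue  # at least one value did not fit, try the next type
    return "string"


rows = list(csv.DictReader(io.StringIO(csv_data)))  # first line becomes the keys
schema = {name: infer_type([row[name] for row in rows]) for name in rows[0]}
print(schema)
```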

##### **Step 4: Display the DataFrame (Line 11)**

```python
csv_df.show()
```

**What it does:** Prints the DataFrame in a table format

**Output:**
```
+---+-------+-----+
| id|   name|value|
+---+-------+-----+
|  1|  Alice|  100|
|  2|    Bob|  200|
|  3|Charlie|  150|
|  4|  Diana|  300|
+---+-------+-----+
```

**Analogy:** Printing a report to see your database

##### 🎯 Complete Flow Diagram

```
┌─────────────────────────────────────┐
│ Step 1: Create Text Data           │
│ csv_data = """id,name,value..."""  │
└─────────────┬───────────────────────┘
              │
              ↓
┌─────────────────────────────────────┐
│ Step 2: Write to File               │
│ with open("sample_data.csv", "w"):  │
│     f.write(csv_data)               │
└─────────────┬───────────────────────┘
              │
              ↓
        📁 sample_data.csv
        (File on disk)
              │
              ↓
┌─────────────────────────────────────┐
│ Step 3: Read File into Spark        │
│ csv_df = spark.read.csv(...)        │
│ - header=True: Use first row        │
│ - inferSchema=True: Detect types    │
└─────────────┬───────────────────────┘
              │
              ↓
┌─────────────────────────────────────┐
│ csv_df (PySpark DataFrame)          │
│ +---+-------+-----+                 │
│ | id|   name|value|                 │
│ +---+-------+-----+                 │
│ |  1|  Alice|  100|                 │
│ +---+-------+-----+                 │
└─────────────┬───────────────────────┘
              │
              ↓
┌─────────────────────────────────────┐
│ Step 4: Display                     │
│ csv_df.show()                       │
└─────────────────────────────────────┘
```

##### 🤔 Why Two Steps (Create File + Read File)?

**Question:** Why not read data directly from `csv_data` string?

**Answer:** This example demonstrates the **real-world workflow**:
1. In reality, CSV files already exist (from other systems)
2. You just need to **read them** into Spark

**Real-world scenario:**
```python
# You DON'T create the file, it already exists!
# Just read it:
df = spark.read.csv("sales_data_2024.csv", header=True, inferSchema=True)
```

This code creates a file just for **demonstration purposes** so you can practice reading CSVs!

##### 💡 Key Takeaways

1. **CSV file** = text file with comma-separated values
2. **`with open()`** = Python's way to create/write files
3. **`spark.read.csv()`** = Spark's way to read CSV into DataFrame
4. **`header=True`** = Treat first row as column names
5. **`inferSchema=True`** = Auto-detect data types
6. **`.show()`** = Display the DataFrame

##### 📊 Common Big Data CSV Scenarios

**Scenario 1: Web Server Logs**
```csv
timestamp,ip_address,url,status_code
2024-10-24 10:30:00,192.168.1.1,/home,200
2024-10-24 10:31:15,192.168.1.2,/login,404
```

**Scenario 2: E-commerce Transactions**
```csv
order_id,customer_id,product,amount,date
12345,C001,Laptop,1200,2024-10-24
12346,C002,Mouse,25,2024-10-24
```

**Scenario 3: IoT Sensor Data**
```csv
sensor_id,temperature,humidity,timestamp
S001,22.5,65,2024-10-24 10:00:00
S002,23.1,62,2024-10-24 10:00:01
```

##### 🎓 CSV Best Practices

✅ **DO**:
- Always use `header=True` if your CSV has headers
- Use `inferSchema=True` for automatic type detection while exploring; prefer an explicit schema for large or production files
- Clean data before loading (remove extra spaces)

❌ **DON'T**:
- Assume data is clean (always check for spaces, nulls)
- Split huge CSVs by hand before loading (Spark partitions large files automatically when it reads them)
- Forget to handle special characters in data
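
`inferSchema=True` makes Spark take an extra pass over the data to work out the types, so for large files it is common to declare the schema instead. A minimal sketch, assuming the `sample_data.csv` layout used in this notebook; `read_sample_csv` and `SAMPLE_SCHEMA` are hypothetical names. `spark.read.csv` accepts the schema as a DDL-formatted string.

```python
# Column names and types for sample_data.csv, written as a DDL string
SAMPLE_SCHEMA = "id INT, name STRING, value INT"


def read_sample_csv(spark, path="sample_data.csv"):
    """Read the CSV with a declared schema instead of inferSchema=True."""
    return spark.read.csv(path, header=True, schema=SAMPLE_SCHEMA)


# Usage in this notebook (needs the SparkSession from the setup cell):
# csv_df = read_sample_csv(spark)
# csv_df.printSchema()
```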

##### ⚡ Advanced CSV Options

```python
# Handle spaces around values
df = spark.read.csv("data.csv", 
    header=True,
    inferSchema=True,
    ignoreLeadingWhiteSpace=True,   # Remove leading spaces
    ignoreTrailingWhiteSpace=True   # Remove trailing spaces
)

# Different delimiter (tab-separated)
df = spark.read.csv("data.tsv", sep="\t", header=True)

# Handle missing values
df = spark.read.csv("data.csv", header=True, nullValue="N/A")

# Custom quote character
df = spark.read.csv("data.csv", header=True, quote="'")
```

##### 🔄 Real-World CSV Loading

**In production, you'll read files from:**

```python
# Local file
df = spark.read.csv("file:///path/to/data.csv", header=True, inferSchema=True)

# HDFS (Hadoop Distributed File System)
df = spark.read.csv("hdfs://namenode:9000/data/sales.csv", header=True)

# AWS S3 (open-source Spark typically uses the s3a:// scheme via hadoop-aws; s3:// is used on Amazon EMR)
df = spark.read.csv("s3a://my-bucket/data/logs.csv", header=True)

# Azure Blob Storage
df = spark.read.csv("wasbs://container@account.blob.core.windows.net/data.csv", header=True)

# Google Cloud Storage
df = spark.read.csv("gs://my-bucket/data/users.csv", header=True)
```

##### 🎯 Key Options Explained

**`header=True`**
- Treats the first line as column names
- Without this, first line would be treated as data
- Example:
  ```
  WITH header=True:     WITHOUT header=True:
  +---+-------+-----+   +---+-------+-----+
  | id|   name|value|   |_c0|    _c1|  _c2|
  +---+-------+-----+   +---+-------+-----+
  |  1|  Alice|  100|   | id|   name|value|
  |  2|    Bob|  200|   |  1|  Alice|  100|
  +---+-------+-----+   +---+-------+-----+
  ```

**`inferSchema=True`**
- Automatically detects data types (int, string, double, etc.)
- Without this, everything is treated as string
- Example:
  ```
  WITH inferSchema=True:     WITHOUT inferSchema=True:
  id: integer               id: string
  name: string              name: string
  value: integer            value: string
  ```
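
The effect of `header` can be mimicked with the standard `csv` module. The sketch below is a plain-Python illustration, not Spark code: it shows the same lines treated first as "names plus data" and then as "data only" with placeholder names in the `_c0`, `_c1`, ... style shown above.

```python
import csv
import io

csv_data = """id,name,value
1,Alice,100
2,Bob,200"""

rows = list(csv.reader(io.StringIO(csv_data)))

# header=True: the first line supplies the names, the rest is data
with_header = {"columns": rows[0], "data": rows[1:]}

# header=False: every line is data, so placeholder names are generated
without_header = {
    "columns": [f"_c{i}" for i in range(len(rows[0]))],
    "data": rows,
}

print(with_header["columns"], len(with_header["data"]))
print(without_header["columns"], len(without_header["data"]))
```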

##### 📋 Complete Code Breakdown

```python
# Step 1: Create CSV data as a string
csv_data = """id,name,value
1,Alice,100
2,Bob,200
3,Charlie,150
4,Diana,300"""
# ↑ Triple quotes allow multi-line strings
#   First line: column names (header)
#   Other lines: data rows

# Step 2: Write CSV data to a file (creates "sample_data.csv")
with open("sample_data.csv", "w") as f:
    f.write(csv_data)
# ↑ "w" = write mode (creates or overwrites file)
#   This creates a real CSV file on your computer

# Step 3: Read CSV file into PySpark DataFrame
csv_df = spark.read.csv("sample_data.csv", header=True, inferSchema=True)
#         ↑      ↑                         ↑            ↑
#         |      |                         |            Auto-detect data types
#         |      |                         First row is header
#         |      Read CSV file
#         Spark session

# Step 4: Display the DataFrame
csv_df.show()
```

##### 🏦 Analogy: Importing Bank Records from a File Cabinet

Imagine you have customer records stored in a text file (CSV) and want to digitize them:

| Code Part | Bank Analogy | What's Happening |
|-----------|--------------|------------------|
| `csv_data = """..."""` | **Text file content** | The raw CSV data as a multi-line string |
| `with open(...) as f:` | **Create a physical file** | Write the CSV text to a real file on disk |
| `spark.read.csv(...)` | **Import file into database** | Read CSV file into a PySpark DataFrame |
| `header=True` | **First row is column names** | Tells Spark the first line contains headers |
| `inferSchema=True` | **Auto-detect data types** | Spark figures out which columns are numbers, strings, etc. |

#### 💡 Code Explanation Summary

This section provides a complete breakdown of how to read CSV files in PySpark.

## 3. Basic DataFrame Operations

### 3.1 Show, Schema, Columns, Describe

In [14]:
# Basic inspection utilities
# Show a small, clean summary of the DataFrame

df.show()
df.printSchema()
print("Columns:", df.columns)
df.describe().show()

+---------+---+--------------+
|     Name|Age|    Occupation|
+---------+---+--------------+
|    Alice| 34|      Engineer|
|      Bob| 45|Data Scientist|
|Catherine| 29|     Developer|
|    David| 52|       Manager|
+---------+---+--------------+

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Occupation: string (nullable = true)

Columns: ['Name', 'Age', 'Occupation']
+-------+-----+------------------+--------------+
|summary| Name|               Age|    Occupation|
+-------+-----+------------------+--------------+
|  count|    4|                 4|             4|
|   mean| NULL|              40.0|          NULL|
| stddev| NULL|10.424330514074594|          NULL|
|    min|Alice|                29|Data Scientist|
|    max|David|                52|       Manager|
+-------+-----+------------------+--------------+
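
The `stddev` row matches the sample standard deviation (dividing by n - 1 rather than n). This can be checked on the four `Age` values with the standard library:

```python
import statistics

ages = [34, 45, 29, 52]  # the Age column from the DataFrame above

mean_age = statistics.mean(ages)
sample_stddev = statistics.stdev(ages)        # divides by n - 1
population_stddev = statistics.pstdev(ages)   # divides by n

print(mean_age)
print(sample_stddev)
print(population_stddev)
```

The sample value agrees with the `stddev` shown by `describe()`; the population value is smaller.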


### 3.2 Selecting and Filtering

In [15]:
df.select("Name", "Occupation").show()
df.filter(df.Age > 30).show()
df.filter((df.Age > 30) & (df.Occupation == "Engineer")).show()
df.filter("Age > 30 AND Occupation = 'Engineer'").show()

+---------+--------------+
|     Name|    Occupation|
+---------+--------------+
|    Alice|      Engineer|
|      Bob|Data Scientist|
|Catherine|     Developer|
|    David|       Manager|
+---------+--------------+

+-----+---+--------------+
| Name|Age|    Occupation|
+-----+---+--------------+
|Alice| 34|      Engineer|
|  Bob| 45|Data Scientist|
|David| 52|       Manager|
+-----+---+--------------+

+-----+---+----------+
| Name|Age|Occupation|
+-----+---+----------+
|Alice| 34|  Engineer|
+-----+---+----------+

+-----+---+----------+
| Name|Age|Occupation|
+-----+---+----------+
|Alice| 34|  Engineer|
+-----+---+----------+
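
In the combined filter, each comparison is wrapped in parentheses because Python's `&` binds more tightly than `>` and `==`. The sketch below shows the precedence rule with ordinary Python values (an illustration only; on Spark columns the unparenthesized form usually raises an error or builds an expression you did not intend).

```python
age, is_engineer = 25, True

# Without parentheses Python evaluates `30 & is_engineer` first,
# so this line means: age > (30 & is_engineer)
loose = age > 30 & is_engineer

# With parentheses each comparison is evaluated before `&` combines the results
strict = (age > 30) & is_engineer

print(loose)
print(strict)
```

The two results differ, which is why `(df.Age > 30) & (df.Occupation == "Engineer")` keeps both pairs of parentheses.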



### 3.3 Adding / Modifying Columns

In [16]:
df_with_bonus = df.withColumn("Bonus", df.Age * 10)
df_with_bonus.show()

df_renamed = df.withColumnRenamed("Occupation", "Job")
df_renamed.show()

df_renamed.drop("Job").show()

+---------+---+--------------+-----+
|     Name|Age|    Occupation|Bonus|
+---------+---+--------------+-----+
|    Alice| 34|      Engineer|  340|
|      Bob| 45|Data Scientist|  450|
|Catherine| 29|     Developer|  290|
|    David| 52|       Manager|  520|
+---------+---+--------------+-----+

+---------+---+--------------+
|     Name|Age|           Job|
+---------+---+--------------+
|    Alice| 34|      Engineer|
|      Bob| 45|Data Scientist|
|Catherine| 29|     Developer|
|    David| 52|       Manager|
+---------+---+--------------+

+---------+---+
|     Name|Age|
+---------+---+
|    Alice| 34|
|      Bob| 45|
|Catherine| 29|
|    David| 52|
+---------+---+



## 4. Class Activity

### 📊 Practice Dataset

In [17]:
data = [
    ("Alice", 34, "Engineer", 70000),
    ("Bob", 45, "Data Scientist", 120000),
    ("Catherine", 29, "Developer", 90000),
    ("David", 52, "Manager", 150000),
    ("Eva", 41, "Engineer", 80000),
    ("Frank", 36, "Developer", 95000),
    ("Grace", 28, "Intern", 40000)
]
columns = ["Name", "Age", "Occupation", "Salary"]

df = spark.createDataFrame(data, columns)
df.show()



+---------+---+--------------+------+
|     Name|Age|    Occupation|Salary|
+---------+---+--------------+------+
|    Alice| 34|      Engineer| 70000|
|      Bob| 45|Data Scientist|120000|
|Catherine| 29|     Developer| 90000|
|    David| 52|       Manager|150000|
|      Eva| 41|      Engineer| 80000|
|    Frank| 36|     Developer| 95000|
|    Grace| 28|        Intern| 40000|
+---------+---+--------------+------+



### 📝 Activity Tasks

In [18]:
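# - Show the first 5 rows
# - Print the schema and column names
# - Describe the dataset (count, mean, min, max)
df.show(5)
df.printSchema()
df.columns  # displays nothing here: only a cell's last expression is echoed, so use print(df.columns)
df.describe().show()
df.describe()  # last expression in the cell, so the notebook renders the returned DataFrame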

+---------+---+--------------+------+
|     Name|Age|    Occupation|Salary|
+---------+---+--------------+------+
|    Alice| 34|      Engineer| 70000|
|      Bob| 45|Data Scientist|120000|
|Catherine| 29|     Developer| 90000|
|    David| 52|       Manager|150000|
|      Eva| 41|      Engineer| 80000|
+---------+---+--------------+------+
only showing top 5 rows
root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Salary: long (nullable = true)

+-------+-----+------------------+--------------+-----------------+
|summary| Name|               Age|    Occupation|           Salary|
+-------+-----+------------------+--------------+-----------------+
|  count|    7|                 7|             7|                7|
|   mean| NULL|37.857142857142854|          NULL|92142.85714285714|
| stddev| NULL| 8.706866474772873|          NULL|35338.49917313302|
|    min|Alice|                28|Data Scientist|            40000|
|    

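| summary | Name | Age | Occupation | Salary |
|---------|------|-----|------------|--------|
| count | 7 | 7.0 | 7 | 7.0 |
| mean | | 37.85714285714285 | | 92142.85714285714 |
| stddev | | 8.706866474772873 | | 35338.49917313302 |
| min | Alice | 28.0 | Data Scientist | 40000.0 |
| max | Grace | 52.0 | Manager | 150000.0 |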


In [None]:
# Filtering
# - Show only employees older than 35
# - Show names and salaries of Engineers only
# - Filter employees with salary > 90000 and age < 50
df.filter(df.Occupation == "Engineer").select("Name", "Salary").show()
df.filter((df.Salary > 90000) & (df.Age < 50)).show()

+-----+------+
| Name|Salary|
+-----+------+
|Alice| 70000|
|  Eva| 80000|
+-----+------+

+-----+---+--------------+------+
| Name|Age|    Occupation|Salary|
+-----+---+--------------+------+
|  Bob| 45|Data Scientist|120000|
|Frank| 36|     Developer| 95000|
+-----+---+--------------+------+



In [19]:
# Transformations
# - Add a new column Bonus equal to 10% of Salary
# - Rename the column Occupation to Job
# - Drop the Age column from a copy of the DataFrame
df_with_bonus = df.withColumn("Bonus", df.Salary * 0.1)
df_with_bonus.show()
df_renamed = df.withColumnRenamed("Occupation", "Job")
df_renamed.show()
df.show()
dfnew = df.drop("Age")
dfnew.show()


+---------+---+--------------+------+-------+
|     Name|Age|    Occupation|Salary|  Bonus|
+---------+---+--------------+------+-------+
|    Alice| 34|      Engineer| 70000| 7000.0|
|      Bob| 45|Data Scientist|120000|12000.0|
|Catherine| 29|     Developer| 90000| 9000.0|
|    David| 52|       Manager|150000|15000.0|
|      Eva| 41|      Engineer| 80000| 8000.0|
|    Frank| 36|     Developer| 95000| 9500.0|
|    Grace| 28|        Intern| 40000| 4000.0|
+---------+---+--------------+------+-------+

+---------+---+--------------+------+
|     Name|Age|           Job|Salary|
+---------+---+--------------+------+
|    Alice| 34|      Engineer| 70000|
|      Bob| 45|Data Scientist|120000|
|Catherine| 29|     Developer| 90000|
|    David| 52|       Manager|150000|
|      Eva| 41|      Engineer| 80000|
|    Frank| 36|     Developer| 95000|
|    Grace| 28|        Intern| 40000|
+---------+---+--------------+------+

+---------+---+--------------+------+
|     Name|Age|    Occupation|Salary

In [20]:
# Aggregations
# - Compute average salary per job (groupBy + avg)
# - Count the number of employees in each job

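# Aggregate functions used in this and the following cells (harmless if already
# imported earlier); max, min, and sum shadow the Python built-ins once imported
from pyspark.sql.functions import avg, count, max, min, sum

df2 = df.groupBy("Occupation").agg(avg("Salary").alias("Avg_Salary"))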
df2.show()
df2 = df.groupBy("Occupation").agg(count("*").alias("Employee_Count"))
df2.show()

+--------------+----------+
|    Occupation|Avg_Salary|
+--------------+----------+
|        Intern|   40000.0|
|     Developer|   92500.0|
|Data Scientist|  120000.0|
|      Engineer|   75000.0|
|       Manager|  150000.0|
+--------------+----------+

+--------------+--------------+
|    Occupation|Employee_Count|
+--------------+--------------+
|        Intern|             1|
|     Developer|             2|
|Data Scientist|             1|
|      Engineer|             2|
|       Manager|             1|
+--------------+--------------+



## 📚 PySpark Operations Summary

### 🔍 Basic Display Operations

#### Show Data
```python
df.show()           # Display the first 20 rows (the default)
df.show(n)          # Display first n rows
```

#### Inspect Structure
```python
df.printSchema()    # Print schema (column names and types)
df.columns          # Show column names as a list
df.describe().show() # Display statistical summary (count, mean, stddev, min, max)
```

**Note:** `df.describe()` returns a new DataFrame rather than printing anything; call `.show()` on it to display the summary.

---

### 🔎 Filtering and Selecting

#### Filter Rows + Select Columns
```python
df.filter(condition).select(col1, col2).show()
```

**Filter:** Apply conditions to keep certain rows
- Supports logical operators: `&` (AND), `|` (OR), `~` (NOT)
- Example: `df.filter((df.Age > 30) & (df.Salary > 80000))`

**Select:** Choose which columns to display
- Example: `df.select("Name", "Salary")`

---

### ✏️ Column Transformations

#### Add or Modify Column
```python
df.withColumn("new_col", expression)
```
- If column exists → modifies it
- If column doesn't exist → creates it
- Example: `df.withColumn("Bonus", df.Salary * 0.1)`

#### Rename Column
```python
df.withColumnRenamed("old_name", "new_name")
```
- Example: `df.withColumnRenamed("Occupation", "Job")`

#### Drop Column
```python
df.drop("column_name")
```
- Example: `df.drop("Age")`

**Important:** These operations return a new DataFrame and don't modify the original!

---

### 📊 Aggregation Operations

#### GroupBy + Aggregate
```python
df.groupBy("column").agg(aggregation_function)
```

**Common Aggregation Functions:**

| Function | Purpose | Example |
|----------|---------|---------|
| `avg("col")` | Average | `avg("Salary")` → Average salary |
| `sum("col")` | Sum | `sum("Salary")` → Total salary |
| `count("col")` | Count non-null values | `count("Salary")` → Number of non-null salaries |
| `count("*")` | Count all rows | `count("*")` → Total number of rows |
| `max("col")` | Maximum | `max("Salary")` → Highest salary |
| `min("col")` | Minimum | `min("Salary")` → Lowest salary |

#### Alias - Rename Result Column
```python
avg("Salary").alias("Avg_Salary")
```
- Makes result column names more readable
- Example: the result column is displayed as `Avg_Salary` instead of `avg(Salary)`

**Complete Example:**
```python
df.groupBy("Occupation").agg(
    avg("Salary").alias("Avg_Salary"),
    count("*").alias("Employee_Count")
).show()
```

---

### 🎯 Key Concepts

1. **Immutability:** Transformations never change a DataFrame in place; each one returns a new DataFrame
2. **Method Chaining:** Operations can be chained, e.g. `.filter().select().show()`
3. **Lazy Evaluation:** Transformations aren't executed until an action (like `.show()`) is called
4. **`.show()` Returns None:** It only prints, so keep the DataFrame in a variable and call `.show()` on it separately

## 🎯 Aggregation Practice Exercises

Use the employee dataset above to complete these tasks:

### Exercise 1: Basic Aggregations
1. Find the **maximum salary** across all employees
2. Find the **minimum age** across all employees
3. Calculate the **total salary** (sum) for all employees

### Exercise 2: GroupBy Aggregations
1. Find the **maximum salary** for each Occupation
2. Find the **minimum age** for each Occupation
3. Count how many employees are in each Occupation

### Exercise 3: Multiple Aggregations
1. For each Occupation, show:
   - Average salary
   - Maximum salary
   - Minimum salary
   - Employee count
   
   (All in one query, passing several aggregate functions to a single `.agg()` call)

### Exercise 4: Advanced Filtering + Aggregation
1. Find the average salary for employees **older than 30**
2. Count how many Engineers have salary **greater than 75000**
3. For employees **under 40**, calculate average salary by Occupation

---

**💡 Hints:**
- Use `df.agg()` for aggregations on the entire dataset
- Use `df.groupBy("col").agg()` for aggregations per group
- Remember to use `.alias()` to rename result columns
- You can filter first with `.filter()`, then aggregate
- For multiple aggregations, pass them separated by commas in `.agg()`

In [21]:
# Exercise 1: Basic Aggregations
# 1. Find the maximum salary across all employees
df.agg(max("Salary")).show()
# 2. Find the minimum age across all employees
df.agg(min("Age")).show()
# 3. Calculate the total salary (sum) for all employees
df.agg(sum("Salary")).show()


+-----------+
|max(Salary)|
+-----------+
|     150000|
+-----------+

+--------+
|min(Age)|
+--------+
|      28|
+--------+

+-----------+
|sum(Salary)|
+-----------+
|     645000|
+-----------+



In [22]:
# Exercise 2: GroupBy Aggregations
# 1. Find the maximum salary for each Occupation
df.groupBy("Occupation").agg(max("Salary")).show()

# 2. Find the minimum age for each Occupation
df.groupBy("Occupation").agg(min("Age")).show()

# 3. Count how many employees are in each Occupation
df.groupBy("Occupation").agg(count("*")).show()

+--------------+-----------+
|    Occupation|max(Salary)|
+--------------+-----------+
|        Intern|      40000|
|     Developer|      95000|
|Data Scientist|     120000|
|      Engineer|      80000|
|       Manager|     150000|
+--------------+-----------+

+--------------+--------+
|    Occupation|min(Age)|
+--------------+--------+
|        Intern|      28|
|     Developer|      29|
|Data Scientist|      45|
|      Engineer|      34|
|       Manager|      52|
+--------------+--------+

+--------------+--------+
|    Occupation|count(1)|
+--------------+--------+
|        Intern|       1|
|     Developer|       2|
|Data Scientist|       1|
|      Engineer|       2|
|       Manager|       1|
+--------------+--------+



In [23]:
# Exercise 3: Multiple Aggregations
# For each Occupation, show: Average salary, Maximum salary, Minimum salary, Employee count
# Hint: df.groupBy("col").agg(func1.alias("name1"), func2.alias("name2"), ...)

# Complete solution with all 4 aggregations
df.groupBy("Occupation").agg(
    avg("Salary").alias("Avg_Salary"),
    max("Salary").alias("Max_Salary"),
    min("Salary").alias("Min_Salary"),
    count("*").alias("Employee_Count")
).show()

+--------------+----------+----------+----------+--------------+
|    Occupation|Avg_Salary|Max_Salary|Min_Salary|Employee_Count|
+--------------+----------+----------+----------+--------------+
|        Intern|   40000.0|     40000|     40000|             1|
|     Developer|   92500.0|     95000|     90000|             2|
|Data Scientist|  120000.0|    120000|    120000|             1|
|      Engineer|   75000.0|     80000|     70000|             2|
|       Manager|  150000.0|    150000|    150000|             1|
+--------------+----------+----------+----------+--------------+



In [24]:
# Exercise 4: Advanced Filtering + Aggregation

# 1. Find the average salary for employees older than 30
df.filter(df.Age > 30).agg(
    avg("Salary").alias("Avg_Salary_Age_Over_30")
).show()

# 2. Count how many Engineers have salary greater than 75000
df.filter((df.Occupation == "Engineer") & (df.Salary > 75000)).agg(
    count("*").alias("Engineer_Count_Salary_Over_75k")
).show()

# 3. For employees under 40, calculate average salary by Occupation
df.filter(df.Age < 40).groupBy("Occupation").agg(
    avg("Salary").alias("Avg_Salary")
).show()

+----------------------+
|Avg_Salary_Age_Over_30|
+----------------------+
|              103000.0|
+----------------------+

+------------------------------+
|Engineer_Count_Salary_Over_75k|
+------------------------------+
|                             1|
+------------------------------+

+----------+----------+
|Occupation|Avg_Salary|
+----------+----------+
|    Intern|   40000.0|
| Developer|   92500.0|
|  Engineer|   70000.0|
+----------+----------+



## 📚 PySpark Fundamentals Part 2



In [None]:
# 1. Setup & SparkSession
spark3 = spark  # Reuse the existing SparkSession to avoid conflicts

data3 = [
    ("Alice", 34, "M"),
    ("Bob", 45, "M"),
    ("Catherine", 29, "F"),
    ("David", 52, "M"),
    ("Eva", None, "F"),
    ("Frank", 36, None),
    ("Grace", 28, "F"),
    ("Henry", None, None),
    ("Ivy", 33, "F"),
    ("Jack", None, "M"),
    ("Kathy", 27, "F"),
    ("Leo", 44, "M"),
    ("Mona", 31, None),
    ("Nina", 38, "F"),
    ("Nate", None, "M"),
    ("Olivia", None, None)
]
columns3 = ["Name", "Age", "Gender"]

# 2. Create DataFrame
df3 = spark3.createDataFrame(data3, columns3)
df3.show()

# 3. Select Columns
df3.select("Name", "Age").show()

+---------+----+------+
|     Name| Age|Gender|
+---------+----+------+
|    Alice|  34|     M|
|      Bob|  45|     M|
|Catherine|  29|     F|
|    David|  52|     M|
|      Eva|NULL|     F|
|    Frank|  36|  NULL|
|    Grace|  28|     F|
|    Henry|NULL|  NULL|
|      Ivy|  33|     F|
|     Jack|NULL|     M|
|    Kathy|  27|     F|
|      Leo|  44|     M|
|     Mona|  31|  NULL|
|     Nina|  38|     F|
|     Nate|NULL|     M|
|   Olivia|NULL|  NULL|
+---------+----+------+

+---------+----+
|     Name| Age|
+---------+----+
|    Alice|  34|
|      Bob|  45|
|Catherine|  29|
|    David|  52|
|      Eva|NULL|
|    Frank|  36|
|    Grace|  28|
|    Henry|NULL|
|      Ivy|  33|
|     Jack|NULL|
|    Kathy|  27|
|      Leo|  44|
|     Mona|  31|
|     Nina|  38|
|     Nate|NULL|
|   Olivia|NULL|
+---------+----+



In [26]:
# 4. Filter Rows
t1 = df3.filter((df3.Age > 25) & (df3.Gender.isNotNull()))
t1.show()

t2 = df3.filter((df3.Age > 25) & (df3.Gender == "F"))
t2.show()

t3 = df3.filter((df3.Age != 28) | (df3.Gender == "M"))
t3.show()

+---------+---+------+
|     Name|Age|Gender|
+---------+---+------+
|    Alice| 34|     M|
|      Bob| 45|     M|
|Catherine| 29|     F|
|    David| 52|     M|
|    Grace| 28|     F|
|      Ivy| 33|     F|
|    Kathy| 27|     F|
|      Leo| 44|     M|
|     Nina| 38|     F|
+---------+---+------+

+---------+---+------+
|     Name|Age|Gender|
+---------+---+------+
|Catherine| 29|     F|
|    Grace| 28|     F|
|      Ivy| 33|     F|
|    Kathy| 27|     F|
|     Nina| 38|     F|
+---------+---+------+

+---------+----+------+
|     Name| Age|Gender|
+---------+----+------+
|    Alice|  34|     M|
|      Bob|  45|     M|
|Catherine|  29|     F|
|    David|  52|     M|
|    Frank|  36|  NULL|
|      Ivy|  33|     F|
|     Jack|NULL|     M|
|    Kathy|  27|     F|
|      Leo|  44|     M|
|     Mona|  31|  NULL|
|     Nina|  38|     F|
|     Nate|NULL|     M|
+---------+----+------+



In [27]:
# 5. Add, Rename, Drop Columns
df4 = df3.withColumn("Age_plus1", df3.Age + 1)
df4.show()

df5 = df4.withColumnRenamed("Age_plus1", "Age+1")
df5.show()

df6 = df5.drop("Age+1")
df6.show()


+---------+----+------+---------+
|     Name| Age|Gender|Age_plus1|
+---------+----+------+---------+
|    Alice|  34|     M|       35|
|      Bob|  45|     M|       46|
|Catherine|  29|     F|       30|
|    David|  52|     M|       53|
|      Eva|NULL|     F|     NULL|
|    Frank|  36|  NULL|       37|
|    Grace|  28|     F|       29|
|    Henry|NULL|  NULL|     NULL|
|      Ivy|  33|     F|       34|
|     Jack|NULL|     M|     NULL|
|    Kathy|  27|     F|       28|
|      Leo|  44|     M|       45|
|     Mona|  31|  NULL|       32|
|     Nina|  38|     F|       39|
|     Nate|NULL|     M|     NULL|
|   Olivia|NULL|  NULL|     NULL|
+---------+----+------+---------+

+---------+----+------+-----+
|     Name| Age|Gender|Age+1|
+---------+----+------+-----+
|    Alice|  34|     M|   35|
|      Bob|  45|     M|   46|
|Catherine|  29|     F|   30|
|    David|  52|     M|   53|
|      Eva|NULL|     F| NULL|
|    Frank|  36|  NULL|   37|
|    Grace|  28|     F|   29|
|    Henry|NULL|  N

In [28]:
# 6. Handle Missing Values
# 6.1 Drop rows that contain any missing value
df_drop = df3.dropna()
df_drop.show()
# 6.2 Fill missing values with per-column constants
df_fconstants = df3.fillna({"Age": 28, "Gender": "M", "Name": "BigDaddy"})
df_fconstants.show()
# 6.3 Fill missing numeric values with the column mean
# (Age is a long column, so the filled value is stored as 36, not 36.09...)
mean_value = df3.select(mean("Age")).first()[0]
print(mean_value)
df_fmean = df3.fillna({"Age": mean_value})
df_fmean.show()
# 6.4 Fill missing categorical values with a placeholder
# Here we use 'Unknown' for missing names and genders
df_fcat = df3.fillna({"Name": "Unknown", "Gender": "Unknown"})
df_fcat.show()


+---------+---+------+
|     Name|Age|Gender|
+---------+---+------+
|    Alice| 34|     M|
|      Bob| 45|     M|
|Catherine| 29|     F|
|    David| 52|     M|
|    Grace| 28|     F|
|      Ivy| 33|     F|
|    Kathy| 27|     F|
|      Leo| 44|     M|
|     Nina| 38|     F|
+---------+---+------+

+---------+---+------+
|     Name|Age|Gender|
+---------+---+------+
|    Alice| 34|     M|
|      Bob| 45|     M|
|Catherine| 29|     F|
|    David| 52|     M|
|      Eva| 28|     F|
|    Frank| 36|     M|
|    Grace| 28|     F|
|    Henry| 28|     M|
|      Ivy| 33|     F|
|     Jack| 28|     M|
|    Kathy| 27|     F|
|      Leo| 44|     M|
|     Mona| 31|     M|
|     Nina| 38|     F|
|     Nate| 28|     M|
|   Olivia| 28|     M|
+---------+---+------+

36.09090909090909
+---------+---+------+
|     Name|Age|Gender|
+---------+---+------+
|    Alice| 34|     M|
|      Bob| 45|     M|
|Catherine| 29|     F|
|    David| 52|     M|
|      Eva| 36|     F|
|    Frank| 36|  NULL|
|    Grace| 28

In [30]:
# 7. Sort

df3.show()
df_sort = df3.orderBy("Name", ascending=True)
df_sort.show()

+---------+----+------+
|     Name| Age|Gender|
+---------+----+------+
|    Alice|  34|     M|
|      Bob|  45|     M|
|Catherine|  29|     F|
|    David|  52|     M|
|      Eva|NULL|     F|
|    Frank|  36|  NULL|
|    Grace|  28|     F|
|    Henry|NULL|  NULL|
|      Ivy|  33|     F|
|     Jack|NULL|     M|
|    Kathy|  27|     F|
|      Leo|  44|     M|
|     Mona|  31|  NULL|
|     Nina|  38|     F|
|     Nate|NULL|     M|
|   Olivia|NULL|  NULL|
+---------+----+------+

+---------+----+------+
|     Name| Age|Gender|
+---------+----+------+
|    Alice|  34|     M|
|      Bob|  45|     M|
|Catherine|  29|     F|
|    David|  52|     M|
|      Eva|NULL|     F|
|    Frank|  36|  NULL|
|    Grace|  28|     F|
|    Henry|NULL|  NULL|
|      Ivy|  33|     F|
|     Jack|NULL|     M|
|    Kathy|  27|     F|
|      Leo|  44|     M|
|     Mona|  31|  NULL|
|     Nate|NULL|     M|
|     Nina|  38|     F|
|   Olivia|NULL|  NULL|
+---------+----+------+



In [32]:
# 8. GroupBy & Aggregate
data4 = [
    ("Math", "Ali", 80), ("Math", "Sara", 90), ("English", "Ali", 85),
    ("English", "Sara", 95), ("Math", "Mike", 70), ("English", "Emma", 88)
]
columns4 = ["Subject", "Name", "Score"]
df4 = spark.createDataFrame(data4, columns4)
df4.show()

agg_dict = {
    'Subject': 'count', 
    'Score': 'avg'
}
df4_agg = df4.groupBy("Name").agg(agg_dict)
df4_agg.show()

+-------+----+-----+
|Subject|Name|Score|
+-------+----+-----+
|   Math| Ali|   80|
|   Math|Sara|   90|
|English| Ali|   85|
|English|Sara|   95|
|   Math|Mike|   70|
|English|Emma|   88|
+-------+----+-----+

+----+----------+--------------+
|Name|avg(Score)|count(Subject)|
+----+----------+--------------+
|Mike|      70.0|             1|
|Emma|      88.0|             1|
|Sara|      92.5|             2|
| Ali|      82.5|             2|
+----+----------+--------------+



In [41]:
# 9. Joins
df5 = spark.createDataFrame([("Ali", 25), ("Sara", 30), ("Mike", 28)], ["Name", "Age"])
df5.show()
df6 = spark.createDataFrame([("Ali", "M"), ("John", "M"), ("Mike", "M")], ["Name", "Gender"])
df6.show()
print("Inner Join:")
df_inner = df5.join(df6, on="Name", how="inner")
df_inner.show()
print("Left Join:")
df_left = df5.join(df6, on="Name", how="left")
df_left.show()
print("Right Join:")
df_right = df5.join(df6, on="Name", how="right")
df_right.show()
print("Full Outer Join:")
df_full = df5.join(df6, on="Name", how="outer")
df_full.show()

+----+---+
|Name|Age|
+----+---+
| Ali| 25|
|Sara| 30|
|Mike| 28|
+----+---+

+----+------+
|Name|Gender|
+----+------+
| Ali|     M|
|John|     M|
|Mike|     M|
+----+------+

Inner Join:
+----+---+------+
|Name|Age|Gender|
+----+---+------+
| Ali| 25|     M|
|Mike| 28|     M|
+----+---+------+

Left Join:
+----+---+------+
|Name|Age|Gender|
+----+---+------+
|Mike| 28|     M|
|Sara| 30|  NULL|
| Ali| 25|     M|
+----+---+------+

Right Join:
+----+----+------+
|Name| Age|Gender|
+----+----+------+
|John|NULL|     M|
|Mike|  28|     M|
| Ali|  25|     M|
+----+----+------+

Full Outer Join:
+----+----+------+
|Name| Age|Gender|
+----+----+------+
| Ali|  25|     M|
|John|NULL|     M|
|Mike|  28|     M|
|Sara|  30|  NULL|
+----+----+------+



In [43]:

d = [["1", "sravan", "company 1"],
     ["2", "ojaswi", "company 1"],
     ["3", "rohith", "company 2"],
     ["4", "sridevi", "company 1"],
     ["5", "bobby", "company 1"]]

cols = ['ID', 'Name', 'Company']
employee_df = spark.createDataFrame(d, cols)
employee_df.show()

d1 = [["1", "45000", "IT"],
      ["2", "145000", "Manager"],
      ["6", "45000", "HR"],
      ["5", "34000", "Sales"]]

cols = ['ID', 'salary', 'department']
salary_df = spark.createDataFrame(d1, cols)

salary_df.show()

+---+-------+---------+
| ID|   Name|  Company|
+---+-------+---------+
|  1| sravan|company 1|
|  2| ojaswi|company 1|
|  3| rohith|company 2|
|  4|sridevi|company 1|
|  5|  bobby|company 1|
+---+-------+---------+

+---+------+----------+
| ID|salary|department|
+---+------+----------+
|  1| 45000|        IT|
|  2|145000|   Manager|
|  6| 45000|        HR|
|  5| 34000|     Sales|
+---+------+----------+



In [None]:
employee_df.join(salary_df, on='ID', how='inner').show()
employee_df.join(salary_df, on='ID', how='left').show()
employee_df.join(salary_df, on='ID', how='right').show()

+---+------+---------+------+----------+
| ID|  Name|  Company|salary|department|
+---+------+---------+------+----------+
|  1|sravan|company 1| 45000|        IT|
|  2|ojaswi|company 1|145000|   Manager|
|  5| bobby|company 1| 34000|     Sales|
+---+------+---------+------+----------+

+---+-------+---------+------+----------+
| ID|   Name|  Company|salary|department|
+---+-------+---------+------+----------+
|  3| rohith|company 2|  NULL|      NULL|
|  5|  bobby|company 1| 34000|     Sales|
|  1| sravan|company 1| 45000|        IT|
|  4|sridevi|company 1|  NULL|      NULL|
|  2| ojaswi|company 1|145000|   Manager|
+---+-------+---------+------+----------+

+---+------+---------+------+----------+
| ID|  Name|  Company|salary|department|
+---+------+---------+------+----------+
|  5| bobby|company 1| 34000|     Sales|
|  6|  NULL|     NULL| 45000|        HR|
|  1|sravan|company 1| 45000|        IT|
|  2|ojaswi|company 1|145000|   Manager|
+---+------+---------+------+----------+


