# Chapter 11: Big Data Analytics and Distributed Processing

**Learning Objectives:**
- Understand what "big data" means and when traditional tools are insufficient
- Learn core distributed computing concepts (clusters, partitions, fault tolerance)
- Get hands-on experience with Apache Spark and PySpark
- Work with Spark DataFrames and understand lazy evaluation
- Apply best practices for handling large datasets
- Gain awareness of cloud-based analytics platforms

**Prerequisites:**
- Basic Python programming (Chapter 2)
- Familiarity with Pandas DataFrames (Chapter 4)
- Understanding of data manipulation concepts

---

## Table of Contents
1. [Introduction](#Introduction)
2. [What is Big Data?](#11.1-What-is-Big-Data?)
3. [Limitations of Traditional Tools](#11.2-Limitations-of-Traditional-Tools)
4. [Distributed Computing Concepts](#11.3-Distributed-Computing-Concepts)
5. [Introduction to Spark Architecture](#11.4-Introduction-to-Spark-Architecture)
6. [Setup: Using PySpark](#11.5-Setup:-Using-PySpark-in-a-Notebook)
7. [DataFrames and RDDs](#11.6-DataFrames-and-RDDs)
8. [Parallel Computation in Spark](#11.7-Parallel-Computation-in-Spark)
9. [Handling Large Datasets](#11.8-Handling-Large-Datasets-(Practical-Guidelines))
10. [Cloud-Based Analytics Overview](#11.9-Cloud-Based-Analytics-Overview)
11. [Exercises](#Exercises-(Practice))
12. [Mini-Project](#Mini-Project:-Clickstream-Summary-(Spark-DataFrames))
13. [Summary](#Summary-/-Key-Takeaways)

## Introduction
Big data analytics is not about using "fancy" tools; it's about **handling data that is too large, too fast, or too complex** for a single computer to process reliably and quickly.

A common beginner misconception is: "If I just buy a bigger laptop, I can handle big data." Sometimes that works for a while, but eventually you hit limits (memory, CPU, disk, time). Distributed systems solve this by using **multiple machines working together**.

In this chapter we focus on Spark because it is widely used and approachable for Python users.

## 11.1 What is Big Data?
There isn't a single cutoff like "over 1 GB is big data". Big data is usually described using the **3 Vs** (sometimes 5 Vs):

- **Volume**: too much data to store/process comfortably on one machine
- **Velocity**: data arrives quickly (streams, logs, sensors, click events)
- **Variety**: many formats (tables, JSON, images, text, audio)

Additional Vs you may hear:
- **Veracity**: data quality / uncertainty
- **Value**: usefulness of the data

### Examples
- Web/app logs from millions of users
- Transaction records from large e-commerce sites
- IoT sensor readings every second
- Social media text + images

**Key idea:** Big data is often a *systems* problem (storage + compute + reliability), not just a programming problem.

## 11.2 Limitations of Traditional Tools
Tools like Pandas are fantastic, but they are mostly designed for **single-machine** workflows.

### Where single-machine tools struggle
| Limitation | Description | Example |
|------------|-------------|---------|
| **Memory limits** | DataFrame must largely fit into RAM | 50GB CSV on 16GB laptop |
| **CPU limits** | One machine has limited cores | Complex transformations on millions of rows |
| **I/O limits** | Reading huge files from disk is slow | Loading terabytes of log files |
| **Long runtimes** | Jobs take hours/days; crashes lose progress | Overnight analytics job fails at 90% |

### Visual comparison: Single vs Distributed Processing

```
Single Machine (Pandas):          Distributed System (Spark):
┌─────────────────────┐           ┌─────────┐ ┌─────────┐ ┌─────────┐
│    Your Laptop      │           │ Node 1  │ │ Node 2  │ │ Node 3  │
│  ┌───────────────┐  │           │ Part A  │ │ Part B  │ │ Part C  │
│  │  ALL DATA     │  │    vs     │ of data │ │ of data │ │ of data │
│  │  (must fit!)  │  │           └────┬────┘ └────┬────┘ └────┬────┘
│  └───────────────┘  │                │           │           │
└─────────────────────┘                └───────────┴───────────┘
                                            Combined Result
```

### Important nuance
Not every dataset needs Spark. If your data fits in memory and runs fast enough, Pandas can be simpler and more productive.

**Rule of thumb:** if your data is too big for memory or your processing takes too long, consider distributed tools.
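The rule of thumb can be turned into quick arithmetic before you reach for a cluster. The sketch below is a back-of-the-envelope estimate, not a precise measurement: it assumes every value takes 8 bytes (true for `float64`/`int64` columns, optimistic for strings), and the `headroom=3.0` factor is an assumed allowance for the temporary copies that transformations create. Both numbers are illustrative assumptions you should adjust to your own data.

```python
def estimated_gb(rows: int, cols: int, bytes_per_value: int = 8) -> float:
    """Rough in-memory size in GB, assuming fixed-width values (e.g. float64)."""
    return rows * cols * bytes_per_value / 1e9


def fits_in_memory(rows: int, cols: int, ram_gb: float,
                   bytes_per_value: int = 8, headroom: float = 3.0) -> bool:
    """True if the estimated size, times an assumed headroom factor, fits in RAM.

    headroom=3.0 is an illustrative assumption: joins, sorts, and new columns
    often hold several copies of the data in memory at once.
    """
    return estimated_gb(rows, cols, bytes_per_value) * headroom <= ram_gb


# 10 million rows x 20 numeric columns on a 16 GB laptop: about 1.6 GB raw.
print(estimated_gb(10_000_000, 20), fits_in_memory(10_000_000, 20, ram_gb=16))
# 500 million rows x 20 numeric columns: about 80 GB raw, far beyond 16 GB.
print(estimated_gb(500_000_000, 20), fits_in_memory(500_000_000, 20, ram_gb=16))
```

If the second case describes your data, that is the point where a distributed tool starts to pay off.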

> 💡 **Tip:** Start with Pandas. Only move to Spark when you hit real limitations. Premature optimization wastes time.

## 11.3 Distributed Computing Concepts
Distributed computing means splitting work across multiple computers (or processes).

### Core terms explained

| Term | What it means | Analogy |
|------|--------------|---------|
| **Cluster** | A group of machines working together | A team of workers |
| **Node** | One machine in the cluster | One worker in the team |
| **Driver** | The program that coordinates the job | The team leader |
| **Worker/Executor** | Processes that do the actual computation | Team members doing tasks |
| **Partition** | A chunk of data processed in parallel | Dividing work into portions |

### Visual: How a Spark cluster works

```
                    ┌─────────────────┐
                    │  Driver Program │  ← Your Python code runs here
                    │  (Coordinator)  │
                    └────────┬────────┘
                             │
              ┌──────────────┼──────────────┐
              │              │              │
              ▼              ▼              ▼
        ┌──────────┐   ┌──────────┐   ┌──────────┐
        │ Executor │   │ Executor │   │ Executor │
        │  Node 1  │   │  Node 2  │   │  Node 3  │
        │          │   │          │   │          │
        │ [Part 1] │   │ [Part 2] │   │ [Part 3] │  ← Data partitions
        │ [Part 4] │   │ [Part 5] │   │ [Part 6] │
        └──────────┘   └──────────┘   └──────────┘
```

### Why partitioning matters
If you split your data into, say, 8 partitions, Spark can work on multiple partitions at the same time (depending on available cores).

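**Example:** 1 billion rows split into 100 partitions gives about 10 million rows per partition. Each partition is processed by one task, and Spark runs as many tasks at the same time as there are free cores (for example, 20 at a time on 20 cores).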
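The split-process-combine idea does not depend on Spark. The plain-Python sketch below uses only the standard library: `make_partitions` (a hypothetical helper, not a Spark API) cuts a list into contiguous chunks, a thread pool computes one partial sum per partition, and driver-like code combines the partials. Python threads do not speed up CPU-bound work because of the GIL, so treat this as a picture of the structure, not a performance demo.

```python
from concurrent.futures import ThreadPoolExecutor


def make_partitions(data, num_partitions):
    """Split a list into num_partitions contiguous chunks of near-equal size."""
    size, extra = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)  # spread any remainder
        partitions.append(data[start:end])
        start = end
    return partitions


def partial_sum(partition):
    """The per-partition 'task': runs independently of the other partitions."""
    return sum(partition)


rows = list(range(1, 1_000_001))   # 1 million "rows"
parts = make_partitions(rows, 8)   # 8 partitions of 125,000 rows each

# 4 workers process the 8 partitions, at most 4 at a time.
with ThreadPoolExecutor(max_workers=4) as pool:
    partials = list(pool.map(partial_sum, parts))

total = sum(partials)              # the "driver" combines the partial results
print([len(p) for p in parts])
print(total)                       # 500000500000
```

With 8 partitions and 4 workers, the work runs in two "waves", which is the same reason Spark's parallelism is capped by the number of available cores.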

### Fault tolerance (beginner-friendly view)
Distributed systems expect failures. What happens if a node crashes?

**Spark's solution: Lineage**
- Spark tracks how each partition was created (the "recipe")
- If a partition is lost, Spark can recompute it from the original data
- No need to restart the entire job!

```
Original Data → filter() → groupBy() → Result
                   ↑
            If Node 2 fails here, Spark can
            recompute just the lost partitions
```
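A toy model makes the lineage idea concrete. In the sketch below (plain Python with hypothetical names; Spark's real bookkeeping is far more involved), each partition's result is defined by its source slice plus a recorded list of steps, so a lost result can be rebuilt by replaying the steps for that one partition only.

```python
# Source data, already split into partitions (think: blocks of a file).
source_partitions = {
    0: [1, 2, 3, 4],
    1: [5, 6, 7, 8],
    2: [9, 10, 11, 12],
}

# The "recipe": an ordered list of recorded transformations.
lineage = [
    ("filter", lambda x: x % 2 == 0),  # keep even numbers
    ("map", lambda x: x * 10),         # then scale them
]

recompute_log = []  # which partitions had to be (re)computed, in order


def compute_partition(pid):
    """Replay the recorded steps on one source partition."""
    recompute_log.append(pid)
    rows = source_partitions[pid]
    for op, fn in lineage:
        if op == "filter":
            rows = [r for r in rows if fn(r)]
        elif op == "map":
            rows = [fn(r) for r in rows]
    return rows


results = {pid: compute_partition(pid) for pid in source_partitions}

# Simulate the node holding partition 1 crashing: its result is lost.
del results[1]

# Recovery: rebuild only the missing partition from its lineage.
results[1] = compute_partition(1)
print(results)        # {0: [20, 40], 2: [100, 120], 1: [60, 80]}
print(recompute_log)  # [0, 1, 2, 1]
```

Only partition 1 appears twice in `recompute_log`: partitions 0 and 2 were not touched during recovery.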

> ⚠️ **Warning:** Fault tolerance adds overhead. For small, quick jobs on one machine, Pandas is often faster.

## 11.4 Introduction to Spark Architecture
Spark is a distributed compute engine designed for large-scale data processing.

### Spark components (high level)

| Component | Role | Description |
|-----------|------|-------------|
| **SparkSession** | Entry point | The main interface to Spark functionality |
| **Driver program** | Coordinator | Runs your Python code, creates execution plan |
| **Executors** | Workers | Run tasks on data partitions |
| **Cluster manager** | Resource allocator | Allocates machines and cores to your application (Spark Standalone, YARN, Kubernetes); in local mode everything runs in a single process |

### Lazy evaluation (very important!)
In Spark, many operations are **lazy**: Spark waits to execute until it must produce a result.

```
┌──────────────────────────────────────────────────────────────┐
│                    TRANSFORMATIONS (Lazy)                    │
│  select() → filter() → withColumn() → groupBy() → ...        │
│                                                              │
│  ⚡ Nothing actually happens yet! Spark builds a plan.       │
└──────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌──────────────────────────────────────────────────────────────┐
│                      ACTIONS (Trigger!)                      │
│  count(), show(), collect(), take(), write.parquet()         │
│                                                              │
│  🚀 NOW Spark executes the optimized plan!                   │
└──────────────────────────────────────────────────────────────┘
```

### Why lazy evaluation?
1. **Optimization**: Spark can reorder/merge operations for efficiency
2. **Efficiency**: Skip unnecessary work (e.g., if you only need 10 rows)
3. **Planning**: Spark sees the whole pipeline before executing
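Python generators give a small, Spark-free analogy for lazy evaluation: building a generator pipeline only records what to do, and nothing runs until something consumes it. This is just an analogy (Spark builds and optimizes a query plan, generators do not), but it illustrates point 2 above: asking for three results does only as much work as needed.

```python
from itertools import islice

calls = {"parse": 0}  # counts how many times parse() really runs


def parse(line: str) -> int:
    """Stand-in for an expensive per-row step."""
    calls["parse"] += 1
    return int(line)


lines = (str(i) for i in range(1_000_000))  # source: a million "rows"
parsed = (parse(s) for s in lines)           # like a transformation: nothing runs yet
evens = (x for x in parsed if x % 2 == 0)    # another transformation: still nothing
print("parse() calls before any action:", calls["parse"])  # 0

first_three = list(islice(evens, 3))         # like take(3): pulls only what it needs
print(first_three)                           # [0, 2, 4]
print("parse() calls after taking 3 rows:", calls["parse"])  # 5
```

`parse()` runs 5 times (for the values 0 through 4), not a million times, because the consumer stopped after three matches.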

### Common transformations vs actions

| Transformations (lazy) | Actions (trigger execution) |
|----------------------|---------------------------|
| `select()` | `count()` |
| `filter()` / `where()` | `show()` |
| `withColumn()` | `collect()` |
| `groupBy()` | `take(n)` |
| `join()` | `write.parquet()` / `write.save()` |
| `orderBy()` | `first()` |

> 💡 **Tip:** If your Spark code seems to "do nothing," you probably haven't called an action yet!

## 11.5 Setup: Using PySpark in a Notebook
You can run Spark in two main ways:
- **Local mode** (recommended for learning): Spark runs on your machine using multiple threads
- **Cluster mode**: Spark runs on many machines in a cluster

### Installation options

| Method | Command | Best for |
|--------|---------|----------|
| pip | `pip install pyspark` | Most users, learning |
| conda | `conda install pyspark` | Anaconda users |
| Databricks | Pre-installed | Production, collaboration |

### Installing PySpark
In a terminal (not in a notebook cell), run:
```bash
pip install pyspark
```

If `pyspark` is not installed, the code cells below will show a helpful message instead of crashing.

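> ⚠️ **Note:** PySpark needs a Java runtime. Recent Spark 3.x releases run on Java 8, 11, or 17, while Spark 4.x requires Java 17 or newer; check the documentation for the version you installed. If you get Java errors, install a JDK first (for example from https://adoptium.net/).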

In [None]:
# PySpark setup (works in local mode).
# If pyspark isn't installed, this cell will explain what to do.

try:
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql import types as T
    PysparkAvailable = True
except ImportError as e:
    PysparkAvailable = False
    print("PySpark is not installed in this environment.")
    print("Install it with: pip install pyspark")
    print("Then restart the notebook kernel and run this cell again.")

PysparkAvailable

In [None]:
# Create a Spark session (the entry point for Spark).
# local[*] means: use all available CPU cores on your machine.

if PysparkAvailable:
    spark = (
        SparkSession.builder
        .appName('Chapter11-BigData-Intro')
        .master('local[*]')
        .getOrCreate()
    )
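    print('Spark session started:', spark.sparkContext.appName)
else:
    spark = None
    print('Spark session not created (PySpark is not installed).')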

### Quick check: Spark version
When learning, it helps to confirm Spark is running.

In [None]:
if spark is not None:
    print('Spark version:', spark.version)
else:
    print('Spark not available (install pyspark).')

## 11.6 DataFrames and RDDs
Spark supports multiple APIs. The two most important are:

### Comparison: DataFrames vs RDDs

| Feature | DataFrames | RDDs |
|---------|-----------|------|
| **Abstraction** | Tabular (like Pandas) | Distributed collection |
| **Schema** | Has schema (column names, types) | No schema |
| **Optimization** | Catalyst optimizer (fast!) | No automatic optimization |
| **Ease of use** | SQL-like, beginner-friendly | More verbose, lower-level |
| **Best for** | Analytics, SQL queries | Custom transformations |

### Which should you use?
For most analytics tasks, start with **DataFrames**. They are easier and usually faster because Spark can optimize queries.

```
Your Analytics Code
        │
        ▼
┌───────────────────┐
│  Spark DataFrame  │  ← Recommended for beginners!
│  (High-level API) │
└─────────┬─────────┘
          │ (Spark optimizes automatically)
          ▼
┌───────────────────┐
│       RDD         │  ← Lower level, more control
│  (Low-level API)  │
└───────────────────┘
```

> 💡 **Tip:** Think of DataFrames as "Pandas for big data": similar feel, but distributed.

In [None]:
# We'll build a small example dataset.
# Even though it's small, the same code patterns scale to bigger data.

if spark is not None:
    data = [
        (1, 'A', 10.5),
        (2, 'A', 20.0),
        (3, 'B', 7.25),
        (4, 'B', 12.0),
        (5, 'C', 5.0),
        (6, 'C', None),
    ]

    schema = T.StructType([
        T.StructField('id', T.IntegerType(), nullable=False),
        T.StructField('group', T.StringType(), nullable=False),
        T.StructField('value', T.DoubleType(), nullable=True),
    ])

    df = spark.createDataFrame(data, schema=schema)
    print('Created DataFrame with columns:', df.columns)
else:
    df = None

### Inspecting data (schema + rows)
In Spark, you often start by checking the schema and sampling rows.

**Tip:** avoid `collect()` on big datasets; use `show()` or `limit()`.

In [None]:
if df is not None:
    df.printSchema()
    df.show(truncate=False)
else:
    print('DataFrame not available (Spark not running).')

### Transformations vs actions (hands-on)
Below, we create a filtered DataFrame. This does **not** immediately compute results (lazy evaluation).

Then we call an action (`count`) to actually run the job.

In [None]:
if df is not None:
    filtered = df.filter(F.col('value').isNotNull())
    # At this point Spark has a plan, but hasn't executed yet.
    print('Rows with non-null value:', filtered.count())
    filtered.show()
else:
    None

### Aggregation with groupBy
A common analytics task is grouping and computing summary statistics.

In [None]:
if df is not None:
    summary = (
        df.groupBy('group')
          .agg(
              F.count('*').alias('rows'),
              F.count('value').alias('non_null_values'),
              F.avg('value').alias('avg_value'),
              F.min('value').alias('min_value'),
              F.max('value').alias('max_value'),
          )
          .orderBy('group')
    )
    summary.show()
else:
    None
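One detail worth checking is how nulls are treated: `F.count('*')` counts rows, while `F.count('value')`, `F.avg`, `F.min`, and `F.max` skip null values. The plain-Python sketch below recomputes the same summary for the six sample rows to make that explicit; `sample_rows` and `summary_rows` are new names chosen so they do not overwrite `data` or `summary` from the cells above.

```python
from collections import defaultdict

sample_rows = [
    (1, 'A', 10.5), (2, 'A', 20.0),
    (3, 'B', 7.25), (4, 'B', 12.0),
    (5, 'C', 5.0), (6, 'C', None),
]

# Group the values by key, like df.groupBy('group').
values_by_group = defaultdict(list)
for _id, group, value in sample_rows:
    values_by_group[group].append(value)

summary_rows = []
for group in sorted(values_by_group):          # like .orderBy('group')
    values = values_by_group[group]
    non_null = [v for v in values if v is not None]
    summary_rows.append({
        'group': group,
        'rows': len(values),                   # F.count('*'): nulls included
        'non_null_values': len(non_null),      # F.count('value'): nulls skipped
        'avg_value': sum(non_null) / len(non_null) if non_null else None,
        'min_value': min(non_null) if non_null else None,
        'max_value': max(non_null) if non_null else None,
    })

for row in summary_rows:
    print(row)
```

For group `C`, `rows` is 2 but `non_null_values` is 1, and the average is 5.0 rather than 2.5: the missing value is ignored, not treated as zero.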

### Visual example: plot aggregated results
Spark is not a plotting library. A typical workflow is:

```
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Large Data    │    │  Small Summary  │    │   Visualization │
│   in Spark      │ →  │  (aggregated)   │ →  │   (Matplotlib)  │
│   (millions)    │    │  (few rows)     │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                              │
                              ▼
                         .toPandas()
                       (safe for small data)
```

**Workflow:**
1. Use Spark to compute a *small* aggregated result
2. Convert that small result to Pandas with `.toPandas()`
3. Plot with Matplotlib/Seaborn

> ⚠️ **Warning:** Converting a large Spark DataFrame to Pandas can crash your machine. Only convert small, aggregated results!

In [None]:
if df is not None:
    import pandas as pd
    import matplotlib.pyplot as plt

    # Collect only the small summary result
    pdf = summary.toPandas()

    plt.figure(figsize=(6, 3))
    plt.bar(pdf['group'], pdf['avg_value'])
    plt.title('Average value per group')
    plt.xlabel('group')
    plt.ylabel('avg_value')
    plt.tight_layout()
    plt.show()
else:
    None

### RDDs (brief introduction)
RDDs are the older core abstraction in Spark. You may still see them in older tutorials or specialized tasks.

RDDs feel more like working with distributed lists: `map`, `filter`, `reduce`.

For analytics and SQL-like operations, prefer **DataFrames** unless you have a specific reason.
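The RDD vocabulary mirrors Python's own built-ins. The sketch below runs on an ordinary list with `map`, `filter`, and `functools.reduce`; in Spark the same three steps would be `rdd.map`, `rdd.filter`, and `rdd.reduce`, applied across partitions. One difference matters: Spark combines partial results from different partitions in no fixed order, so the function you pass to `reduce` should be associative and commutative (addition is; subtraction is not).

```python
from functools import reduce

numbers = [1, 2, 3, 4, 5, 6]

squares = map(lambda x: x * x, numbers)               # like rdd.map(...)
even_squares = filter(lambda x: x % 2 == 0, squares)  # like rdd.filter(...)
total = reduce(lambda a, b: a + b, even_squares)      # like rdd.reduce(...)

print(total)  # 4 + 16 + 36 = 56

# Why associativity matters: reducing two "partitions" separately and then
# combining the partial results gives the same answer as one big reduce.
left, right = [4, 16], [36]
assert reduce(lambda a, b: a + b, left) + reduce(lambda a, b: a + b, right) == total
```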

In [None]:
# Example: a tiny RDD and a classic word count pattern.
# This is for learning; DataFrames are usually preferred in real analytics work.

if spark is not None:
    sc = spark.sparkContext
    lines = sc.parallelize([
        'spark makes big data manageable',
        'big data needs distributed computing',
        'spark spark spark'
    ])

    words = lines.flatMap(lambda s: s.split())
    counts = words.map(lambda w: (w.lower(), 1)).reduceByKey(lambda a, b: a + b)
    print(counts.collect())
else:
    None

## 11.7 Parallel Computation in Spark
Spark achieves parallelism mainly through **partitions**. Each partition is processed by a task, and tasks can run at the same time.

### Why should beginners care?
Even if your code is correct, a poor partition strategy can make jobs slow.

### Useful tools
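- `df.rdd.getNumPartitions()` to check the current number of partitions
- `repartition(n)` to increase or change the number of partitions (always triggers a full shuffle)
- `coalesce(n)` to reduce the number of partitions (merges existing partitions without a full shuffle, so it is usually cheaper than `repartition`)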
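Two partitioning strategies sit behind these calls. Called with only a number, `repartition(n)` spreads rows evenly (round-robin); called with columns, for example `repartition(4, 'group')`, it hash-partitions so that all rows with the same key end up in the same partition, which is also what a shuffle for `groupBy` needs. The plain-Python sketch below imitates both. It uses `zlib.crc32` as a stable stand-in hash; Spark uses its own internal hash function, so the exact partition numbers will differ from Spark's.

```python
import zlib

rows = [(1, 'A'), (2, 'A'), (3, 'B'), (4, 'B'), (5, 'C'), (6, 'C')]


def round_robin(rows, n):
    """Deal rows out like cards: row i goes to partition i % n."""
    parts = [[] for _ in range(n)]
    for i, row in enumerate(rows):
        parts[i % n].append(row)
    return parts


def hash_partition(rows, n, key):
    """Send each row to partition hash(key) % n, so equal keys stay together."""
    parts = [[] for _ in range(n)]
    for row in rows:
        # crc32 is a stand-in: stable across runs, unlike Python's hash() for str.
        pid = zlib.crc32(str(key(row)).encode("utf-8")) % n
        parts[pid].append(row)
    return parts


rr = round_robin(rows, 4)
hp = hash_partition(rows, 4, key=lambda r: r[1])
print("round-robin sizes:", [len(p) for p in rr])  # [2, 2, 1, 1]
print("hash partitions:", hp)                      # some partitions may be empty
```

Round-robin balances sizes but scatters each key; hash partitioning keeps each key together but can leave partitions empty or uneven (skew) when there are few distinct keys.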

In [None]:
if df is not None:
    print('Default partitions:', df.rdd.getNumPartitions())
    df2 = df.repartition(4)
    print('After repartition(4):', df2.rdd.getNumPartitions())

    df3 = df2.coalesce(2)
    print('After coalesce(2):', df3.rdd.getNumPartitions())
else:
    None

### Common mistake: too many or too few partitions
- **Too few** partitions → not enough parallelism, slow jobs
- **Too many** partitions → overhead (scheduling, many small tasks), also slow

A practical approach is to start with defaults, measure, then tune.
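When you do start tuning, two common starting points can be combined into simple arithmetic: aim for partitions of roughly 128 MB (the default of `spark.sql.files.maxPartitionBytes`, which Spark uses when splitting input files), and have at least 2 to 3 tasks per CPU core, as the Spark tuning guide suggests. The helper below is a hypothetical heuristic built on those two numbers, not a Spark API; measure before and after you change anything.

```python
import math


def suggest_partitions(data_size_mb: float, total_cores: int,
                       target_partition_mb: float = 128,
                       tasks_per_core: int = 2) -> int:
    """Hypothetical heuristic: keep partitions near the target size, and
    give every available core at least `tasks_per_core` tasks."""
    by_size = math.ceil(data_size_mb / target_partition_mb)
    by_cores = total_cores * tasks_per_core
    return max(by_size, by_cores)


# 100 GB of input on a 40-core cluster: size dominates (102400 / 128 = 800).
print(suggest_partitions(100 * 1024, total_cores=40))  # 800
# 50 MB on an 8-core laptop: cores dominate (8 * 2 = 16).
print(suggest_partitions(50, total_cores=8))           # 16
```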

## 11.8 Handling Large Datasets (Practical Guidelines)
When datasets get large, small habits matter a lot.

### ✅ Do this (Best Practices)

| Practice | Why it helps | Example |
|----------|-------------|---------|
| Use **Parquet** format | Columnar, compressed, fast scans | `df.write.parquet('data.parquet')` |
| **Select** only needed columns | Less data to move | `df.select('col1', 'col2')` |
| **Filter early** | Reduce data before processing | `df.filter(F.col('year') > 2020)` |
| Use **groupBy + agg** | Summarize instead of collecting raw | `df.groupBy('cat').agg(F.count('*'))` |
| **Cache** reused DataFrames | Avoid recomputation | `df.cache()` |
| Use **built-in functions** | Optimized by Spark | `F.lower()`, `F.when()` |

### ❌ Avoid this (Common Mistakes)

| Mistake | Why it's bad | Alternative |
|---------|-------------|-------------|
| `collect()` on big data | Crashes driver (your notebook) | Use `show()` or `limit()` |
| `toPandas()` on huge DataFrames | Memory errors | Aggregate first |
| Reaching for Python UDFs first | Slower than built-in functions | Use `F.` functions first |
| Too few partitions | Poor parallelism | `repartition()` |
| Too many partitions | Scheduling overhead | `coalesce()` |

### Why built-in functions matter
Spark can optimize built-in SQL functions (like `F.lower`, `F.when`, `F.regexp_extract`) better than custom Python functions.

```python
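# Assumes `df` has a string column called 'name' (not part of the sample data above).
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# ❌ Slow: Python UDF (each value is sent to a Python worker process and back)
@udf(returnType=StringType())
def my_lower(s):
    return s.lower() if s is not None else None

slow = df.withColumn('name_lower', my_lower(F.col('name')))

# ✅ Fast: built-in function (runs inside Spark's engine, which the optimizer understands)
fast = df.withColumn('name_lower', F.lower(F.col('name')))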
```

In [None]:
# Example: cleaning + feature creation using built-in functions
# We'll create a new column and handle missing values.

if df is not None:
    cleaned = (
        df
        .withColumn('value_filled', F.coalesce(F.col('value'), F.lit(0.0)))
        .withColumn('is_missing', F.col('value').isNull().cast('int'))
    )

    cleaned.show()
else:
    None

In [None]:
# Example: caching (only useful when reusing the same DataFrame multiple times)

if df is not None:
    cleaned_cached = cleaned.cache()
    # First action materializes cache
    print('Count:', cleaned_cached.count())
    # Second action can reuse cached data
    print('Distinct groups:', cleaned_cached.select('group').distinct().count())
else:
    None

### Using SQL with Spark DataFrames
One powerful feature of Spark is that you can use SQL queries directly on DataFrames. This is great if you already know SQL!

**Workflow:**
1. Register your DataFrame as a temporary view
2. Write SQL queries against it
3. Results are returned as DataFrames

In [None]:
# Example: Using SQL with Spark DataFrames
# This is useful if you already know SQL!

if df is not None:
    # Step 1: Register DataFrame as a temporary SQL view
    cleaned.createOrReplaceTempView('my_data')
    
    # Step 2: Write a SQL query. `group` is also an SQL keyword,
    # so quote the column name with backticks to be safe.
    sql_result = spark.sql('''
        SELECT
            `group`,
            COUNT(*) AS total_rows,
            AVG(value_filled) AS avg_value,
            SUM(is_missing) AS missing_count
        FROM my_data
        GROUP BY `group`
        ORDER BY `group`
    ''')
    
    # Step 3: Result is a DataFrame - show it
    print("SQL Query Result:")
    sql_result.show()
else:
    print('Spark not available')

## 11.9 Cloud-Based Analytics Overview
Big data systems are commonly run in the cloud because it's easier to scale up/down and integrate storage + compute.

### Why cloud for big data?

| Benefit | Description |
|---------|-------------|
| **Scalability** | Add/remove nodes as needed |
| **Cost efficiency** | Pay only for what you use |
| **Managed services** | Less infrastructure to maintain |
| **Integration** | Easy connection to other cloud services |

### Common cloud architecture pattern

```
┌──────────────────────────────────────────────────────────────┐
│                        CLOUD PLATFORM                        │
│                                                              │
│   ┌─────────────────┐         ┌──────────────────┐           │
│   │  Object Storage │         │  Compute Cluster │           │
│   │  (S3/ADLS/GCS)  │ ◄─────► │  (Spark on EMR/  │           │
│   │                 │         │   Dataproc/etc.) │           │
│   │  📁 Raw Data    │         │                  │           │
│   │  📁 Processed   │         │  🔥 Processing   │           │
│   └─────────────────┘         └────────┬─────────┘           │
│                                        │                     │
│                                        ▼                     │
│                              ┌──────────────────┐            │
│                              │   Dashboard/BI   │            │
│                              │  (Visualization) │            │
│                              └──────────────────┘            │
└──────────────────────────────────────────────────────────────┘
```

### Popular Spark platforms

| Platform | Provider | Key Features |
|----------|----------|-------------|
| **Databricks** | Multi-cloud | Spark-native, notebooks, MLflow |
| **Amazon EMR** | AWS | Managed Hadoop/Spark clusters |
| **Google Dataproc** | GCP | Fast cluster startup, GCS integration |
| **Azure Synapse** | Azure | Analytics + data warehouse combo |
| **Azure HDInsight** | Azure | Open-source analytics service |

### Beginner tip: Separate storage from compute
In many modern architectures:
- **Data** lives in cheap object storage (S3, ADLS, GCS)
- **Compute clusters** are created on-demand when needed
- Clusters are shut down when done, which saves money!

> 💡 **Tip:** Start learning with local mode, then move to the cloud when you need real scale or collaboration.

## Exercises (Practice)
Try these after the explanations. Keep datasets small while learning, but write code as if data could be large.

### Exercise 1: Transformations vs actions
1. Create a new DataFrame column `value_squared = value_filled ** 2`
2. Filter to keep only rows where `value_squared > 100`
3. Trigger an action to count the rows

*Goal:* practice chaining transformations and finishing with an action.

In [None]:
# Your solution (starter template)

if df is not None:
    ex1 = (
        cleaned
        .withColumn('value_squared', F.col('value_filled') * F.col('value_filled'))
        .filter(F.col('value_squared') > 100)
    )
    # Action
    print('Rows with value_squared > 100:', ex1.count())
    ex1.show()
else:
    None

### Exercise 2: GroupBy challenge
Compute, for each `group`, the percentage of missing values in the original `value` column.

Hints:
- Use `is_missing` from the `cleaned` DataFrame
- `percentage = 100 * missing / total`
- Use `F.sum` and `F.count`

In [None]:
if df is not None:
    ex2 = (
        cleaned
        .groupBy('group')
        .agg(
            F.count('*').alias('total_rows'),
            F.sum('is_missing').alias('missing_rows'),
        )
        .withColumn('missing_pct', F.round(F.col('missing_rows') / F.col('total_rows') * 100, 2))
        .orderBy('group')
    )
    ex2.show()
else:
    None

### Exercise 3: RDD word count (concept check)
1. Create an RDD from a list of sentences
2. Compute word counts
3. Show the top 5 words

*Tip:* Use `takeOrdered(5, key=...)` or convert to a DataFrame after counting.

In [None]:
if spark is not None:
    sc = spark.sparkContext
    sentences = [
        'data data data pipelines',
        'pipelines run on spark',
        'spark helps scale data analytics'
    ]
    rdd = sc.parallelize(sentences)
    counts = (
        rdd.flatMap(lambda s: s.split())
           .map(lambda w: (w.lower(), 1))
           .reduceByKey(lambda a, b: a + b)
    )
    top5 = counts.takeOrdered(5, key=lambda kv: -kv[1])
    print(top5)
else:
    None

### Exercise 4: Reading and Writing Parquet Files
Parquet is the recommended format for Spark. Practice reading and writing data.

**Tasks:**
1. Write the `cleaned` DataFrame to a Parquet file
2. Read it back and verify the data
3. Compare with CSV (optional)

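> 💡 **Note:** Spark writes Parquet output as a directory, not a single file: it contains one `part-*` file per partition plus small marker files such as `_SUCCESS`. `spark.read.parquet()` reads the whole directory back.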
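Why does a columnar format help? When a query needs one column, a columnar layout can read just that column's values, while a row layout (like CSV) has to walk every full record. The plain-Python sketch below counts the values touched in each layout for a three-row example; real Parquet files add compression, encodings, and per-column statistics on top of this basic idea.

```python
records = [
    {"id": 1, "group": "A", "value": 10.5},
    {"id": 2, "group": "A", "value": 20.0},
    {"id": 3, "group": "B", "value": 7.25},
]

# Row-oriented layout (like CSV): one full record after another.
row_layout = [list(r.values()) for r in records]

# Column-oriented layout (like Parquet): one list per column.
column_layout = {name: [r[name] for r in records] for name in records[0]}

# Query: SELECT value. The row layout walks every field of every record ...
touched_row = sum(len(r) for r in row_layout)
# ... while the column layout reads only the 'value' column.
touched_col = len(column_layout["value"])

print(touched_row, touched_col)  # 9 3
```

The gap grows with the number of columns, which is why `select` on only the columns you need pairs so well with Parquet.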

In [None]:
# Exercise 4: Working with Parquet files
# Parquet is columnar and compressed - ideal for big data

import os
import tempfile

if df is not None:
    # Create a temporary directory for our example
    temp_dir = tempfile.mkdtemp()
    parquet_path = os.path.join(temp_dir, 'sample_data.parquet')
    
    # Write to Parquet
    cleaned.write.mode('overwrite').parquet(parquet_path)
    print(f"✅ Data written to: {parquet_path}")
    
    # Read it back
    df_from_parquet = spark.read.parquet(parquet_path)
    print("\n📖 Data read back from Parquet:")
    df_from_parquet.show()
    
    # Show schema is preserved
    print("Schema preserved:")
    df_from_parquet.printSchema()
else:
    print('Spark not available')

## Mini-Project: Clickstream Summary (Spark DataFrames)
In real analytics, a common big-data dataset is **clickstream / event logs**.

You will simulate a small clickstream table and answer questions that scale to large logs.

### Your tasks
1. Create a DataFrame with columns: `user_id`, `event_type`, `ts` (timestamp string)
2. Compute events per user
3. Compute event counts per type
4. Find the most active user
5. (Optional) Extract the date from `ts` and compute events per day

**Beginner warning:** do not use `collect()` on raw events; aggregate first.

In [None]:
if spark is not None:
    events = [
        (101, 'view',  '2026-01-02 10:00:00'),
        (101, 'click', '2026-01-02 10:01:00'),
        (102, 'view',  '2026-01-02 11:00:00'),
        (103, 'view',  '2026-01-03 09:00:00'),
        (101, 'view',  '2026-01-03 09:05:00'),
        (102, 'click', '2026-01-03 10:00:00'),
        (102, 'click', '2026-01-03 10:02:00'),
    ]

    events_schema = T.StructType([
        T.StructField('user_id', T.IntegerType(), nullable=False),
        T.StructField('event_type', T.StringType(), nullable=False),
        T.StructField('ts', T.StringType(), nullable=False),
    ])

    events_df = spark.createDataFrame(events, schema=events_schema)
    events_df.show(truncate=False)
else:
    events_df = None
    None

In [None]:
if events_df is not None:
    # 1) Events per user
    per_user = (
        events_df.groupBy('user_id')
                .agg(F.count('*').alias('events'))
                .orderBy(F.desc('events'))
    )
    per_user.show()

    # 2) Events per type
    per_type = (
        events_df.groupBy('event_type')
                .agg(F.count('*').alias('events'))
                .orderBy(F.desc('events'))
    )
    per_type.show()

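    # 3) Most active user(s). Users can tie (here 101 and 102 both have 3 events),
    #    so keep every user at the maximum instead of an arbitrary limit(1) row.
    max_events = per_user.agg(F.max('events')).first()[0]
    most_active = per_user.filter(F.col('events') == max_events)
    most_active.show()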

    # 4) Optional: events per day
    events_with_day = events_df.withColumn('day', F.to_date(F.col('ts')))
    per_day = (
        events_with_day.groupBy('day')
                      .agg(F.count('*').alias('events'))
                      .orderBy('day')
    )
    per_day.show()
else:
    None
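For a tiny dataset like this, a plain-Python cross-check is a cheap way to confirm the Spark results. The sketch below repeats the same seven events (as `raw_events`, a new name) and counts them with `collections.Counter`. Note that users 101 and 102 tie with three events each, so a plain `limit(1)` on the sorted result would keep only one of them, chosen arbitrarily.

```python
from collections import Counter

raw_events = [
    (101, 'view',  '2026-01-02 10:00:00'),
    (101, 'click', '2026-01-02 10:01:00'),
    (102, 'view',  '2026-01-02 11:00:00'),
    (103, 'view',  '2026-01-03 09:00:00'),
    (101, 'view',  '2026-01-03 09:05:00'),
    (102, 'click', '2026-01-03 10:00:00'),
    (102, 'click', '2026-01-03 10:02:00'),
]

per_user_counts = Counter(user for user, _, _ in raw_events)
per_type_counts = Counter(etype for _, etype, _ in raw_events)
per_day_counts = Counter(ts[:10] for _, _, ts in raw_events)  # 'YYYY-MM-DD' prefix

# Keep every user tied for the highest count.
top = max(per_user_counts.values())
most_active_users = sorted(u for u, n in per_user_counts.items() if n == top)

print(per_user_counts)    # 101 and 102: 3 each, 103: 1
print(per_type_counts)    # view: 4, click: 3
print(per_day_counts)     # 2026-01-02: 3, 2026-01-03: 4
print(most_active_users)  # [101, 102]
```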

## Tips, Warnings, and Common Mistakes

### ❌ Common Mistakes to Avoid

| Mistake | What happens | How to fix |
|---------|-------------|-----------|
| `df.collect()` on huge DataFrame | Driver crashes (out of memory) | Use `show()`, `limit()`, or aggregate first |
| `df.toPandas()` on big data | Memory errors | Only convert small, aggregated results |
| Writing everything as Python UDFs | Slow execution | Use built-in `F.` functions first |
| Ignoring partitions | Poor performance | Check with `df.rdd.getNumPartitions()` |
| Not filtering early | Processing unnecessary data | Add `filter()` as early as possible |

### 💡 Pro Tips

1. **Use `df.explain()`** to see Spark's execution plan; it helps debug slow queries
2. **Filter early and select only needed columns**: less data means faster processing
3. **Keep raw data in Parquet**: a columnar format that is usually much faster to scan than CSV
4. **Cache strategically**: only cache DataFrames you reuse multiple times
5. **Start small**: develop with a sample, then run on full data

### Debugging slow Spark jobs

If something is slow, check these first:

```
1. Are you shuffling huge data?
   ├─ Look for: joins, groupBy, repartition
   └─ Fix: filter before join, use broadcast for small tables

2. Are you collecting too much data to the driver?
   ├─ Look for: collect(), toPandas() on big data
   └─ Fix: aggregate first, use show() or limit()

3. Are you using slow Python UDFs?
   ├─ Look for: @udf decorated functions
   └─ Fix: replace with built-in F.functions
```
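The "use broadcast for small tables" fix is easiest to see in miniature. In a broadcast join, the small table is copied to every executor, so each partition of the large table can be joined locally and the large table never has to be shuffled. The plain-Python sketch below imitates that with a dictionary lookup per partition; the table contents are made up for illustration.

```python
# Small lookup table: small enough that every worker can hold a full copy.
event_labels = {"view": "Page view", "click": "Click"}

# Large table, already split into partitions across (imaginary) executors.
partitions = [
    [(101, "view"), (101, "click")],
    [(102, "view"), (103, "view")],
    [(102, "click"), (104, "purchase")],
]


def join_partition(partition, lookup):
    """Inner-join one partition against the broadcast copy of the lookup table.

    Each partition is processed on its own: no rows of the large table move.
    """
    return [(user, etype, lookup[etype])
            for user, etype in partition
            if etype in lookup]  # inner join: drop rows with no match


joined = [row for part in partitions for row in join_partition(part, event_labels)]
for row in joined:
    print(row)
```

In PySpark the same idea is requested with the `broadcast` hint, for example `events_df.join(F.broadcast(labels_df), on='event_type')`, where `labels_df` is a hypothetical small DataFrame of labels.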

### Understanding Spark execution plans

Use `explain()` to see how Spark will execute your query:

In [None]:
# Optional: view the execution plan (useful when you start optimizing).

if df is not None:
    summary.explain(True)
else:
    None

## Additional Resources

### Official Documentation
- **Spark Official Docs**: https://spark.apache.org/docs/latest/
- **PySpark SQL Guide**: https://spark.apache.org/docs/latest/sql-programming-guide.html
- **PySpark API Reference**: https://spark.apache.org/docs/latest/api/python/

### Learning Platforms
- **Databricks Free Community Edition**: https://community.cloud.databricks.com/ (free Spark environment)
- **Databricks Learning Academy**: https://www.databricks.com/learn
- **Spark: The Definitive Guide** (book by Chambers & Zaharia)

### Key topics to explore next
When you read docs, focus on:
1. **DataFrames**: the main API for analytics
2. **Transformations vs Actions**: understand lazy evaluation
3. **Joins**: combining data from multiple sources
4. **Partitioning**: key to performance
5. **File formats**: especially Parquet

### Related chapters in this book
- **Chapter 4**: Pandas fundamentals (transfer your skills to Spark)
- **Chapter 9**: SQL for Data Analysis (SQL works in Spark too!)
- **Chapter 12**: Automation and Reproducibility (scheduling Spark jobs)

## Summary / Key Takeaways

### Core Concepts

| Concept | Key Point |
|---------|-----------|
| **Big Data** | Defined by practical constraints (volume/velocity/variety), not a fixed size |
| **Distributed Computing** | Split work across multiple machines using partitions |
| **Spark Architecture** | Driver coordinates, Executors process partitions in parallel |
| **Lazy Evaluation** | Transformations build a plan; Actions trigger execution |
| **Fault Tolerance** | Spark can recompute lost partitions using lineage |

### Practical Guidelines

✅ **Do:**
- Start with Pandas; move to Spark when you hit limits
- Use Spark DataFrames (not RDDs) for analytics
- Filter early, select only needed columns
- Use built-in functions (`F.lower()`, `F.when()`, etc.)
- Aggregate before converting to Pandas
- Use Parquet format for large datasets

❌ **Don't:**
- Use `collect()` or `toPandas()` on large data
- Write Python UDFs when built-in functions exist
- Ignore partition count (too few = slow, too many = overhead)

### When to use Spark vs Pandas

```
Use Pandas when:                    Use Spark when:
├─ Data fits in memory              ├─ Data too large for one machine
├─ Quick, interactive analysis      ├─ Processing takes too long
├─ Simple transformations           ├─ Need fault tolerance
└─ Learning / prototyping           └─ Production pipelines at scale
```

### What's next?
- **Practice** with the exercises in this chapter
- **Try Databricks Community Edition** for a free Spark environment
- **Explore Spark SQL** ‚Äî you can use SQL directly in Spark!
- **Learn about joins and partitioning** for real-world data pipelines

---

**Congratulations!** You now understand the fundamentals of big data analytics with Spark. These concepts will serve you well as data sizes grow in your analytics career.

## Cleanup: Stop Spark Session
Always stop your Spark session when done to free up resources.

In [None]:
# Stop the Spark session when you're done
# This frees up resources (especially important in shared environments)

if spark is not None:
    spark.stop()
    print("✅ Spark session stopped successfully.")
else:
    print("No Spark session to stop.")