![Lancaster University](https://www.lancaster.ac.uk/media/lancaster-university/content-assets/images/fst/logos/SCC-Logo.svg)

# SCC.454: Large Scale Platforms for AI and Data Analysis
## Lab 2: Introduction to Apache Spark

**Duration:** 2 hours

**Learning Objectives:**
- Understand what Apache Spark is and why it's used for big data processing
- Learn to set up and configure Spark in Google Colab
- Master the fundamentals of RDDs (Resilient Distributed Datasets)
- Work with Spark DataFrames and the DataFrame API
- Execute SQL queries using Spark SQL
- Apply Spark to real-world datasets

## Table of Contents

1. **Part 1: Introduction to Apache Spark** (20 minutes)
   - What is Apache Spark?
   - Spark Architecture Overview
   - Setting up Spark in Google Colab
   - Creating Your First SparkSession

2. **Part 2: RDDs - Resilient Distributed Datasets** (30 minutes)
   - Understanding RDDs
   - Creating RDDs
   - Transformations and Actions
   - Lazy Evaluation
   - Word Count Example

3. **Part 3: Spark DataFrames** (40 minutes)
   - Introduction to DataFrames
   - Creating DataFrames
   - Loading Data from External Sources
   - DataFrame Operations
   - Data Aggregation and Grouping
   - Working with Missing Data

4. **Part 4: Spark SQL** (20 minutes)
   - Introduction to Spark SQL
   - Creating Temporary Views
   - Writing SQL Queries
   - Combining DataFrame API and SQL

5. **Part 5: Practical Exercises** (10 minutes)
   - Exercise 1: Movie Ratings Analysis
   - Exercise 2: E-commerce Sales Analysis
   - Challenge Exercise

---
# Part 1: Introduction to Apache Spark
---

## 1.1 What is Apache Spark?

Apache Spark is a unified analytics engine for large-scale data processing. Originally developed at UC Berkeley's AMPLab in 2009, Spark was designed to overcome the limitations of the Hadoop MapReduce model while maintaining its benefits.

**Key Features of Spark:**

- **Speed**: Spark can be up to 100x faster than Hadoop MapReduce for certain applications, thanks to in-memory computing
- **Ease of Use**: Provides high-level APIs in Python, Java, Scala, and R
- **Generality**: Supports SQL queries, streaming data, machine learning, and graph processing
- **Runs Everywhere**: Can run on Hadoop, Apache Mesos, Kubernetes, standalone, or in the cloud

**When to use Spark:**
- Processing large datasets (gigabytes to petabytes)
- Iterative algorithms (machine learning)
- Interactive data analysis
- Stream processing
- ETL (Extract, Transform, Load) operations

## 1.2 Spark Architecture Overview

Spark follows a master-worker architecture:

```
┌─────────────────────────────────────────────────────┐
│                   Driver Program                     │
│  ┌─────────────────────────────────────────────┐    │
│  │              SparkContext                    │    │
│  └─────────────────────────────────────────────┘    │
└───────────────────────┬─────────────────────────────┘
                        │
        ┌───────────────┼───────────────┐
        │               │               │
        ▼               ▼               ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│   Worker 1    │ │   Worker 2    │ │   Worker N    │
│  ┌─────────┐  │ │  ┌─────────┐  │ │  ┌─────────┐  │
│  │Executor │  │ │  │Executor │  │ │  │Executor │  │
│  │ Tasks   │  │ │  │ Tasks   │  │ │  │ Tasks   │  │
│  └─────────┘  │ │  └─────────┘  │ │  └─────────┘  │
└───────────────┘ └───────────────┘ └───────────────┘
```

**Components:**
- **Driver Program**: The main program that creates the SparkContext and coordinates execution
- **SparkContext**: The low-level entry point to Spark functionality; it connects the driver to the cluster
- **Cluster Manager**: Allocates resources across applications (YARN, Mesos, Kubernetes, or standalone)
- **Executors**: Processes that run on worker nodes, execute tasks, and store data
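
The division of labour between these components can be modelled in plain Python, with no Spark required. In the sketch below, the calling code plays the driver and a thread pool stands in for the executors. The helper names `split_into_partitions` and `run_job` are hypothetical and are not part of any Spark API.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Driver side: cut the input into roughly equal chunks (one per task)."""
    size, extra = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def run_job(data, task, combine, num_executors=3):
    """Driver side: schedule one task per partition, then merge the partial results."""
    partitions = split_into_partitions(data, num_executors)
    with ThreadPoolExecutor(max_workers=num_executors) as executors:
        partial_results = list(executors.map(task, partitions))
    return combine(partial_results)


# Each "executor" sums the squares of its own partition;
# the "driver" then adds up the partial sums.
total = run_job(
    data=list(range(1, 11)),
    task=lambda part: sum(x * x for x in part),
    combine=sum,
)
print(total)  # 385
```

A real cluster adds what this toy model leaves out: a cluster manager that decides where executors run, serialization of tasks across machines, and recovery when a worker fails.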

## 1.3 Setting up Spark in Google Colab

Google Colab doesn't have Spark pre-installed, so we need to set it up manually. Run the cell below to install PySpark and configure the Java environment.

If you are running this notebook on a different system, make sure `JAVA_HOME` points to the correct Java installation for that system.

Don't worry if you see a pip error in the output of this cell.

In [None]:
# Install PySpark
!pip install pyspark==3.5.0 -q

# Install Java (Spark requires Java)
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Set Java environment variable
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

print("PySpark and Java installed successfully!")

## 1.4 Creating Your First SparkSession

The `SparkSession` is the entry point to Spark functionality in Spark 2.0+. It provides a unified interface to work with DataFrames, SQL, and streaming.

In earlier versions of Spark (1.x), you needed separate contexts:
- `SparkContext` for RDDs
- `SQLContext` for DataFrames and SQL
- `HiveContext` for Hive integration

`SparkSession` combines all of these into a single, unified entry point.

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Create a SparkSession
spark = SparkSession.builder \
    .appName("SCC454-SparkIntro") \
    .config("spark.driver.memory", "4g") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()

# Get the underlying SparkContext
sc = spark.sparkContext

print(f"Spark Version: {spark.version}")
print(f"Application Name: {spark.sparkContext.appName}")
print(f"Master: {spark.sparkContext.master}")

You can display the SparkSession object to see its configuration:

In [None]:
spark

**Understanding Local Mode:**

When you see `Master: local[*]`, it means Spark is running in local mode using all available CPU cores. This is perfect for learning and development. In production, you would connect to a cluster manager like YARN or Kubernetes.

The `local[N]` notation means:
- `local`: Run Spark locally with one thread
- `local[2]`: Run Spark locally with 2 worker threads
- `local[*]`: Run Spark locally with as many threads as CPU cores

---
# Part 2: RDDs - Resilient Distributed Datasets
---

## 2.1 Understanding RDDs

The RDD (Resilient Distributed Dataset) is the fundamental data structure of Spark. It is an immutable, distributed collection of objects that can be processed in parallel.

**Key Properties of RDDs:**

- **Resilient**: Can recover from node failures through lineage
- **Distributed**: Data is distributed across multiple nodes in the cluster
- **Dataset**: Collection of partitioned data with primitive values or custom objects

**RDD Characteristics:**
- Immutable: Once created, the data in an RDD cannot be changed
- Lazily Evaluated: Transformations are not executed until an action is called
- Cacheable: Can be cached in memory for faster access
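
To make "resilient through lineage" concrete, here is a minimal single-machine sketch. `ToyRDD` is a hypothetical class written for this illustration and is not how Spark implements RDDs. It only shows the idea that each dataset remembers its parent and the function that produced it, so a lost partition can be rebuilt rather than restored from a replica.

```python
class ToyRDD:
    """A toy, single-machine stand-in for an RDD (not Spark's implementation)."""

    def __init__(self, partitions=None, parent=None, func=None):
        self._source = partitions  # only set for the root dataset
        self.parent = parent       # lineage: which dataset this one came from
        self.func = func           # lineage: how each element was derived
        self._cache = {}           # materialized partitions, keyed by index

    def map(self, func):
        # Immutable: a transformation returns a new ToyRDD and never edits this one.
        return ToyRDD(parent=self, func=func)

    def compute(self, index):
        """Return partition `index`, recomputing it from lineage if it is not cached."""
        if index in self._cache:
            return self._cache[index]
        if self.parent is None:
            result = list(self._source[index])
        else:
            result = [self.func(x) for x in self.parent.compute(index)]
        self._cache[index] = result
        return result

    def lose_partition(self, index):
        """Simulate a node failure that wipes one materialized partition."""
        self._cache.pop(index, None)


base = ToyRDD(partitions=[[1, 2], [3, 4]])
doubled = base.map(lambda x: x * 2)

print(doubled.compute(1))  # [6, 8]
doubled.lose_partition(1)  # the "node" holding partition 1 fails
print(doubled.compute(1))  # [6, 8] again, rebuilt from the parent via lineage
```

Only the lost partition is recomputed; partition 0 is never touched in this run.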

## 2.2 Creating RDDs

There are two main ways to create RDDs:
1. **Parallelizing an existing collection** in your driver program
2. **Loading data from external storage** (files, databases, etc.)

In [None]:
# Method 1: Parallelize a Python collection
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
numbers_rdd = sc.parallelize(numbers)

print(f"Type: {type(numbers_rdd)}")
print(f"Number of partitions: {numbers_rdd.getNumPartitions()}")
print(f"First 5 elements: {numbers_rdd.take(5)}")

In [None]:
# Method 1b: Parallelize with specified number of partitions
numbers_rdd_4part = sc.parallelize(numbers, 4)
print(f"Number of partitions: {numbers_rdd_4part.getNumPartitions()}")

# View the partitions
print("\nData in each partition:")
print(numbers_rdd_4part.glom().collect())

In [None]:
# Method 2: Create RDD from a text file
# First, let's create a sample text file

sample_text = """Apache Spark is a unified analytics engine for large-scale data processing.
It provides high-level APIs in Java, Scala, Python and R.
Spark powers a stack of libraries including SQL and DataFrames.
It also includes MLlib for machine learning and GraphX for graph processing.
Spark can run on Hadoop, Apache Mesos, Kubernetes, standalone, or in the cloud."""

with open("spark_intro.txt", "w") as f:
    f.write(sample_text)

# Load the text file as an RDD
text_rdd = sc.textFile("spark_intro.txt")
print(f"Number of lines: {text_rdd.count()}")
print("\nFirst 3 lines:")
for line in text_rdd.take(3):
    print(f"  - {line}")

## 2.3 Transformations and Actions

RDD operations are divided into two categories:

**Transformations**: Create a new RDD from an existing one. They are *lazy* - they don't execute until an action is called.

| Transformation | Description |
|----------------|-------------|
| `map(func)` | Apply function to each element |
| `filter(func)` | Keep elements where function returns true |
| `flatMap(func)` | Map then flatten the results |
| `distinct()` | Remove duplicates |
| `reduceByKey(func)` | Aggregate values by key |
| `groupByKey()` | Group values by key |
| `sortBy(func)` | Sort RDD elements |

**Actions**: Return a value to the driver program or write data to external storage. They *trigger* the execution of transformations.

| Action | Description |
|--------|-------------|
| `collect()` | Return all elements as a list |
| `count()` | Return the number of elements |
| `first()` | Return the first element |
| `take(n)` | Return first n elements |
| `reduce(func)` | Aggregate elements using a function |
| `saveAsTextFile(path)` | Write elements to a text file |
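
`reduceByKey` and `groupByKey` can look interchangeable in the table above, but they differ in how much data crosses the network. `reduceByKey` combines values inside each partition before the shuffle, while `groupByKey` ships every record. The following pure-Python model illustrates that difference. The functions `reduce_by_key` and `group_by_key` are hypothetical stand-ins and not Spark code, and the "records moved" count is a simplification of a real shuffle.

```python
from collections import defaultdict

# Two "partitions" of (key, value) pairs, as they might sit on two executors.
partitions = [
    [("spark", 1), ("sql", 1), ("spark", 1)],
    [("spark", 1), ("sql", 1), ("sql", 1)],
]


def reduce_by_key(partitions, func):
    """Combine within each partition first, then merge the partial results."""
    shuffled = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        shuffled.extend(local.items())  # one record per key per partition
    merged = {}
    for key, value in shuffled:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged, len(shuffled)


def group_by_key(partitions):
    """Send every record across the shuffle, then collect the values per key."""
    shuffled = [pair for part in partitions for pair in part]
    grouped = defaultdict(list)
    for key, value in shuffled:
        grouped[key].append(value)
    return dict(grouped), len(shuffled)


counts, moved_reduce = reduce_by_key(partitions, lambda a, b: a + b)
groups, moved_group = group_by_key(partitions)

print(counts, moved_reduce)  # {'spark': 3, 'sql': 3} 4
print(groups, moved_group)   # {'spark': [1, 1, 1], 'sql': [1, 1, 1]} 6
```

For simple aggregations such as sums and counts, this is why `reduceByKey` is usually the better choice on large datasets.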

In [None]:
# Transformation Examples

numbers = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# map: Apply a function to each element
squared = numbers.map(lambda x: x ** 2)
print(f"Original: {numbers.collect()}")
print(f"Squared:  {squared.collect()}")

# filter: Keep only elements that match a condition
evens = numbers.filter(lambda x: x % 2 == 0)
print(f"Evens:    {evens.collect()}")

# Chaining transformations
even_squares = numbers.filter(lambda x: x % 2 == 0).map(lambda x: x ** 2)
print(f"Even squares: {even_squares.collect()}")

In [None]:
# flatMap: Map then flatten
sentences = sc.parallelize(["Hello World", "Apache Spark", "Big Data"])

# map vs flatMap
words_map = sentences.map(lambda s: s.split())
words_flatmap = sentences.flatMap(lambda s: s.split())

print(f"Using map:     {words_map.collect()}")
print(f"Using flatMap: {words_flatmap.collect()}")

In [None]:
# Action Examples

numbers = sc.parallelize([1, 2, 3, 4, 5])

print(f"collect(): {numbers.collect()}")
print(f"count():   {numbers.count()}")
print(f"first():   {numbers.first()}")
print(f"take(3):   {numbers.take(3)}")
print(f"sum():     {numbers.sum()}")
print(f"mean():    {numbers.mean()}")
print(f"max():     {numbers.max()}")
print(f"min():     {numbers.min()}")

# reduce: Aggregate elements
total = numbers.reduce(lambda a, b: a + b)
print(f"reduce(+): {total}")

## 2.4 Lazy Evaluation

Spark transformations are *lazy* - they don't execute immediately. Instead, Spark builds up a *lineage graph* (DAG - Directed Acyclic Graph) of transformations. The actual computation only happens when an action is called.

**Benefits of Lazy Evaluation:**
- Allows Spark to optimize the execution plan
- Reduces unnecessary computations
- Enables fault tolerance through lineage
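
Python's built-in `map` and `filter` are lazy in much the same way, which makes them a convenient way to see the idea without Spark. The sketch below is an analogy only: `traced_double` is a hypothetical helper that records each call so we can check when the work actually happens.

```python
log = []


def traced_double(x):
    """Double x and record that the call happened."""
    log.append(f"double({x})")
    return x * 2


numbers = range(1, 6)

# "Transformations": building the pipeline runs none of the functions.
doubled = map(traced_double, numbers)
multiples_of_four = filter(lambda x: x % 4 == 0, doubled)
print(log)  # [] because nothing has executed yet

# "Action": consuming the pipeline triggers the whole chain.
result = list(multiples_of_four)
print(result)    # [4, 8]
print(len(log))  # 5, because traced_double ran once per input element
```

One difference from Spark is worth keeping in mind: a Python iterator can be consumed only once, whereas an RDD keeps its lineage and can be evaluated again by calling another action.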

In [None]:
# Demonstrating Lazy Evaluation
import time

# Create an RDD and apply transformations
print("Creating RDD and transformations...")
start = time.time()

large_rdd = sc.parallelize(range(1000000))
transformed = large_rdd.map(lambda x: x * 2).filter(lambda x: x % 4 == 0)

print(f"Time to define transformations: {time.time() - start:.4f} seconds")
print(f"Transformations are defined but NOT executed yet!")
print(f"Type of 'transformed': {type(transformed)}")

# Now trigger execution with an action
print("\nCalling count() action...")
start = time.time()
result = transformed.count()
print(f"Time to execute: {time.time() - start:.4f} seconds")
print(f"Count: {result}")

## 2.5 Word Count Example - The "Hello World" of Big Data

Word Count is the classic MapReduce example. Let's implement it using Spark RDDs.

In [None]:
# Create a sample text
text = """Spark is fast and general purpose cluster computing system
Spark provides high level APIs in Java Scala Python and R
Spark supports general computation graphs for data analysis
Spark has rich set of higher level tools including Spark SQL
Spark SQL provides support for structured data processing"""

# Save to file
with open("wordcount_input.txt", "w") as f:
    f.write(text)

# Word Count using RDDs - Step by Step
lines = sc.textFile("wordcount_input.txt")

# Step 1: Split each line into words
words = lines.flatMap(lambda line: line.lower().split())
print("Step 1 - Words:")
print(words.take(10))

# Step 2: Map each word to a (word, 1) pair
word_pairs = words.map(lambda word: (word, 1))
print("\nStep 2 - Word pairs:")
print(word_pairs.take(10))

# Step 3: Reduce by key - sum up counts for each word
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)
print("\nStep 3 - Word counts:")
print(word_counts.take(10))

# Step 4: Sort by count (descending)
sorted_counts = word_counts.sortBy(lambda x: -x[1])
print("\nTop 10 words:")
for word, count in sorted_counts.take(10):
    print(f"  {word}: {count}")

In [None]:
# Word Count - Concise Version (a single chained expression)

word_counts_concise = sc.textFile("wordcount_input.txt") \
    .flatMap(lambda line: line.lower().split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: -x[1])

print("Word counts (concise version):")
for word, count in word_counts_concise.take(10):
    print(f"  {word}: {count}")
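
A quick way to sanity-check a distributed pipeline on small input is to recompute the answer with ordinary Python. The sketch below applies the same logic (lower-case, split on whitespace, count) to the same sample text using `collections.Counter`. The variable name `expected` is our own choice.

```python
from collections import Counter

text = """Spark is fast and general purpose cluster computing system
Spark provides high level APIs in Java Scala Python and R
Spark supports general computation graphs for data analysis
Spark has rich set of higher level tools including Spark SQL
Spark SQL provides support for structured data processing"""

# Same logic as the RDD pipeline: lower-case, split on whitespace, count.
expected = Counter(text.lower().split())

print(expected["spark"])  # 6
print(expected["sql"])    # 2
```

In the notebook, `dict(word_counts.collect()) == dict(expected)` should then evaluate to `True`. A check like this only makes sense for data small enough to collect to the driver.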

---
# Part 3: Spark DataFrames
---

## 3.1 Introduction to DataFrames

A DataFrame is a distributed collection of data organized into named columns, similar to a table in a relational database or a DataFrame in pandas. DataFrames are built on top of RDDs but provide:

- **Schema**: Named columns with data types
- **Optimized Execution**: Catalyst optimizer for query optimization
- **Familiar API**: SQL-like operations

**DataFrames vs RDDs:**

| Feature | RDD | DataFrame |
|---------|-----|----------|
| Schema | No schema | Named columns with types |
| Optimization | No automatic optimization | Catalyst optimizer |
| Ease of use | Low-level API | High-level, SQL-like API |
| Performance | Depends on hand-written code | Usually better (optimized query plans) |
| Data types | Any Python object | Structured data types |

## 3.2 Creating DataFrames

There are multiple ways to create DataFrames in Spark.

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql import Row

# Method 1: From a list of tuples
data = [
    ("Alice", 28, "Data Scientist", 75000),
    ("Bob", 35, "Software Engineer", 85000),
    ("Charlie", 32, "Data Analyst", 65000),
    ("Diana", 29, "ML Engineer", 90000),
    ("Eve", 41, "Data Scientist", 95000)
]
columns = ["name", "age", "role", "salary"]

df = spark.createDataFrame(data, columns)
df.show()

In [None]:
# Method 2: With explicit schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("role", StringType(), True),
    StructField("salary", IntegerType(), True)
])

df_with_schema = spark.createDataFrame(data, schema)
df_with_schema.printSchema()

In [None]:
# Method 3: From a pandas DataFrame
import pandas as pd

pandas_df = pd.DataFrame({
    "city": ["London", "Manchester", "Birmingham", "Leeds", "Glasgow"],
    "population": [8982000, 547627, 1141816, 793139, 626410],
    "country": ["England", "England", "England", "England", "Scotland"]
})

spark_df = spark.createDataFrame(pandas_df)
spark_df.show()

In [None]:
# Method 4: From Row objects
row_data = [
    Row(product="Laptop", price=999.99, quantity=50),
    Row(product="Mouse", price=29.99, quantity=200),
    Row(product="Keyboard", price=79.99, quantity=150)
]

df_rows = spark.createDataFrame(row_data)
df_rows.show()

## 3.3 Loading Data from External Sources

Spark can read data from various formats including CSV, JSON, Parquet, and more. Let's download a real-world dataset to work with.

In [None]:
# Download the global temperature anomaly dataset from NASA GISS
# This is the official GISTEMP v4 dataset (Global Land-Ocean Temperature Index)

!wget -q https://data.giss.nasa.gov/gistemp/tabledata_v4/GLB.Ts+dSST.csv -O global_temp.csv

# Preview the file
!head -15 global_temp.csv

In [None]:
# Use pandas to read the CSV, then convert the result to a Spark DataFrame
import pandas as pd
from pyspark.sql.functions import col, lit, explode, array, struct

# Read with pandas, skipping the first row (a title line above the column header)
pdf = pd.read_csv("global_temp.csv", skiprows=1)
print("Pandas columns:", pdf.columns.tolist())
print(pdf.head())

# Convert to Spark DataFrame
temp_df_raw = spark.createDataFrame(pdf)

print("\nSpark DataFrame:")
temp_df_raw.show(5)
print("\nSchema:")
temp_df_raw.printSchema()

# Now transform to long format
month_cols = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']

temp_df = temp_df_raw.select(
    col("Year"),
    explode(array([struct(lit(i+1).alias("Month"), col(m).cast("double").alias("Mean"))
                   for i, m in enumerate(month_cols)])).alias("data")
).select(
    col("Year").cast("int").alias("Year"),
    col("data.Month").alias("Month"),
    col("data.Mean").alias("Mean")
).filter(col("Mean").isNotNull())

# Add a Source column for consistency
temp_df = temp_df.withColumn("Source", lit("GISTEMP"))

print("\nTransformed data (long format):")
temp_df.show(10)

print("\nSchema:")
temp_df.printSchema()

print(f"\nTotal records: {temp_df.count()}")
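
The `explode(array(struct(...)))` expression above packs a lot into one line. As a rough mental model, the same wide-to-long reshape can be written with plain Python loops. This is a sketch only: the sample values are made up and are not real GISTEMP readings, `to_float_or_none` is a hypothetical helper, and it assumes that Spark's default (non-ANSI) `cast("double")` turns unparseable strings such as `***` into null.

```python
# Two rows in the "wide" layout: one column per month (only three months shown).
# The numbers are invented placeholders, not real temperature data.
wide_rows = [
    {"Year": 2020, "Jan": 0.1, "Feb": 0.2, "Mar": "***"},
    {"Year": 2021, "Jan": 0.3, "Feb": 0.4, "Mar": 0.5},
]
month_cols = ["Jan", "Feb", "Mar"]


def to_float_or_none(value):
    """Mimic cast("double"): values that do not parse become None."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


long_rows = []
for row in wide_rows:
    for i, name in enumerate(month_cols):
        mean = to_float_or_none(row[name])
        if mean is not None:  # same role as filter(col("Mean").isNotNull())
            long_rows.append({"Year": row["Year"], "Month": i + 1, "Mean": mean})

for r in long_rows:
    print(r)
```

Each wide row produces one long row per month, and months whose value is missing are dropped, so the two input rows above yield five output rows.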

## 3.4 DataFrame Operations

DataFrames support a rich set of operations for data manipulation and analysis.

In [None]:
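# Note: importing max, min, and round from pyspark.sql.functions shadows
# Python's built-in functions of the same name for the rest of the notebook.
from pyspark.sql.functions import col, year, month, avg, max, min, count, round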

# Select columns
print("Select specific columns:")
temp_df.select("Year", "Month", "Mean").show(5)

# Select with expressions - express the anomaly in Fahrenheit degrees.
# "Mean" is a temperature difference, so only the 1.8 scale factor applies (no +32 offset).
print("\nSelect with expressions (anomaly in °F):")
temp_df.select(
    col("Year"),
    col("Month"),
    col("Mean"),
    round(col("Mean") * 1.8, 2).alias("Mean_Fahrenheit")
).show(5)

In [None]:
# Filter rows
print("Filter: temperature anomalies above 0.5°C:")
temp_df.filter(col("Mean") > 0.5).show(5)

# Multiple conditions
print("\nFilter: Year 2020 or later with mean > 0.8:")
temp_df.filter(
    (col("Year") >= 2020) & (col("Mean") > 0.8)
).show(10)

In [None]:
# Add new columns with withColumn
from pyspark.sql.functions import when

# The data already has Year and Month columns, so let's add derived columns.
# Mean is an anomaly (a difference), so converting to °F scales by 1.8 without adding 32.
temp_df_enhanced = temp_df \
    .withColumn("Mean_F", round(col("Mean") * 1.8, 2)) \
    .withColumn("Decade", (col("Year") / 10).cast("int") * 10) \
    .withColumn("Season",
                when(col("Month").isin(12, 1, 2), "Winter")
                .when(col("Month").isin(3, 4, 5), "Spring")
                .when(col("Month").isin(6, 7, 8), "Summer")
                .otherwise("Autumn"))

temp_df_enhanced.show(10)

In [None]:
# Sorting
print("Sorted by Mean (descending) - Warmest temperature anomalies:")
temp_df_enhanced.orderBy(col("Mean").desc()).show(10)

print("\nSorted by Mean (ascending) - Coldest temperature anomalies:")
temp_df_enhanced.orderBy(col("Mean").asc()).show(10)

## 3.5 Data Aggregation and Grouping

Aggregation operations are essential for data analysis. Spark provides powerful groupBy and aggregation functions.

In [None]:
# Basic aggregations
print("Overall statistics:")
temp_df_enhanced.agg(
    count("*").alias("total_records"),
    round(avg("Mean"), 4).alias("avg_temp"),
    round(max("Mean"), 4).alias("max_temp"),
    round(min("Mean"), 4).alias("min_temp")
).show()

In [None]:
# Group by Year and calculate average temperature
yearly_avg = temp_df_enhanced \
    .groupBy("Year") \
    .agg(
        round(avg("Mean"), 4).alias("Avg_Temp"),
        round(max("Mean"), 4).alias("Max_Temp"),
        round(min("Mean"), 4).alias("Min_Temp")
    ) \
    .orderBy("Year")

print("Yearly temperature statistics (GISTEMP):")
yearly_avg.show(20)

In [None]:
# Compare by decade
decade_comparison = temp_df_enhanced \
    .groupBy("Decade") \
    .agg(
        count("*").alias("Record_Count"),
        round(avg("Mean"), 4).alias("Avg_Temp"),
        round(max("Mean"), 4).alias("Max_Temp"),
        round(min("Mean"), 4).alias("Min_Temp")
    ) \
    .orderBy("Decade")

print("Temperature anomalies by Decade:")
decade_comparison.show()

In [None]:
# Visualize the trends using matplotlib
import matplotlib.pyplot as plt

# Convert to pandas for plotting
yearly_pandas = yearly_avg.toPandas()

plt.figure(figsize=(14, 6))
plt.plot(yearly_pandas["Year"], yearly_pandas["Avg_Temp"], marker='o', markersize=3, linewidth=1)
plt.xlabel("Year", fontsize=12)
plt.ylabel("Average Temperature Anomaly (°C)", fontsize=12)
plt.title("Global Temperature Anomaly by Year (NASA GISTEMP v4)", fontsize=14)
plt.grid(True, alpha=0.3)
plt.axhline(y=0, color='r', linestyle='--', alpha=0.5, label='1951-1980 Baseline')
plt.legend()
plt.tight_layout()
plt.show()

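## 3.6 Working with Missing Data

Real-world data often has missing values. Spark represents them as `null`; the cells below show how to count them and how to drop or fill them using `DataFrame.na`.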

In [None]:
from pyspark.sql.functions import when, isnan, isnull

# Create a DataFrame with missing values
data_with_nulls = [
    ("A", 25, 50000.0),
    ("B", None, 60000.0),
    ("C", 35, None),
    ("D", None, None),
    ("E", 45, 80000.0)
]

df_nulls = spark.createDataFrame(data_with_nulls, ["name", "age", "salary"])
print("DataFrame with null values:")
df_nulls.show()

# Count nulls per column
print("Null counts per column:")
df_nulls.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df_nulls.columns
]).show()

In [None]:
# Drop rows with any null values
print("Drop rows with any null:")
df_nulls.na.drop().show()

# Drop rows where specific columns are null
print("Drop rows where age is null:")
df_nulls.na.drop(subset=["age"]).show()

# Fill null values
print("Fill nulls with 0:")
df_nulls.na.fill(0).show()

# Fill with different values per column
print("Fill with specific values per column:")
df_nulls.na.fill({"age": 30, "salary": 50000.0}).show()

---
# Part 4: Spark SQL
---

## 4.1 Introduction to Spark SQL

Spark SQL allows you to query structured data using SQL syntax. This is particularly useful if you're already familiar with SQL or need to integrate with SQL-based tools.

**Advantages of Spark SQL:**
- Familiar SQL syntax for data analysts
- Automatic query optimization via Catalyst
- Integration with Hive and other data sources
- Mix SQL with DataFrame operations

## 4.2 Creating Temporary Views

To use SQL queries, you first need to register your DataFrame as a temporary view.

In [None]:
# Register the temperature DataFrame as a temporary view
temp_df_enhanced.createOrReplaceTempView("temperatures")

print("Temporary view 'temperatures' created successfully!")
print("\nAvailable columns:")
print(temp_df_enhanced.columns)

## 4.3 Writing SQL Queries

Once you have a temporary view, you can run SQL queries using `spark.sql()`.

In [None]:
# Basic SELECT query
result = spark.sql("""
    SELECT Year, Month, Mean, Mean_F, Season
    FROM temperatures
    LIMIT 10
""")
result.show()

In [None]:
# Filtering with WHERE
high_temps = spark.sql("""
    SELECT Year, Month, Mean, Season, Decade
    FROM temperatures
    WHERE Mean > 1.0
    ORDER BY Mean DESC
    LIMIT 15
""")

print("Months with temperature anomaly > 1.0°C:")
high_temps.show()

In [None]:
# Aggregation with GROUP BY
yearly_stats = spark.sql("""
    SELECT
        Year,
        ROUND(AVG(Mean), 4) as avg_temp,
        ROUND(MAX(Mean), 4) as max_temp,
        ROUND(MIN(Mean), 4) as min_temp,
        COUNT(*) as num_records
    FROM temperatures
    GROUP BY Year
    HAVING AVG(Mean) > 0.5
    ORDER BY avg_temp DESC
""")

print("Years with average temperature anomaly > 0.5°C:")
yearly_stats.show(20)

In [None]:
# Aggregation by decade
decade_stats = spark.sql("""
    SELECT
        Decade,
        ROUND(AVG(Mean), 4) as Avg_Temp,
        COUNT(*) as Num_Records,
        MIN(Year) as First_Year,
        MAX(Year) as Last_Year
    FROM temperatures
    GROUP BY Decade
    ORDER BY Decade
""")

print("Average temperature anomaly by decade:")
decade_stats.show()

## 4.4 Combining DataFrame API and SQL

You can seamlessly switch between DataFrame API and SQL operations.

In [None]:
# Start with SQL, continue with DataFrame API
recent_years = spark.sql("""
    SELECT *
    FROM temperatures
    WHERE Year >= 2000
""")

# Continue with DataFrame operations
monthly_pattern = recent_years \
    .groupBy("Month") \
    .agg(
        round(avg("Mean"), 4).alias("Avg_Temp"),
        count("*").alias("Count")
    ) \
    .orderBy("Month")

print("Monthly temperature patterns (2000+):")
monthly_pattern.show(12)

In [None]:
# The result of DataFrame operations can be registered as a new view
monthly_pattern.createOrReplaceTempView("monthly_patterns")

# Query the new view
warmest_months = spark.sql("""
    SELECT
        Month,
        Avg_Temp,
        CASE
            WHEN Month IN (12, 1, 2) THEN 'Winter'
            WHEN Month IN (3, 4, 5) THEN 'Spring'
            WHEN Month IN (6, 7, 8) THEN 'Summer'
            ELSE 'Autumn'
        END as Season
    FROM monthly_patterns
    ORDER BY Avg_Temp DESC
""")

warmest_months.show()

---
# Part 5: Practical Exercises
---

Now it's your turn to apply what you've learned! Complete the following exercises using the datasets provided.

## Exercise 1: Movie Ratings Analysis

Let's work with a real dataset! We'll use the MovieLens 100K dataset, which contains 100,000 movie ratings.

In [None]:
# Download MovieLens 100K dataset
!wget -q https://files.grouplens.org/datasets/movielens/ml-100k.zip
!unzip -q -o ml-100k.zip

# Load the ratings data
ratings_schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("movie_id", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", IntegerType(), True)
])

ratings_df = spark.read \
    .option("delimiter", "\t") \
    .schema(ratings_schema) \
    .csv("ml-100k/u.data")

# Load movie titles (u.item is encoded as ISO-8859-1, not UTF-8)
movies_df = spark.read \
    .option("delimiter", "|") \
    .option("encoding", "ISO-8859-1") \
    .csv("ml-100k/u.item") \
    .select(
        col("_c0").cast("int").alias("movie_id"),
        col("_c1").alias("title"),
        col("_c2").alias("release_date")
    )

print("Ratings DataFrame:")
ratings_df.show(5)

print("\nMovies DataFrame:")
movies_df.show(5)

# Register as views for SQL
ratings_df.createOrReplaceTempView("ratings")
movies_df.createOrReplaceTempView("movies")

### Task 1.1: Basic Statistics

Calculate and display:
1. Total number of ratings
2. Number of unique users
3. Number of unique movies
4. Average rating across all movies

In [None]:
# Your code here



### Task 1.2: Rating Distribution

Show the distribution of ratings (how many ratings of each value 1-5). Order by rating value.

In [None]:
# Your code here



### Task 1.3: Top Rated Movies

Find the top 10 highest-rated movies that have at least 100 ratings. Join with the movies table to show the movie title.

**Hint:** You'll need to:
1. Group ratings by movie_id
2. Calculate average rating and count
3. Filter for movies with ≥100 ratings
4. Join with movies_df to get titles
5. Order by average rating (descending)

In [None]:
# Your code here



### Task 1.4: SQL Challenge

Write a SQL query to find the top 5 users who have rated the most movies. Show their user_id, number of ratings, and average rating they give.

In [None]:
# Your SQL query here
# result = spark.sql("""
#     -- Write your query here

# """)
# result.show()

## Exercise 2: E-commerce Sales Analysis

Now let's work with an e-commerce dataset to practice more complex operations.

In [None]:
# Create a synthetic e-commerce dataset
import random
from datetime import datetime, timedelta
from builtins import round as python_round

# Set seed for reproducibility
random.seed(42)

# Generate data
categories = ["Electronics", "Clothing", "Home & Garden", "Books", "Sports"]
products = {
    "Electronics": ["Laptop", "Phone", "Tablet", "Headphones", "Camera"],
    "Clothing": ["T-Shirt", "Jeans", "Jacket", "Shoes", "Hat"],
    "Home & Garden": ["Chair", "Table", "Lamp", "Plant", "Vase"],
    "Books": ["Fiction", "Non-Fiction", "Technical", "Children", "Comics"],
    "Sports": ["Football", "Tennis Racket", "Yoga Mat", "Dumbbells", "Bicycle"]
}
regions = ["North", "South", "East", "West", "Central"]

sales_data = []
base_date = datetime(2024, 1, 1)

for i in range(5000):
    category = random.choice(categories)
    product = random.choice(products[category])
    date = base_date + timedelta(days=random.randint(0, 364))
    quantity = random.randint(1, 10)
    base_price = random.uniform(10, 500)
    discount = random.choice([0.0, 0.0, 0.0, 0.1, 0.15, 0.2, 0.25])  # 0.0 is listed three times, so "no discount" is the most likely outcome
    price = python_round(base_price * (1 - discount), 2)
    total = python_round(price * quantity, 2)

    sales_data.append((
        i + 1,
        date.strftime("%Y-%m-%d"),
        category,
        product,
        random.choice(regions),
        float(quantity),      # Ensure float
        float(price),         # Ensure float
        float(total),         # Ensure float
        float(discount)       # Ensure float
    ))

# Create DataFrame
sales_columns = ["order_id", "order_date", "category", "product", "region",
                 "quantity", "unit_price", "total", "discount"]
sales_df = spark.createDataFrame(sales_data, sales_columns)

# Add month column
from pyspark.sql.functions import substring
sales_df = sales_df.withColumn("month", substring("order_date", 1, 7))

print("E-commerce Sales Data:")
sales_df.show(10)

print(f"\nTotal records: {sales_df.count()}")

# Register as view
sales_df.createOrReplaceTempView("sales")

### Task 2.1: Sales by Category

Calculate total sales, number of orders, and average order value for each category. Order by total sales (descending).

In [None]:
# Your code here



### Task 2.2: Monthly Trend Analysis

Analyze monthly sales trends:
1. Calculate total sales per month
2. Show results ordered by month
3. Create a simple visualization of the trend

In [None]:
# Your code here



### Task 2.3: Regional Performance

Using SQL, find:
1. Total sales by region
2. The best-selling product in each region

**Hint:** For part 2, you might need a subquery or window function.
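
If window functions are new to you, the "top row per group" pattern can be tried on a toy table first. The sketch below uses Python's built-in `sqlite3` module as a stand-in for Spark SQL, which accepts the same `ROW_NUMBER() OVER (PARTITION BY ...)` syntax. The `scores` table and its contents are invented for illustration, and the example assumes an SQLite build of version 3.25 or later, where window functions are available.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scores (team TEXT, player TEXT, points INTEGER)")
conn.executemany(
    "INSERT INTO scores VALUES (?, ?, ?)",
    [
        ("red", "ann", 10), ("red", "bob", 25), ("red", "cat", 15),
        ("blue", "dan", 30), ("blue", "eve", 20),
    ],
)

# Rank players inside each team, then keep only the top-ranked row per team.
query = """
    SELECT team, player, points
    FROM (
        SELECT team, player, points,
               ROW_NUMBER() OVER (PARTITION BY team ORDER BY points DESC) AS rn
        FROM scores
    ) AS ranked
    WHERE rn = 1
    ORDER BY team
"""
top_per_team = conn.execute(query).fetchall()
print(top_per_team)  # [('blue', 'dan', 30), ('red', 'bob', 25)]
conn.close()
```

The inner query is needed because a window function's result cannot be filtered in the `WHERE` clause of the same `SELECT`. For the task itself, you would aggregate sales per region and product first and then rank the aggregated rows.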

In [None]:
# Your SQL queries here



### Task 2.4: Discount Impact Analysis

Analyze the relationship between discounts and sales:
1. Group orders by discount level (0%, 10%, 15%, 20%, 25%)
2. Calculate average order value and total quantity for each discount level
3. What patterns do you observe?

In [None]:
# Your code here



## Challenge Exercise (Optional)

Combine what you've learned to complete this advanced analysis.

Using the sales data, create a comprehensive report that includes:

1. **Category Deep Dive**: For each category, find the month with highest sales
2. **Pareto Analysis**: Identify products that contribute to 80% of total sales
3. **Performance Metrics**: Calculate month-over-month growth rate for total sales

**Bonus**: Create visualizations for your findings.
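
Two of these metrics are easier to implement once the arithmetic is clear. The sketch below works through them in plain Python on made-up numbers. The function names `month_over_month_growth` and `pareto_items` are hypothetical, and the Pareto rule shown (take the largest items until their cumulative share reaches 80%) is one common reading of the task rather than the only valid one.

```python
def month_over_month_growth(monthly_totals):
    """Return (month, growth) pairs; growth = (current - previous) / previous."""
    months = sorted(monthly_totals)  # "YYYY-MM" strings sort chronologically
    growth = []
    for prev, curr in zip(months, months[1:]):
        change = (monthly_totals[curr] - monthly_totals[prev]) / monthly_totals[prev]
        growth.append((curr, change))
    return growth


def pareto_items(totals, threshold=0.8):
    """Return the largest items, in order, until they reach `threshold` of the total."""
    grand_total = sum(totals.values())
    selected, running = [], 0.0
    for name, value in sorted(totals.items(), key=lambda kv: kv[1], reverse=True):
        selected.append(name)
        running += value
        if running / grand_total >= threshold:
            break
    return selected


# Made-up numbers, purely for illustration.
monthly = {"2024-01": 100.0, "2024-02": 120.0, "2024-03": 90.0}
products = {"A": 50.0, "B": 30.0, "C": 15.0, "D": 5.0}

for month, change in month_over_month_growth(monthly):
    print(f"{month}: {change:+.1%}")  # 2024-02: +20.0%, then 2024-03: -25.0%

print(pareto_items(products))  # ['A', 'B']
```

In Spark, the "previous month" lookup and the running total both map naturally onto window functions such as `lag` and a cumulative `sum` over an ordered window.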

In [None]:
# Your code here - Challenge Exercise



---
# Cleanup
---

Always remember to stop your Spark session when you're done to release resources.

In [None]:
# Stop the Spark session
spark.stop()
print("Spark session stopped.")

---
# Summary
---

In this lab, you learned:

1. **Apache Spark Fundamentals**
   - What Spark is and why it's used for big data processing
   - Spark's master-worker architecture
   - How to set up and configure Spark in Google Colab

2. **RDDs (Resilient Distributed Datasets)**
   - Creating RDDs from collections and files
   - Transformations vs Actions
   - Lazy evaluation and its benefits
   - Classic Word Count example

3. **Spark DataFrames**
   - Creating DataFrames from various sources
   - DataFrame operations: select, filter, withColumn
   - Aggregation and grouping
   - Handling missing data

4. **Spark SQL**
   - Creating temporary views
   - Writing SQL queries on DataFrames
   - Combining DataFrame API with SQL

5. **Practical Application**
   - Working with real-world datasets (Global Temperature, MovieLens)
   - Data analysis patterns and best practices

---
# Additional Resources
---

- Apache Spark Documentation: https://spark.apache.org/docs/latest/
- PySpark API Reference: https://spark.apache.org/docs/latest/api/python/
- Spark SQL Guide: https://spark.apache.org/docs/latest/sql-programming-guide.html
- Databricks Learning: https://www.databricks.com/learn
- Spark: The Definitive Guide (Book): https://www.oreilly.com/library/view/spark-the-definitive/9781491912201/