# Lesson 1: Apache Spark Fundamentals

Welcome to your first Spark lesson! In this notebook, you'll learn the core concepts of Apache Spark and get hands-on experience with Resilient Distributed Datasets (RDDs).

## Learning Objectives
- Understand Spark architecture and components
- Create and manipulate RDDs
- Learn the difference between transformations and actions
- Explore lazy evaluation in Spark
- Navigate the Spark UI

Let's get started!

## 1. Setting Up Spark Session

First, let's import the necessary libraries and create a Spark session:

In [None]:
# Import required libraries
from pyspark.sql import SparkSession
import time
import re
from collections import defaultdict

# Create Spark session
spark = (
    SparkSession.builder.appName("Lesson1-SparkFundamentals")
    .master("local[*]")
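    # Adaptive Query Execution affects DataFrame/SQL queries (later lessons), not RDDs;
    # both settings are already enabled by default since Spark 3.2
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")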
    .getOrCreate()
)

# Get SparkContext from SparkSession
sc = spark.sparkContext

# Display Spark version and configuration
print(f"Spark Version: {spark.version}")
print(f"Python Version: {sc.pythonVer}")
print(f"Master: {sc.master}")
print(f"Application Name: {sc.appName}")
print(f"Default Parallelism: {sc.defaultParallelism}")

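# Spark UI URL (4040 is the default port; if it is taken, Spark tries 4041, 4042, ...;
# sc.uiWebUrl reports the address actually in use)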
print("\nSpark UI available at: http://localhost:4040")

25/09/28 16:43:40 WARN Utils: Your hostname, Liams-MacBook-Pro.local resolves to a loopback address: 127.0.2.2; using 192.168.1.240 instead (on interface en0)
25/09/28 16:43:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/28 16:43:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Version: 3.5.7
Python Version: 3.11
Master: local[*]
Application Name: Lesson1-SparkFundamentals
Default Parallelism: 10

Spark UI available at: http://localhost:4040


**🎯 Exercise Checkpoint 1**: Open the Spark UI in your browser (http://localhost:4040) and explore the interface. You should see your application listed.

---

## 2. Creating Your First RDD

RDDs (Resilient Distributed Datasets) are the fundamental data structure in Spark. Let's create some RDDs:

In [2]:
# Method 1: Create RDD from a Python collection
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
numbers_rdd = sc.parallelize(numbers)

print("=== RDD from Collection ===")
print(f"Type: {type(numbers_rdd)}")
print(f"Number of partitions: {numbers_rdd.getNumPartitions()}")
print(f"Partitions content: {numbers_rdd.glom().collect()}")

# Method 2: Create RDD with specific number of partitions
numbers_rdd_4_partitions = sc.parallelize(numbers, 4)
print(f"\nWith 4 partitions: {numbers_rdd_4_partitions.glom().collect()}")

# Method 3: Create RDD from range
range_rdd = sc.range(1, 11)  # equivalent to range(1, 11) in Python
print(f"\nRange RDD: {range_rdd.collect()}")

=== RDD from Collection ===
Type: <class 'pyspark.rdd.RDD'>
Number of partitions: 10



Partitions content: [[1], [2], [3], [4], [5], [6], [7], [8], [9], [10]]

With 4 partitions: [[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]]

Range RDD: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]



### Understanding Partitions

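Partitions are how Spark divides data across multiple cores/machines. Each partition is processed by one task, so the number of partitions limits how many tasks can run in parallel. Let's explore this concept: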

In [5]:
# Create a larger dataset to see partitioning
large_data = list(range(1, 101))  # 1 to 100
large_rdd = sc.parallelize(large_data, 8)  # 8 partitions

print("=== Partition Analysis ===")
print(f"Total elements: {large_rdd.count()}")
print(f"Number of partitions: {large_rdd.getNumPartitions()}")

# See how data is distributed across partitions
partition_sizes = large_rdd.mapPartitions(lambda x: [len(list(x))]).collect()
print(f"Elements per partition: {partition_sizes}")

# Visualize partition distribution
partitions_data = large_rdd.glom().collect()
for i, partition in enumerate(partitions_data):
    if partition:  # Only show non-empty partitions
        print(f"Partition {i}: {partition[:]}")

=== Partition Analysis ===
Total elements: 100
Number of partitions: 8
Elements per partition: [12, 12, 12, 12, 12, 12, 12, 16]
Partition 0: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
Partition 1: [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24]
Partition 2: [25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36]
Partition 3: [37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48]
Partition 4: [49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
Partition 5: [61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72]
Partition 6: [73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84]
Partition 7: [85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
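
Why does the last partition hold 16 elements instead of 12 or 13? The sketch below is a plain-Python model of how PySpark 3.5's `SparkContext.parallelize` appears to split data, based on my reading of its source: a Python list is first pickled in batches of `len(data) // numSlices` elements (capped at 1024), and the JVM then spreads those *batches*, not individual elements, across partitions using integer division. A `range` object takes a separate path that computes each partition's bounds directly. These are internals, not a public API, and may change between versions; the helper names are hypothetical.

```python
def slice_positions(length, num_slices):
    """(start, end) index pairs per slice, mirroring the integer arithmetic
    of Spark's ParallelCollectionRDD slicing (modeled, not Spark's code)."""
    return [((i * length) // num_slices, ((i + 1) * length) // num_slices)
            for i in range(num_slices)]


def model_parallelize_list(data, num_slices, max_batch=1024):
    """Hypothetical model for lists: batch the elements, then slice the batches."""
    data = list(data)
    batch_size = max(1, min(len(data) // num_slices, max_batch))
    batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
    partitions = []
    for start, end in slice_positions(len(batches), num_slices):
        # A partition receives whole batches, so the leftover batch piles up at the end
        partitions.append([x for batch in batches[start:end] for x in batch])
    return partitions


def model_parallelize_range(r, num_slices):
    """Hypothetical model of the separate path PySpark uses for range objects."""
    size = len(r)
    if size == 0:
        return [[] for _ in range(num_slices)]
    step = r[1] - r[0] if size > 1 else 1

    def start(split):
        return r[0] + int(split * size / num_slices) * step

    return [list(range(start(s), start(s + 1), step)) for s in range(num_slices)]


list_parts = model_parallelize_list(list(range(1, 101)), 8)
range_parts = model_parallelize_range(range(1, 101), 8)
print([len(p) for p in list_parts])   # [12, 12, 12, 12, 12, 12, 12, 16]
print([len(p) for p in range_parts])  # [12, 13, 12, 13, 12, 13, 12, 13]
print(model_parallelize_list(list(range(1, 11)), 4))  # [[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]]
```

The list model reproduces the sizes printed above and the 4-partition layout from the previous cell. For a `range` it predicts a more even 12/13 split; treat that as a prediction to check in the notebook, for example with `sc.parallelize(range(1, 101), 8).glom().map(len).collect()`.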


---

## 3. Transformations vs Actions

Understanding the difference between transformations and actions is crucial in Spark:

### Transformations (Lazy Evaluation)

Transformations create new RDDs from existing ones but don't execute immediately:

In [7]:
numbers_rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print("Step 1: Created numbers_rdd")
print(f"Type: {type(numbers_rdd)}")
print("No computation has happened yet!\n")

# Transformation 1: map() - apply function to each element
squared_rdd = numbers_rdd.map(lambda x: x**2)
print("Step 2: Applied map transformation (squared)")
print(f"Type: {type(squared_rdd)}")
print("Still no computation - it's lazy!\n")

# Transformation 2: filter() - keep elements that match condition
even_squares_rdd = squared_rdd.filter(lambda x: x % 2 == 0)
print("Step 3: Applied filter transformation (even squares)")
print(f"Type: {type(even_squares_rdd)}")
print("Still lazy - no actual processing yet!\n")

# Let's examine the lineage BEFORE triggering execution
print("=== LINEAGE (Execution Plan) ===")
lineage = even_squares_rdd.toDebugString().decode("utf-8")
print(lineage)
print("\n=== END LINEAGE ===\n")

Step 1: Created numbers_rdd
Type: <class 'pyspark.rdd.RDD'>
No computation has happened yet!

Step 2: Applied map transformation (squared)
Type: <class 'pyspark.rdd.PipelinedRDD'>
Still no computation - it's lazy!

Step 3: Applied filter transformation (even squares)
Type: <class 'pyspark.rdd.PipelinedRDD'>
Still lazy - no actual processing yet!

=== LINEAGE (Execution Plan) ===
(10) PythonRDD[16] at RDD at PythonRDD.scala:53 []
 |   ParallelCollectionRDD[15] at readRDDFromFile at PythonRDD.scala:289 []

=== END LINEAGE ===



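### Note: map and filter run as one step
- **What you might expect:** [Data] → [Map Stage] → [Filter Stage] → [Result]
- **What Spark actually does:** [Data] → [Combined Map+Filter Stage] → [Result]

`map` and `filter` are narrow transformations (each output partition depends on exactly one input partition), so Spark pipelines them into a single pass over each partition. This is why the lineage above shows one `PythonRDD` directly on top of the `ParallelCollectionRDD` instead of separate map and filter RDDs.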
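
To make lazy evaluation and the fused map+filter step concrete, here is a toy, pure-Python model. `ToyRDD` is a hypothetical class, not part of PySpark: each transformation only composes a per-partition generator function, and nothing runs until the `collect()` action. PySpark's `PipelinedRDD` composes functions in a similar spirit, but the real implementation also handles serialization, scheduling, and much more.

```python
class ToyRDD:
    """Toy model of lazy, pipelined transformations (not Spark's code)."""

    def __init__(self, partitions, func=None):
        self._partitions = partitions          # source data: a list of lists
        self._func = func or (lambda it: it)   # iterator -> iterator

    def _pipe(self, next_func):
        prev = self._func
        # Compose instead of compute: map and filter become one function
        # that makes a single pass over each partition.
        return ToyRDD(self._partitions, lambda it: next_func(prev(it)))

    def map(self, f):
        return self._pipe(lambda it: (f(x) for x in it))

    def filter(self, pred):
        return self._pipe(lambda it: (x for x in it if pred(x)))

    def collect(self):
        # The action: only now does any user function actually run.
        return [x for part in self._partitions for x in self._func(iter(part))]


calls = []


def square(x):
    calls.append(x)  # record each time the map function really runs
    return x * x


rdd = ToyRDD([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]])
even_squares = rdd.map(square).filter(lambda x: x % 2 == 0)
pending = len(calls)             # 0: defining the transformations ran nothing
result = even_squares.collect()
print(pending, result, len(calls))  # 0 [4, 16, 36, 64, 100] 10
```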

### Actions (Eager Evaluation)

Actions trigger the execution of the entire RDD lineage:

In [10]:
# Action 1: collect() - bring all data to driver
print("=== Actions Trigger Execution ===")
start_time = time.time()
result = even_squares_rdd.collect()
end_time = time.time()

print(f"Even squares: {result}")
print(f"Execution time: {end_time - start_time:.4f} seconds")

# Action 2: count() - count elements
count = even_squares_rdd.count()
print(f"Count of even squares: {count}")

# Action 3: first() - get first element
first_element = even_squares_rdd.first()
print(f"First even square: {first_element}")

# Action 4: take(n) - get first n elements
first_three = even_squares_rdd.take(3)
print(f"First three even squares: {first_three}")

# Action 5: reduce() - aggregate elements
sum_of_even_squares = even_squares_rdd.reduce(lambda a, b: a + b)
print(f"Sum of even squares: {sum_of_even_squares}")

=== Actions Trigger Execution ===
Even squares: [4, 16, 36, 64, 100]
Execution time: 0.0943 seconds
Count of even squares: 5
First even square: 4
First three even squares: [4, 16, 36]
Sum of even squares: 220
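
`reduce()` works in two phases: each partition is reduced on its own, then the per-partition results are combined on the driver. The plain-Python model below is simplified from my reading of PySpark's `RDD.reduce` (the helper name is hypothetical). It shows why the function you pass should be associative and commutative: addition gives the same answer for any partitioning, while subtraction does not.

```python
import operator
from functools import reduce


def model_rdd_reduce(partitions, f):
    """Two-phase reduce, a simplified model of PySpark's RDD.reduce."""
    # Phase 1: each non-empty partition is reduced independently (on executors)
    partials = [reduce(f, part) for part in partitions if part]
    if not partials:
        # Spark also raises an error when reducing an empty RDD
        raise ValueError("cannot reduce an empty collection")
    # Phase 2: the per-partition results are combined (on the driver)
    return reduce(f, partials)


even_squares = [4, 16, 36, 64, 100]
one_part = [even_squares]
two_parts = [[4, 16], [36, 64, 100]]

print(model_rdd_reduce(one_part, operator.add), model_rdd_reduce(two_parts, operator.add))  # 220 220
print(model_rdd_reduce(one_part, operator.sub), model_rdd_reduce(two_parts, operator.sub))  # -212 116
```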


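**🎯 Exercise Checkpoint 2**: Check the Spark UI again. Look at the "Jobs" tab to see the jobs that were executed. Each action launched at least one job of its own (`first()` and `take()` may launch several, because they scan partitions incrementally), and every job recomputed the lineage from the source data because nothing was cached.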

---

## 4. Common RDD Transformations

Let's explore the most commonly used RDD transformations:

In [11]:
# Create sample data for demonstrations
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data)

print("Original data:", rdd.collect())
print()

# 1. map() - transform each element
doubled = rdd.map(lambda x: x * 2)
print("map (x * 2):", doubled.collect())

# 2. filter() - keep elements matching condition
evens = rdd.filter(lambda x: x % 2 == 0)
print("filter (even numbers):", evens.collect())

# 3. flatMap() - transform and flatten
pairs = rdd.flatMap(lambda x: [x, x])
print("flatMap (duplicate each):", pairs.collect())

# 4. distinct() - remove duplicates
duplicates = sc.parallelize([1, 1, 2, 2, 3, 3, 4, 4])
unique = duplicates.distinct()
print("distinct():", unique.collect())

# 5. sample() - random sample (fraction is a per-element probability, so the size varies)
sample = rdd.sample(withReplacement=False, fraction=0.5, seed=42)
print("sample (50%):", sample.collect())

Original data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

map (x * 2): [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
filter (even numbers): [2, 4, 6, 8, 10]
flatMap (duplicate each): [1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10]
distinct(): [1, 2, 3, 4]
sample (50%): [1, 7, 8, 9]
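
`fraction=0.5` does not mean "exactly half". With `withReplacement=False`, PySpark keeps each element independently with probability `fraction` (Bernoulli sampling), which is why the output above happened to contain 4 of the 10 elements. A pure-Python sketch of the idea; `bernoulli_sample` is a hypothetical helper, and Spark seeds each partition separately, so its actual selections will differ:

```python
import random


def bernoulli_sample(data, fraction, seed):
    """Keep each element independently with probability `fraction` (hypothetical helper)."""
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]


data = list(range(1, 11))
sizes = [len(bernoulli_sample(data, 0.5, seed)) for seed in range(20)]
print(sizes)  # sizes scatter around 5 instead of always being exactly 5
```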


### Working with Pairs (Key-Value RDDs)

Many Spark operations work with key-value pairs:

In [13]:
# Create key-value pairs
pairs_data = [("apple", 1), ("banana", 2), ("apple", 3), ("cherry", 1), ("banana", 4)]
pairs_rdd = sc.parallelize(pairs_data)

print("Original pairs:", pairs_rdd.collect())
print()

# 1. groupByKey() - group values by key
grouped = pairs_rdd.groupByKey()
print("Raw grouped data:")
raw_grouped = grouped.collect()
for key, values in raw_grouped:
    print(f"  Key '{key}' -> Values: {type(values)} (iterable)")
    # Note: values is a ResultIterable (an iterable wrapper), not a list!
print()
grouped_result = grouped.map(lambda x: (x[0], list(x[1]))).collect()
print("groupByKey():", grouped_result)

# 2. reduceByKey() - combine/aggregate values for each key
reduced = pairs_rdd.reduceByKey(lambda a, b: a + b)
print("reduceByKey() [sum]:", reduced.collect())

# 3. mapValues() - transform only the values, one at a time (keys stay unchanged)
doubled_values = pairs_rdd.mapValues(lambda x: x * 2)
print("mapValues() [x * 2]:", doubled_values.collect())

# 4. keys() and values() (distinct() removes the repeated keys)
keys_only = pairs_rdd.keys().distinct()
values_only = pairs_rdd.values()
print("keys():", keys_only.collect())
print("values():", values_only.collect())

Original pairs: [('apple', 1), ('banana', 2), ('apple', 3), ('cherry', 1), ('banana', 4)]

Raw grouped data:
  Key 'apple' -> Values: <class 'pyspark.resultiterable.ResultIterable'> (iterable)
  Key 'banana' -> Values: <class 'pyspark.resultiterable.ResultIterable'> (iterable)
  Key 'cherry' -> Values: <class 'pyspark.resultiterable.ResultIterable'> (iterable)

groupByKey(): [('apple', [1, 3]), ('banana', [2, 4]), ('cherry', [1])]
reduceByKey() [sum]: [('apple', 4), ('banana', 6), ('cherry', 1)]
mapValues() [x * 2]: [('apple', 2), ('banana', 4), ('apple', 6), ('cherry', 2), ('banana', 8)]
keys(): ['apple', 'banana', 'cherry']
values(): [1, 2, 3, 1, 4]
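
`groupByKey()` and `reduceByKey()` can produce the same sums, but they shuffle different amounts of data. `reduceByKey()` first combines values per key inside each partition (a map-side combine), so at most one record per key per partition crosses the network. The plain-Python sketch below counts shuffled records for the fruit pairs, assuming a hypothetical two-partition split; `combine_locally` is an illustrative helper, not a PySpark function.

```python
import operator

# The fruit pairs from the cell above, split into two hypothetical partitions
partitions = [
    [("apple", 1), ("banana", 2), ("apple", 3)],
    [("cherry", 1), ("banana", 4)],
]


def combine_locally(records, f):
    """Merge values that share a key; used on both sides of the shuffle."""
    merged = {}
    for key, value in records:
        merged[key] = f(merged[key], value) if key in merged else value
    return merged


# groupByKey: every (key, value) record crosses the shuffle
grouped_shuffle = [kv for part in partitions for kv in part]

# reduceByKey: each partition pre-combines, so at most one record per key per partition moves
reduced_shuffle = [kv for part in partitions for kv in combine_locally(part, operator.add).items()]

print(len(grouped_shuffle), len(reduced_shuffle))      # 5 4
print(combine_locally(reduced_shuffle, operator.add))  # {'apple': 4, 'banana': 6, 'cherry': 1}
```

With five records the saving is tiny, but with millions of values per key it is the difference between shuffling every value and shuffling one partial sum per partition.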


---

## 5. Working with Text Data

Let's work with some sample text data to implement the classic word count example:

In [14]:
# Create sample text data
text_data = [
    "Apache Spark is a unified analytics engine for large-scale data processing",
    "Spark provides high-level APIs in Java, Scala, Python and R",
    "Spark supports SQL queries, streaming data, machine learning and graph processing",
    "The Spark engine supports multiple programming languages",
    "Spark can run on Apache Hadoop, Apache Mesos, Kubernetes, standalone, or in the cloud",
]

text_rdd = sc.parallelize(text_data)
print("Original text lines:")
for i, line in enumerate(text_rdd.collect()):
    print(f"{i+1}. {line}")
print()

Original text lines:
1. Apache Spark is a unified analytics engine for large-scale data processing
2. Spark provides high-level APIs in Java, Scala, Python and R
3. Spark supports SQL queries, streaming data, machine learning and graph processing
4. The Spark engine supports multiple programming languages
5. Spark can run on Apache Hadoop, Apache Mesos, Kubernetes, standalone, or in the cloud



In [None]:
# Step 1: Split lines into words
words_rdd = text_rdd.flatMap(lambda line: line.split())
print(f"Total words: {words_rdd.count()}")
print(f"First 10 words: {words_rdd.take(10)}")
print()

# Step 2: Clean words (lowercase, remove punctuation)

clean_words_rdd = words_rdd.map(
    lambda word: re.sub(r"[^a-zA-Z]", "", word.lower())
).filter(lambda word: len(word) > 0)

print(f"Clean words count: {clean_words_rdd.count()}")
print(f"Sample clean words: {clean_words_rdd.take(15)}")
print()

# Step 3: Create word-count pairs
word_pairs_rdd = clean_words_rdd.map(lambda word: (word, 1))
print(f"Word pairs sample: {word_pairs_rdd.take(10)}")
print()

# Step 4: Count words
word_counts_rdd = word_pairs_rdd.reduceByKey(lambda a, b: a + b)
print("Word counts:")
for word, count in word_counts_rdd.collect():
    print(f"{word}: {count}")

Total words: 53
First 10 words: ['Apache', 'Spark', 'is', 'a', 'unified', 'analytics', 'engine', 'for', 'large-scale', 'data']

Clean words count: 53
Sample clean words: ['apache', 'spark', 'is', 'a', 'unified', 'analytics', 'engine', 'for', 'largescale', 'data', 'processing', 'spark', 'provides', 'highlevel', 'apis']

Word pairs sample: [('apache', 1), ('spark', 1), ('is', 1), ('a', 1), ('unified', 1), ('analytics', 1), ('engine', 1), ('for', 1), ('largescale', 1), ('data', 1)]

Word counts:
and: 2
learning: 1
unified: 1
in: 2
can: 1
run: 1
standalone: 1
largescale: 1
machine: 1
spark: 5
engine: 2
apis: 1
on: 1
kubernetes: 1
supports: 2
mesos: 1
cloud: 1
is: 1
provides: 1
scala: 1
for: 1
java: 1
python: 1
queries: 1
multiple: 1
processing: 2
sql: 1
the: 2
programming: 1
languages: 1
apache: 3
analytics: 1
streaming: 1
graph: 1
hadoop: 1
a: 1
data: 2
highlevel: 1
r: 1
or: 1


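### Note: `map` vs `flatMap`
`map` returns exactly one output element per input element, while `flatMap` expects a list (or other iterable) per input element and flattens all of them into a single RDD:

```python
lines = sc.parallelize(["Hello world", "Spark rocks"])

# Using map: one list per line
lines.map(lambda x: x.split()).collect()
# Result: [['Hello', 'world'], ['Spark', 'rocks']]  ← Still nested!

# Using flatMap: the per-line lists are flattened
lines.flatMap(lambda x: x.split()).collect()
# Result: ['Hello', 'world', 'Spark', 'rocks']      ← Flattened!
```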
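
On small data, a plain-Python reference is a handy way to sanity-check a Spark result. This sketch reuses the same text and cleaning regex as the word count cells; `collections.Counter` plays the role of `map(lambda word: (word, 1))` followed by `reduceByKey(lambda a, b: a + b)`.

```python
import re
from collections import Counter

text_data = [
    "Apache Spark is a unified analytics engine for large-scale data processing",
    "Spark provides high-level APIs in Java, Scala, Python and R",
    "Spark supports SQL queries, streaming data, machine learning and graph processing",
    "The Spark engine supports multiple programming languages",
    "Spark can run on Apache Hadoop, Apache Mesos, Kubernetes, standalone, or in the cloud",
]

# flatMap(split) -> map(clean) -> filter(non-empty), as plain list comprehensions
words = [re.sub(r"[^a-zA-Z]", "", w.lower()) for line in text_data for w in line.split()]
words = [w for w in words if w]

# Counting occurrences per word is what the (word, 1) + reduceByKey(add) steps compute
counts = Counter(words)

print(len(words), len(counts))            # 53 40
print(counts["spark"], counts["apache"])  # 5 3
```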

In [16]:
# Let's find the most common words
# Sort by count (descending)
sorted_word_counts = (
    word_counts_rdd.map(lambda x: (x[1], x[0]))
    .sortByKey(ascending=False)
    .map(lambda x: (x[1], x[0]))
)

print("Top 10 most frequent words:")
top_words = sorted_word_counts.take(10)
for word, count in top_words:
    print(f"{word}: {count}")

# Alternative: using takeOrdered for top N
print("\nUsing takeOrdered (top 5):")
top_5 = word_counts_rdd.takeOrdered(5, key=lambda x: -x[1])
for word, count in top_5:
    print(f"{word}: {count}")

Top 10 most frequent words:
spark: 5
apache: 3
and: 2
in: 2
engine: 2
supports: 2
processing: 2
the: 2
data: 2
learning: 1

Using takeOrdered (top 5):
spark: 5
apache: 3
and: 2
in: 2
engine: 2
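
Several words are tied at count 2, so which of them land in the top 10 (or top 5) depends on how the data happened to be partitioned, not on anything meaningful. Adding the word as a secondary sort key makes the result deterministic; in the notebook that would be `word_counts_rdd.takeOrdered(5, key=lambda x: (-x[1], x[0]))`. The sketch below models how I understand PySpark's `takeOrdered` to work (each partition keeps its own `num` smallest items with `heapq.nsmallest`, then the short lists are merged); the partition split and helper name are hypothetical.

```python
import heapq


def model_take_ordered(partitions, num, key):
    """Hypothetical model of RDD.takeOrdered: per-partition top-N, then merge."""
    per_partition = [heapq.nsmallest(num, part, key=key) for part in partitions]
    merged = []
    for top in per_partition:
        # Merge step: keep only the num smallest of the combined candidates
        merged = heapq.nsmallest(num, merged + top, key=key)
    return merged


# A few (word, count) pairs from the output above, split into two hypothetical partitions
partitions = [
    [("spark", 5), ("engine", 2), ("data", 2), ("learning", 1)],
    [("apache", 3), ("in", 2), ("and", 2), ("the", 2)],
]

count_only = model_take_ordered(partitions, 5, key=lambda x: -x[1])           # ties: arrival order
with_tiebreak = model_take_ordered(partitions, 5, key=lambda x: (-x[1], x[0]))
print(with_tiebreak)  # [('spark', 5), ('apache', 3), ('and', 2), ('data', 2), ('engine', 2)]
```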


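**🎯 Exercise Checkpoint 3**: Look at the Spark UI "Stages" tab. Jobs that contain a shuffle (`reduceByKey`, `sortByKey`) are split into multiple stages at each shuffle boundary, while narrow operations such as `map` and `filter` run together inside a single stage. (`sortByKey` also runs a small extra job that samples the keys to choose range boundaries.)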

---

## 6. Caching and Persistence

When you need to reuse an RDD multiple times, caching can significantly improve performance:

In [17]:
# Create a computationally expensive RDD
def expensive_computation(x):
    # Simulate expensive computation
    import time

    time.sleep(0.01)  # 10ms delay per element
    return x**2


large_data = sc.parallelize(range(1, 101), 4)
expensive_rdd = large_data.map(expensive_computation)

print("=== Without Caching ===")
# First computation
start_time = time.time()
result1 = expensive_rdd.count()
time1 = time.time() - start_time
print(f"First count: {result1}, Time: {time1:.2f}s")

# Second computation (will recompute everything)
start_time = time.time()
result2 = expensive_rdd.reduce(lambda a, b: a + b)
time2 = time.time() - start_time
print(f"Sum: {result2}, Time: {time2:.2f}s")

print(f"Total time without caching: {time1 + time2:.2f}s")

=== Without Caching ===
First count: 100, Time: 0.38s
Sum: 338350, Time: 0.33s
Total time without caching: 0.71s


In [18]:
print("\n=== With Caching ===")
# Cache the RDD in memory
expensive_rdd.cache()  # or use .persist()

# First computation (will cache the result)
start_time = time.time()
result1 = expensive_rdd.count()
time1 = time.time() - start_time
print(f"First count (caching): {result1}, Time: {time1:.2f}s")

# Second computation (will use cached data)
start_time = time.time()
result2 = expensive_rdd.reduce(lambda a, b: a + b)
time2 = time.time() - start_time
print(f"Sum (from cache): {result2}, Time: {time2:.2f}s")

print(f"Total time with caching: {time1 + time2:.2f}s")

# Check cache status
print(f"\nIs cached: {expensive_rdd.is_cached}")
print(f"Storage level: {expensive_rdd.getStorageLevel()}")

# Clean up cache
expensive_rdd.unpersist()


=== With Caching ===
First count (caching): 100, Time: 0.40s
Sum (from cache): 338350, Time: 0.05s
Total time with caching: 0.44s

Is cached: True
Storage level: Memory Serialized 1x Replicated


PythonRDD[107] at RDD at PythonRDD.scala:53
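
The timings above come down to how many times `expensive_computation` actually runs. The toy model below counts evaluations instead of seconds. `ToyCachedRDD` is a hypothetical class, not Spark's implementation: like `rdd.cache()`, its `cache()` only marks the RDD, the first action computes and stores each partition, and later actions read from the store. (With `MEMORY_ONLY`, real Spark recomputes partitions that do not fit in memory; the toy ignores that.)

```python
calls = []


def expensive(x):
    calls.append(x)  # record every evaluation
    return x * x


class ToyCachedRDD:
    """Toy model of RDD caching (not Spark's implementation)."""

    def __init__(self, partitions, func):
        self.partitions = partitions
        self.func = func
        self._store = None  # None means "not cached"

    def cache(self):
        # Like rdd.cache(), this only marks the RDD; nothing is computed yet
        self._store = {}
        return self

    def _partition(self, i):
        if self._store is not None and i in self._store:
            return self._store[i]       # cache hit: no recomputation
        result = [self.func(x) for x in self.partitions[i]]
        if self._store is not None:
            self._store[i] = result     # the first action fills the cache
        return result

    def count(self):
        return sum(len(self._partition(i)) for i in range(len(self.partitions)))

    def sum(self):
        return sum(sum(self._partition(i)) for i in range(len(self.partitions)))


data = [list(range(1, 26)), list(range(26, 51)), list(range(51, 76)), list(range(76, 101))]

plain = ToyCachedRDD(data, expensive)
plain_count = plain.count()
plain_total = plain.sum()
uncached_calls = len(calls)     # 200: each action recomputes all 100 elements

calls.clear()
cached = ToyCachedRDD(data, expensive).cache()
after_cache_call = len(calls)   # 0: cache() is lazy
cached_count = cached.count()
cached_total = cached.sum()
cached_calls = len(calls)       # 100: computed once, then served from the store
print(uncached_calls, after_cache_call, cached_calls, cached_total)  # 200 0 100 338350
```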

### Different Storage Levels

Spark offers different storage levels for persistence:

In [22]:
from pyspark import StorageLevel

# Create a test RDD
test_rdd = sc.parallelize(range(1, 1000), 4)

# Basic memory and disk storage
print(f"MEMORY_ONLY: {StorageLevel.MEMORY_ONLY}")
print(f"MEMORY_ONLY_2: {StorageLevel.MEMORY_ONLY_2}")
print(f"MEMORY_AND_DISK: {StorageLevel.MEMORY_AND_DISK}")
print(f"MEMORY_AND_DISK_2: {StorageLevel.MEMORY_AND_DISK_2}")

# Disk-only storage
print(f"DISK_ONLY: {StorageLevel.DISK_ONLY}")
print(f"DISK_ONLY_2: {StorageLevel.DISK_ONLY_2}")

# Off-heap storage
print(f"OFF_HEAP: {StorageLevel.OFF_HEAP}")

# Example of using different storage levels
# (an RDD's storage level can only be assigned once, so pick one of these)
# test_rdd.persist(StorageLevel.MEMORY_AND_DISK)
# test_rdd.persist(StorageLevel.MEMORY_ONLY)  # In PySpark this is already serialized in memory
# test_rdd.persist(StorageLevel.DISK_ONLY)    # Only on disk

print("\nChoose storage level based on:")
print("- MEMORY_ONLY: Fast, but limited by memory; partitions that don't fit are recomputed (always serialized in PySpark)")
print("- MEMORY_AND_DISK: Spills to disk when memory is full (a safe choice when data may not fit in memory)")
print("- DISK_ONLY: Slower but handles large datasets")
print("- MEMORY_ONLY_2: Fast + fault tolerance (replicated across 2 nodes)")
print("- MEMORY_AND_DISK_2: Spills to disk + replicated for fault tolerance")
print("- DISK_ONLY_2: Disk storage + replicated for fault tolerance")
print("- OFF_HEAP: Uses off-heap memory (requires configuration)")

MEMORY_ONLY: Memory Serialized 1x Replicated
MEMORY_ONLY_2: Memory Serialized 2x Replicated
MEMORY_AND_DISK: Disk Memory Serialized 1x Replicated
MEMORY_AND_DISK_2: Disk Memory Serialized 2x Replicated
DISK_ONLY: Disk Serialized 1x Replicated
DISK_ONLY_2: Disk Serialized 2x Replicated
OFF_HEAP: Disk Memory OffHeap Serialized 1x Replicated

Choose storage level based on:
- MEMORY_ONLY: Fast, but limited by memory; partitions that don't fit are recomputed (always serialized in PySpark)
- MEMORY_AND_DISK: Spills to disk when memory is full (a safe choice when data may not fit in memory)
- DISK_ONLY: Slower but handles large datasets
- MEMORY_ONLY_2: Fast + fault tolerance (replicated across 2 nodes)
- MEMORY_AND_DISK_2: Spills to disk + replicated for fault tolerance
- DISK_ONLY_2: Disk storage + replicated for fault tolerance
- OFF_HEAP: Uses off-heap memory (requires configuration)


---

## 7. RDD Lineage and Fault Tolerance

One of Spark's key features is automatic fault tolerance through RDD lineage:

In [26]:
# Create a complex RDD with multiple transformations
data = sc.parallelize(range(1, 21), 4)
step1 = data.map(lambda x: x * 2)
step2 = step1.filter(lambda x: x > 10)
step3 = step2.map(lambda x: (x % 3, x))
step4 = step3.reduceByKey(lambda a, b: a + b)
# Check lineage after narrow transformations only
step3_lineage = step3.toDebugString().decode("utf-8")
print("After narrow transforms only:")
print(step3_lineage)

print("=== RDD Lineage ===")
print("Final RDD lineage:")
lineage = step4.toDebugString().decode("utf-8")
print(lineage)

print("\n=== Understanding Lineage ===")
print("How to read each line:")
print("- (N) at the start of a block: the number of partitions")
print("- Name[ID]: the RDD type and its unique RDD ID")
print("- 'at ...': the operation or call site that created the RDD")
print("- '+-' indentation: a shuffle boundary, where a new stage begins")
print("\nIf a partition is lost, Spark can recompute it using this lineage!")

After narrow transforms only:
(4) PythonRDD[125] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[120] at readRDDFromFile at PythonRDD.scala:289 []
=== RDD Lineage ===
Final RDD lineage:
(4) PythonRDD[126] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[124] at mapPartitions at PythonRDD.scala:160 []
 |  ShuffledRDD[123] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) PairwiseRDD[122] at reduceByKey at /var/folders/hj/ljckbnbd51d_x_p3c23zp6j40000gn/T/ipykernel_74672/1071599196.py:6 []
    |  PythonRDD[121] at reduceByKey at /var/folders/hj/ljckbnbd51d_x_p3c23zp6j40000gn/T/ipykernel_74672/1071599196.py:6 []
    |  ParallelCollectionRDD[120] at readRDDFromFile at PythonRDD.scala:289 []

=== Understanding Lineage ===
How to read each line:
- (N) at the start of a block: the number of partitions
- Name[ID]: the RDD type and its unique RDD ID
- 'at ...': the operation or call site that created the RDD
- '+-' indentation: a shuffle boundary, where a new stage begins

If a partition is lost, Spark can recompute it using this lineage!


## Note:

* **ParallelCollectionRDD[120]** = the source `data`
* **PythonRDD[121] at reduceByKey** = one pipelined block holding **all the Python-side narrow work before the shuffle**:
  * `map(lambda x: x * 2)` ← **step1**
  * `filter(lambda x: x > 10)` ← **step2**
  * `map(lambda x: (x % 3, x))` ← **step3** (creates the key-value pairs)
  * plus the per-partition pre-aggregation (map-side combine) that `reduceByKey` performs before shuffling
* **PairwiseRDD[122]** = the JVM-side key-value view that feeds the shuffle
* **ShuffledRDD[123]** = the shuffle caused by `reduceByKey`
* **MapPartitionsRDD[124]** and **PythonRDD[126]** = the post-shuffle side: the shuffled pairs are unpacked for Python, where the partial sums for each key are merged into the final result

Why step1 and step2 look "missing": PySpark fuses consecutive **narrow** transformations (map/filter/map) into a single `PythonRDD` that runs inside the executors' Python workers. That JVM-side RDD is only created when it is first needed, here when `reduceByKey` sets up the shuffle, so the lineage labels it with that call site ("at reduceByKey"). The same fusion explains the "After narrow transforms only" output, where all three steps appear as one `PythonRDD[125]` directly on top of the source. `ParallelCollectionRDD` is only the source data; step1 and step2 execute inside **PythonRDD[121]**, right before the shuffle.
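
Recovering a lost partition through narrow dependencies only requires replaying the recorded steps on that partition's source slice. The plain-Python toy below models this for `step1` to `step3`; the names `source`, `lineage`, and `compute_partition` are hypothetical, and the source split assumes `parallelize(range(1, 21), 4)` yields four contiguous slices of five numbers. Recovering a partition after a shuffle additionally needs the shuffle output of its parents, which Spark normally keeps on disk; this toy does not model that.

```python
# Source partitions, assuming parallelize(range(1, 21), 4) splits them contiguously
source = [list(range(1, 6)), list(range(6, 11)), list(range(11, 16)), list(range(16, 21))]

# The narrow steps from the cell above, recorded as iterator -> iterator functions
lineage = [
    lambda it: (x * 2 for x in it),        # step1
    lambda it: (x for x in it if x > 10),  # step2
    lambda it: ((x % 3, x) for x in it),   # step3
]


def compute_partition(i):
    """Rebuild partition i from its source slice by replaying the lineage."""
    it = iter(source[i])
    for step in lineage:
        it = step(it)
    return list(it)


computed = {i: compute_partition(i) for i in range(len(source))}
lost = computed.pop(2)              # simulate losing partition 2 (e.g. an executor dies)
computed[2] = compute_partition(2)  # recompute only that partition, not the others
print(computed[2] == lost, computed[2])  # True [(1, 22), (0, 24), (2, 26), (1, 28), (0, 30)]
```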


In [None]:
# Let's see the result
result = step4.collect()
print(f"Final result: {result}")

# Verify our computation manually
print("\nManual verification:")
original = list(range(1, 21))
doubled = [x * 2 for x in original]
filtered = [x for x in doubled if x > 10]
print(f"After doubling and filtering > 10: {filtered}")

# Group by x % 3 and sum

groups = defaultdict(int)
for x in filtered:
    groups[x % 3] += x
print(f"Grouped by (x % 3) and summed: {dict(groups)}")

Final result: [(0, 120), (1, 140), (2, 130)]

Manual verification:
After doubling and filtering > 10: [12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40]
Grouped by (x % 3) and summed: {0: 120, 2: 130, 1: 140}


---

## 8. Reading Data from Files

Let's create some sample data files and learn how to read them with Spark:

In [32]:
# Create sample data files
import os

# Create data directory if it doesn't exist
os.makedirs("data", exist_ok=True)

# Create a sample text file
sample_text = """
Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.
Spark offers over 80 high-level operators that make it easy to build parallel apps.
You can use it interactively from the Scala, Python, R, and SQL shells.
Spark powers a stack of libraries including SQL and DataFrames, MLlib for machine learning, GraphX, and Spark Streaming.
You can combine these libraries seamlessly in the same application.
""".strip()

with open("data/sample_data.txt", "w") as f:
    f.write(sample_text)

# Create a CSV file
csv_data = """name,age,city,salary
Alice,25,New York,50000
Bob,30,San Francisco,75000
Charlie,35,Chicago,60000
Diana,28,Boston,55000
Eve,32,Seattle,70000
Eve,34,Chicago,100000
"""

with open("data/people.csv", "w") as f:
    f.write(csv_data)

print("Sample data files created in 'data/' directory")

Sample data files created in 'data/' directory


In [29]:
# Reading text files
text_file_rdd = sc.textFile("data/sample_data.txt")

print("=== Reading Text Files ===")
print(f"Number of lines: {text_file_rdd.count()}")
print("\nContent:")
for i, line in enumerate(text_file_rdd.collect()):
    print(f"{i+1}. {line}")

# Word count on file data
file_word_counts = (
    text_file_rdd.flatMap(lambda line: line.split())
    .map(lambda word: word.lower().strip(".,"))
    .filter(lambda word: len(word) > 3)
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: x[1], ascending=False)
)

print("\nTop words (length > 3):")
for word, count in file_word_counts.take(10):
    print(f"{word}: {count}")

=== Reading Text Files ===
Number of lines: 5

Content:
1. Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.
2. Spark offers over 80 high-level operators that make it easy to build parallel apps.
3. You can use it interactively from the Scala, Python, R, and SQL shells.
4. Spark powers a stack of libraries including SQL and DataFrames, MLlib for machine learning, GraphX, and Spark Streaming.
5. You can combine these libraries seamlessly in the same application.

Top words (length > 3):
spark: 4
machine: 2
learning: 2
data: 2
libraries: 2
apache: 1
multi-language: 1
engineering: 1
science: 1
machines: 1


In [33]:
# Reading CSV files (basic approach with RDDs)
csv_rdd = sc.textFile("data/people.csv")

print("=== Reading CSV Files ===")
# Skip header and parse CSV
header = csv_rdd.first()
data_rdd = csv_rdd.filter(lambda line: line != header)

print(f"Header: {header}")
print(f"Data rows: {data_rdd.count()}")

# Parse CSV rows
parsed_rdd = data_rdd.map(lambda line: line.split(","))
print("\nParsed data:")
for row in parsed_rdd.collect():
    print(row)

# Calculate average salary
salaries = parsed_rdd.map(lambda row: int(row[3]))
avg_salary = salaries.reduce(lambda a, b: a + b) / salaries.count()
print(f"\nAverage salary: ${avg_salary:,.2f}")

# Find people in specific cities
cities_rdd = parsed_rdd.map(lambda row: (row[2], row[0]))  # (city, name)
people_by_city = cities_rdd.groupByKey().mapValues(list)
print("\nPeople by city:")
for city, people in people_by_city.collect():
    print(f"{city}: {people}")

=== Reading CSV Files ===
Header: name,age,city,salary
Data rows: 6

Parsed data:
['Alice', '25', 'New York', '50000']
['Bob', '30', 'San Francisco', '75000']
['Charlie', '35', 'Chicago', '60000']
['Diana', '28', 'Boston', '55000']
['Eve', '32', 'Seattle', '70000']
['Eve', '34', 'Chicago', '100000']

Average salary: $68,333.33

People by city:
New York: ['Alice']
Boston: ['Diana']
Seattle: ['Eve']
San Francisco: ['Bob']
Chicago: ['Charlie', 'Eve']
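
`line.split(",")` works for this file, but it breaks as soon as a field contains a quoted comma. Python's standard `csv` module handles quoting, and since `csv.reader` accepts any iterable of lines, a function like the hypothetical `parse_csv_lines` below could be applied per partition with `data_rdd.mapPartitions(parse_csv_lines)`. The second row is made up for illustration. Note that `textFile` still splits on newlines, so quoted fields containing line breaks need a different approach, such as the DataFrame reader `spark.read.csv(..., header=True)`.

```python
import csv


def parse_csv_lines(lines):
    """Parse an iterable of CSV text lines into lists of fields.

    csv.reader understands quoting, so a quoted comma stays inside one field.
    """
    return csv.reader(lines)


rows = [
    "Alice,25,New York,50000",
    '"Doe, Jane",41,"Portland, OR",82000',  # hypothetical row with quoted commas
]

naive = [line.split(",") for line in rows]
parsed = list(parse_csv_lines(rows))
print(len(naive[1]), len(parsed[1]))  # 6 4
print(parsed[1])  # ['Doe, Jane', '41', 'Portland, OR', '82000']
```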


**🎯 Exercise Checkpoint 4**: Create your own small text file with some content and try reading it with Spark. Implement a character count (instead of word count).

---

## 9. Performance Monitoring and Debugging

Let's learn how to monitor and debug Spark applications:

In [36]:
# Create a deliberately inefficient computation
def inefficient_example():
    data = sc.parallelize(range(1, 10000000), 1)  # Only 1 partition - inefficient!

    # Four separate actions: each one is a full pass over the single partition
    print(f"Count: {data.count()}")
    print(f"Sum: {data.sum()}")
    print(f"Max: {data.max()}")
    print(f"Min: {data.min()}")

    return data


print("=== Inefficient Example ===")
start_time = time.time()
inefficient_rdd = inefficient_example()
inefficient_time = time.time() - start_time
print(f"Inefficient approach time: {inefficient_time:.2f}s")

=== Inefficient Example ===
Count: 9999999
Sum: 49999995000000



Max: 9999999
Min: 1
Inefficient approach time: 1.53s



In [37]:
# More efficient approach
def efficient_example():
    data = sc.parallelize(range(1, 10000000), 8)  # Multiple partitions

    # Cache the data since we'll use it multiple times
    data.cache()

    # Force materialization: this first action fills the cache
    # (the count() printed below could have done this on its own)
    data.count()

    # Now use the cached data
    print(f"Count: {data.count()}")
    print(f"Sum: {data.sum()}")
    print(f"Max: {data.max()}")
    print(f"Min: {data.min()}")

    data.unpersist()
    return data


print("\n=== Efficient Example ===")
start_time = time.time()
efficient_rdd = efficient_example()
efficient_time = time.time() - start_time
print(f"Efficient approach time: {efficient_time:.2f}s")
print(f"Speedup: {inefficient_time/efficient_time:.2f}x")


=== Efficient Example ===
Count: 9999999
Sum: 49999995000000
Max: 9999999
Min: 1
Efficient approach time: 0.89s
Speedup: 1.72x
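
Both versions above still make four separate passes (count, sum, max, min). One alternative is to compute all four in a single pass with a mergeable summary: fold each element into a per-partition `(count, sum, min, max)` tuple, then merge the partition summaries. In PySpark the two functions below could be passed to `data.aggregate(ZERO, seq_op, comb_op)`, and the built-in `data.stats()` returns a `StatCounter` built in a similar single pass. The sketch runs the same logic in plain Python over four hypothetical partitions; the names are illustrative.

```python
from functools import reduce

# Running summary: (count, sum, min, max)
ZERO = (0, 0, float("inf"), float("-inf"))


def seq_op(acc, x):
    """Fold one element into a partition's running summary."""
    n, total, lo, hi = acc
    return (n + 1, total + x, min(lo, x), max(hi, x))


def comb_op(a, b):
    """Merge two partition summaries."""
    return (a[0] + b[0], a[1] + b[1], min(a[2], b[2]), max(a[3], b[3]))


# Four hypothetical partitions covering 1..10000
partitions = [range(1, 2501), range(2501, 5001), range(5001, 7501), range(7501, 10001)]

partials = [reduce(seq_op, part, ZERO) for part in partitions]  # one pass per partition
count, total, lo, hi = reduce(comb_op, partials, ZERO)          # merge the summaries
print(count, total, lo, hi)  # 10000 50005000 1 10000
```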


In [38]:
# Debugging tips
debug_rdd = sc.parallelize([1, 2, 3, 4, 5], 2)

print("=== Debugging Information ===")
print(f"RDD ID: {debug_rdd.id()}")
print(f"Name: {debug_rdd.name()}")
print(f"Partitions: {debug_rdd.getNumPartitions()}")
print(f"Storage level: {debug_rdd.getStorageLevel()}")

# Set a name for easier identification in UI
debug_rdd.setName("MyDebugRDD")
debug_rdd.cache()
print(f"New name: {debug_rdd.name()}")

# Use glom() to see partition contents
print(f"Partition contents: {debug_rdd.glom().collect()}")


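# Use mapPartitionsWithIndex to see partition processing.
# Note: debug_partition runs in the executors' Python workers, so on a cluster its
# print() output goes to the executor logs; in local mode it may appear in the
# notebook or the terminal, and not necessarily in partition order.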
def debug_partition(index, iterator):
    data = list(iterator)
    print(f"Processing partition {index} with data: {data}")
    return data


result = debug_rdd.mapPartitionsWithIndex(debug_partition).collect()
print(f"Final result: {result}")

=== Debugging Information ===
RDD ID: 190
Name: None
Partitions: 2
Storage level: Serialized 1x Replicated
New name: MyDebugRDD
Partition contents: [[1, 2], [3, 4, 5]]
Final result: [1, 2, 3, 4, 5]


Processing partition 1 with data: [3, 4, 5]
Processing partition 0 with data: [1, 2]


**🎯 Exercise Checkpoint 5**: Visit the Spark UI and explore:
- Jobs tab: See completed jobs
- Stages tab: See how jobs were broken into stages
- Storage tab: See cached RDDs
- Executors tab: See resource utilization

---

## 10. Summary and Key Takeaways

Congratulations! You've completed your first Spark lesson. Let's summarize what you learned:

In [39]:
# Final summary demonstration
print("🎉 LESSON 1 COMPLETE! 🎉")
print("\n=== What You've Learned ===")
print("✅ Spark architecture and components")
print("✅ Creating and manipulating RDDs")
print("✅ Transformations vs Actions")
print("✅ Lazy evaluation principles")
print("✅ Common RDD operations")
print("✅ Working with key-value pairs")
print("✅ Text processing and word count")
print("✅ Caching and persistence")
print("✅ RDD lineage and fault tolerance")
print("✅ Reading data from files")
print("✅ Performance monitoring basics")

print("\n=== Key Concepts to Remember ===")
print("🔑 RDDs are immutable and distributed")
print("🔑 Transformations are lazy, actions are eager")
print("🔑 Use caching for RDDs accessed multiple times")
print("🔑 Spark automatically handles fault tolerance")
print("🔑 Partitioning affects performance")
print("🔑 Monitor your jobs using Spark UI")

print("\n=== Next Steps ===")
print("📚 Complete the exercises in the exercises/ folder")
print("🧪 Run the validation tests")
print("🚀 Ready for Lesson 2: DataFrames and Spark SQL")

# Final application statistics
print("\n=== Session Statistics ===")
print(f"Spark Version: {spark.version}")
print(f"Application ID: {sc.applicationId}")
print(f"Total Cores: {sc.defaultParallelism}")
print("Spark UI: http://localhost:4040")

🎉 LESSON 1 COMPLETE! 🎉

=== What You've Learned ===
✅ Spark architecture and components
✅ Creating and manipulating RDDs
✅ Transformations vs Actions
✅ Lazy evaluation principles
✅ Common RDD operations
✅ Working with key-value pairs
✅ Text processing and word count
✅ Caching and persistence
✅ RDD lineage and fault tolerance
✅ Reading data from files
✅ Performance monitoring basics

=== Key Concepts to Remember ===
🔑 RDDs are immutable and distributed
🔑 Transformations are lazy, actions are eager
🔑 Use caching for RDDs accessed multiple times
🔑 Spark automatically handles fault tolerance
🔑 Partitioning affects performance
🔑 Monitor your jobs using Spark UI

=== Next Steps ===
📚 Complete the exercises in the exercises/ folder
🧪 Run the validation tests
🚀 Ready for Lesson 2: DataFrames and Spark SQL

=== Session Statistics ===
Spark Version: 3.5.7
Application ID: local-1759052621348
Total Cores: 10
Spark UI: http://localhost:4040


## Cleanup

Always clean up your Spark session when you're done:

In [None]:
# Clean up resources
spark.stop()
print("Spark session stopped. ✅")
print("\nGreat job completing Lesson 1! 🎊")
print("Don't forget to check out the exercises and run the validation tests.")