# Big Data Processing with MapReduce

This notebook will guide you through the practical implementation and analysis of the MapReduce algorithm. You will start with a simple, single-threaded approach, build a parallel version, and compare their performance.

### Setup

First, let's make sure you have the necessary libraries and data. Run the cell below to download the text for *Moby Dick*.

In [2]:
# Let's import the libraries we'll need for this workshop
import time
import re
from collections import Counter, defaultdict
import multiprocessing
import matplotlib.pyplot as plt
import requests
import os

# Download the file if it doesn't exist
file_path = 'moby_dick.txt'
if not os.path.exists(file_path):
    print(f"Downloading Moby Dick to {file_path}...")
    url = 'https://www.gutenberg.org/ebooks/2701.txt.utf-8'
    try:
        response = requests.get(url, timeout=30)  # Fail instead of hanging forever on a stalled connection
        response.raise_for_status() # Raise an exception for bad status codes
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(response.text)
        print("Download complete.")
    except requests.exceptions.RequestException as e:
        print(f"Error downloading file: {e}")
else:
    print(f"{file_path} already exists.")

Downloading Moby Dick to moby_dick.txt...
Download complete.


---

## Exercise 1: Baseline Performance - Serial Word Count

**Task:** Before optimizing, we need a baseline. Complete the `serial_word_count` function below. It should:
1.  Read the entire content of the `moby_dick.txt` file.
2.  Use regular expressions (`re.findall`) to find all words. A good pattern is `r'\b[a-z]{3,}\b'`, which finds words of 3 or more lowercase letters. Remember to convert the text to lowercase first.
3.  Use `collections.Counter` to count the frequency of each word.
4.  Return the Counter object.

The code to time and execute the function is already provided.
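The suggested pattern has a few consequences worth seeing on a small, made-up sample. Words shorter than three letters are dropped. An apostrophe counts as a word boundary, so `whale's` becomes `whale` and `can't` becomes `can`. A minimal sketch:

```python
import re
from collections import Counter

# The pattern from the task: runs of 3+ lowercase ASCII letters between word boundaries
WORD_PATTERN = r'\b[a-z]{3,}\b'

# Made-up sample text, lowercased first as the task requires
sample = "The whale! The WHALE's jaw can't close."
words = re.findall(WORD_PATTERN, sample.lower())
print(words)                        # ['the', 'whale', 'the', 'whale', 'jaw', 'can', 'close']
print(Counter(words).most_common(2))  # [('the', 2), ('whale', 2)]
```

Ties in `most_common` come back in the order the words were first seen, which is why `the` is listed before `whale`.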

In [4]:
def serial_word_count(file_path):
    """
    Reads a text file and counts the frequency of words in a serial manner.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read().lower()
    except FileNotFoundError:
        print(f"Error: {file_path} not found. Please run the setup cell first.")
        return None

    # TODO: Find all words using a regular expression.
    # The pattern r'\b[a-z]{3,}\b' is a good starting point.
    words = re.findall(r'\b[a-z]{3,}\b', text)

    # TODO: Use collections.Counter to count the words.
    word_counts = Counter(words)

    return word_counts

# --- Execution and Timing ---
print("Starting serial word count...")
start_time_serial = time.perf_counter()
s_word_counts = serial_word_count(file_path)
end_time_serial = time.perf_counter()

serial_execution_time = end_time_serial - start_time_serial

print(f"Serial execution took: {serial_execution_time:.4f} seconds")
# Optional: Print the 10 most common words to verify
if s_word_counts:
    print("10 most common words:", s_word_counts.most_common(10))

Starting serial word count...
Serial execution took: 0.1498 seconds
10 most common words: [('the', 14715), ('and', 6514), ('that', 3081), ('his', 2530), ('but', 1822), ('with', 1769), ('was', 1647), ('for', 1644), ('all', 1543), ('this', 1437)]


---

## Exercise 2: Parallel Processing with MapReduce

**Task:** Now, implement the same word count logic using a MapReduce approach. We will use the `multiprocessing` library to parallelize the "Map" step.

### 2.1: Prepare Data Chunks
First, we need to split our data into chunks to be processed in parallel.
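Chunking is where a parallel word count most easily goes wrong. The sketch below uses a hypothetical `naive_chunks` helper that cuts the text every `len(text) // num_chunks` characters. On a made-up sentence it shows two pitfalls of that rule. First, floor division can produce one chunk more than requested. Second, a fixed-width cut can split a word across two chunks, so the mappers count the wrong words.

```python
import re


def naive_chunks(text, num_chunks):
    # Fixed-width slices; floor division can leave a short extra chunk at the end
    size = len(text) // num_chunks
    return [text[i:i + size] for i in range(0, len(text), size)]


text = "the whale swims."
chunks = naive_chunks(text, 3)
print(chunks)  # ['the w', 'hale ', 'swims', '.']  -> 4 chunks, not 3

# Counting each chunk separately, as the mappers would, loses 'whale'
words = [w for chunk in chunks for w in re.findall(r'\b[a-z]{3,}\b', chunk)]
print(words)   # ['the', 'hale', 'swims']
```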

In [8]:
# Determine the number of processes to use (usually the number of CPU cores)
# You can change this number to see how it affects performance!
NUM_PROCESSES = multiprocessing.cpu_count()
print(f"Using {NUM_PROCESSES} processes.")

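def chunkify_text(file_path, num_chunks):
    """Splits the text into at most `num_chunks` chunks, cutting only at whitespace
    so that no word is split across two chunks (which would corrupt the counts)."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
    except FileNotFoundError:
        print(f"Error: {file_path} not found.")
        return []

    # Ceiling division gives num_chunks pieces rather than num_chunks + 1;
    # max(1, ...) avoids a zero step when the text is shorter than num_chunks.
    chunk_size = max(1, -(-len(text) // num_chunks))
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        # Move the cut forward to the next whitespace character so words stay whole
        while end < len(text) and not text[end].isspace():
            end += 1
        chunks.append(text[start:end])
        start = end
    return chunks

# Create the chunks for our mappers
text_chunks = chunkify_text(file_path, NUM_PROCESSES)
if text_chunks:
    print(f"Split text into {len(text_chunks)} chunks.")

Using 16 processes.
Split text into 16 chunks.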


### 2.2: Implement the `Mapper` and `Reducer`

**Task:** Complete the `mapper` and `reducer` functions below.
* **`mapper`**: This function will receive one chunk of text. It should perform the same logic as the serial version (lowercase, find words) but instead of counting, it should return a list of `(word, 1)` tuples.
* **`reducer`**: This function takes the grouped (shuffled) data, which is a list of `(word, [1, 1, 1, ...])` tuples. It should sum the list of ones for each word and return the final `(word, count)` tuple.
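To make the two contracts concrete, here is a minimal sketch of both functions applied by hand to a tiny made-up input in a single process. It uses the same regular expression as Exercise 1.

```python
import re


def mapper(text_chunk):
    # Lowercase, find words of 3+ letters, emit one (word, 1) pair per occurrence
    return [(word, 1) for word in re.findall(r'\b[a-z]{3,}\b', text_chunk.lower())]


def reducer(item):
    # item is one shuffled entry: (word, [1, 1, ...]) -> (word, total)
    word, counts = item
    return (word, sum(counts))


print(mapper("Whale! The whale."))  # [('whale', 1), ('the', 1), ('whale', 1)]
print(reducer(("whale", [1, 1])))    # ('whale', 2)
```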

In [13]:
def mapper(text_chunk):
    """
    Processes a chunk of text, finds words, and returns a list of (word, 1) tuples.
    """
    # TODO: Convert the chunk to lowercase and find all words.
    text_chunk = text_chunk.lower()
    words = re.findall(r'\b[a-z]{3,}\b', text_chunk)
    
    # TODO: Create a list of (word, 1) tuples and return it.
    mapped_results = [(word, 1) for word in words]
    return mapped_results


def reducer(item):
    """
    Reduces a key and its list of values to a single value.
    Takes one item from the shuffled list, e.g., ('whale', [1, 1, 1, 1]).
    """
    word, counts = item
    # TODO: Return a tuple of the word and the sum of its counts.
    return (word, sum(counts))

### 2.3: Orchestrate Map, Shuffle, and Reduce

**Task:** Now, put all the pieces together. The steps are:
1.  **Map:** Use `multiprocessing.Pool` to apply the `mapper` function to each `text_chunk` in parallel.
2.  **Shuffle:** The results from the mappers will be a list of lists (e.g., `[ [('a', 1)], [('b', 1), ('a', 1)] ]`). You need to collect all these `(key, value)` pairs and group them by key. A `defaultdict(list)` is perfect for this.
3.  **Reduce:** Use the pool to apply the `reducer` function to the shuffled data.
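The task suggests `defaultdict(list)` for the shuffle. There is another way to do the same grouping that is closer in spirit to Hadoop, whose shuffle sorts map output by key before it reaches the reducers. You sort the flattened pairs, then group adjacent equal keys with `itertools.groupby`. A minimal sketch on the example mapper output from step 2:

```python
from itertools import chain, groupby
from operator import itemgetter

# Example output of two mappers, as in step 2 above
example_mapped = [[('a', 1)], [('b', 1), ('a', 1)]]

# Flatten all mapper outputs, then sort so that equal keys become adjacent
pairs = sorted(chain.from_iterable(example_mapped), key=itemgetter(0))

# groupby walks the sorted pairs and yields one group per distinct key
grouped = [(key, [value for _, value in group])
           for key, group in groupby(pairs, key=itemgetter(0))]
print(grouped)  # [('a', [1, 1]), ('b', [1])]
```

Sorting costs O(n log n), while the dictionary approach is O(n) on average, so `defaultdict` remains the simpler choice for data that fits in memory.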

In [None]:
# --- Execution and Timing ---
mr_word_counts = {}
mapreduce_execution_time = 0
if text_chunks: # Only run if chunks were created successfully
    print("Starting MapReduce word count...")
    start_time_mapreduce = time.perf_counter()

    # --- MAP STAGE ---
    # Create a pool of worker processes
    with multiprocessing.Pool(processes=NUM_PROCESSES) as pool:
        # This applies the mapper function to each chunk in parallel
        mapped_results = pool.map(mapper, text_chunks)

    # --- SHUFFLE STAGE ---
    # The result is a list of lists; we need to flatten it and group by key.
    shuffled_data = defaultdict(list)
    # Flatten the list of lists into a single list of tuples
    for result_list in mapped_results:
        for key, value in result_list:
            shuffled_data[key].append(value)

    # --- REDUCE STAGE ---
    with multiprocessing.Pool(processes=NUM_PROCESSES) as pool:
        # pool.map works on an iterable, so we pass shuffled_data.items()
        reduced_results = pool.map(reducer, shuffled_data.items())

    # Final result is a list of tuples, which can be converted to a dictionary
    mr_word_counts = dict(reduced_results)

    end_time_mapreduce = time.perf_counter()
    mapreduce_execution_time = end_time_mapreduce - start_time_mapreduce

    print(f"MapReduce execution took: {mapreduce_execution_time:.4f} seconds")

    # Optional: Print the 10 most common words to verify they match the serial version
    if mr_word_counts:
        # Sort the dictionary by value to find the most common words
        sorted_counts = sorted(mr_word_counts.items(), key=lambda item: item[1], reverse=True)
        print("10 most common words:", sorted_counts[:10])
else:
    print("Skipping MapReduce execution because data chunks were not created.")


Starting MapReduce word count...


Process SpawnPoolWorker-1:
Process SpawnPoolWorker-2:
Process SpawnPoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Process SpawnPoolWorker-4:
Process SpawnPoolWorker-5:
Process SpawnPoolWorker-6:
Traceback (most recent call last):
Traceback (most recent call last):
Process SpawnPoolWorker-7:
Process SpawnPoolWorker-8:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Process SpawnPoolWorker-9:
  File "/opt/anaconda3/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/lib/python3.12/multiprocessing/pool.py", line 114, in worker
    task = get()
           ^^^^^
  File "/opt/anaconda3/lib/python3.12/multiprocessing/queues.py", line 389, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
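In the output above, every worker dies inside `_ForkingPickler.loads` while unpickling its task. The likely cause is the spawn start method, which is the default on macOS since Python 3.8 and on Windows. A spawned worker is a fresh interpreter, so it cannot find functions like `mapper` that were defined in a notebook cell. The `multiprocessing` docs warn that `Pool` examples do not work in the interactive interpreter for this reason.

One workaround, sketched here under that assumption, is to put `mapper` and `reducer` in a real module file and import them from there. The module name `mr_helpers` and the functions `write_helper_module` and `mapreduce_word_count` are hypothetical.

```python
import importlib
import multiprocessing
import os
import sys
import tempfile
from collections import defaultdict

# Source of a hypothetical helper module. Spawned workers can unpickle functions
# that live in an importable module, but not ones defined in a notebook cell.
HELPER_SOURCE = r'''
import re

def mapper(text_chunk):
    """Return a list of (word, 1) tuples for one chunk of text."""
    words = re.findall(r'\b[a-z]{3,}\b', text_chunk.lower())
    return [(word, 1) for word in words]

def reducer(item):
    """Collapse (word, [1, 1, ...]) into (word, total)."""
    word, counts = item
    return (word, sum(counts))
'''


def write_helper_module():
    """Write mr_helpers.py into a fresh temp directory and put it on sys.path."""
    helper_dir = tempfile.mkdtemp()
    with open(os.path.join(helper_dir, "mr_helpers.py"), "w", encoding="utf-8") as f:
        f.write(HELPER_SOURCE)
    sys.path.insert(0, helper_dir)  # spawned workers receive the parent's sys.path
    importlib.invalidate_caches()


def mapreduce_word_count(chunks, num_processes):
    """Map, shuffle and reduce using functions imported from mr_helpers."""
    from mr_helpers import mapper, reducer

    with multiprocessing.Pool(processes=num_processes) as pool:
        # MAP: one list of (word, 1) pairs per chunk
        mapped = pool.map(mapper, chunks)

        # SHUFFLE: group the 1s by word in the parent process
        shuffled = defaultdict(list)
        for pairs in mapped:
            for word, one in pairs:
                shuffled[word].append(one)

        # REDUCE: batch the many small items to cut inter-process overhead
        reduced = pool.map(reducer, shuffled.items(), chunksize=1000)
    return dict(reduced)


if __name__ == "__main__":  # required when run as a script with spawn/forkserver
    write_helper_module()
    counts = mapreduce_word_count(["The whale, the WHALE!", "and the sea and the ship"], 2)
    print(sorted(counts.items()))
    # [('and', 2), ('sea', 1), ('ship', 1), ('the', 4), ('whale', 2)]
```

In the notebook, you would call `write_helper_module()` once and then `mapreduce_word_count(text_chunks, NUM_PROCESSES)` between the two `time.perf_counter()` calls. This sketch differs from the cell above in two ways: it reuses one pool for both stages, and it passes `chunksize=1000` to the reduce stage. Both are design choices, not requirements.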

---

## Exercise 3: Performance Comparison and Visualization

**Task:** Now that you have timings for both methods, create a bar chart to visually compare them.

1.  Store your two timing results (`serial_execution_time` and `mapreduce_execution_time`).
2.  Use `matplotlib.pyplot` to create a bar chart showing the difference.
3.  Label your chart and axes clearly.
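At this scale, a single timing run is noisy: the serial version finishes in roughly 0.15 seconds. One option is to time each method several times and keep the fastest run, following the `timeit` documentation's advice that the minimum is the most useful figure. The helper `best_of` below is a hypothetical sketch, not part of the provided code.

```python
import time


def best_of(func, *args, repeat=5):
    """Call func(*args) `repeat` times; return (fastest time in seconds, last result).

    The minimum is less affected by background activity than a single run."""
    best, result = float("inf"), None
    for _ in range(repeat):
        start = time.perf_counter()
        result = func(*args)
        best = min(best, time.perf_counter() - start)
    return best, result
```

For example, `serial_execution_time, s_word_counts = best_of(serial_word_count, file_path)`. The MapReduce cell would first need to be wrapped in a function to be timed the same way.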

In [None]:
# Data for plotting
methods = ['Serial Processing', f'MapReduce ({NUM_PROCESSES} Cores)']
times = [serial_execution_time, mapreduce_execution_time]

# Create the bar plot
plt.figure(figsize=(8, 6))
bars = plt.bar(methods, times, color=['skyblue', 'lightgreen'])
plt.ylabel('Execution Time (seconds)')
plt.title('Performance Comparison: Serial vs. MapReduce')

# Add the time values on top of the bars
for bar in bars:
    yval = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2.0, yval, f'{yval:.4f}s', va='bottom', ha='center')

plt.show()

# Print the speedup factor
if mapreduce_execution_time > 0:
    speedup = serial_execution_time / mapreduce_execution_time
    # A value below 1.0 means the MapReduce version was slower than the serial one
    print(f"\nSpeedup (serial time / MapReduce time): {speedup:.2f}x")

---

## Exercise 4: Conceptual Questions on Hadoop

**Task:** Based on your reading from Part B of the handout, answer the following questions in the markdown cell below. Double-click the cell to edit it.

### Questions:

**1. What is the Hadoop Distributed File System (HDFS)? What problem does it solve?**

*Your answer here.*

**2. What are the roles of the `NameNode` and `DataNode` in HDFS?**

*Your answer here.*

**3. What is YARN (Yet Another Resource Negotiator)? What is its role in the Hadoop ecosystem?**

*Your answer here.*

**4. How does running MapReduce on Hadoop differ from our simple Python implementation?**

*Your answer here.*

---

## Exercise 5: Looking Ahead - Final Thoughts

**Task:** In the markdown cell below, briefly brainstorm one or two ideas for how you could use the MapReduce pattern in your next major assignment. Think about the dataset you might use and what kind of pre-processing or aggregation would be necessary and time-consuming.

### My Ideas for the Next Assignment:

**Dataset Idea:** *e.g., A large dataset of customer reviews (millions of rows).*

**MapReduce Application:** *e.g., Use MapReduce to process all reviews in parallel to calculate the average review score per product ID. The mapper would emit `(product_id, score)` and the reducer would calculate the average for each product. This pre-computed data would make visualizing top/bottom products much faster.*
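The review example can be sketched in a single process to check the logic before parallelizing it. The rows and the names `reviews`, `review_mapper` and `review_reducer` are hypothetical. The reducer can take a plain average only because it receives every score for its product.

```python
from collections import defaultdict

# Hypothetical review rows: (product_id, score)
reviews = [("p1", 5), ("p2", 3), ("p1", 4), ("p2", 1), ("p1", 3)]


def review_mapper(row):
    # Emit (product_id, score) for one review
    product_id, score = row
    return (product_id, score)


def review_reducer(item):
    # item is (product_id, [score, score, ...]) -> (product_id, average score)
    product_id, scores = item
    return (product_id, sum(scores) / len(scores))


# Shuffle: group scores by product_id
shuffled = defaultdict(list)
for key, value in map(review_mapper, reviews):
    shuffled[key].append(value)

averages = dict(map(review_reducer, shuffled.items()))
print(averages)  # {'p1': 4.0, 'p2': 2.0}
```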

---

**Dataset Idea:** *Your idea here...*

**MapReduce Application:** *Your idea here...*