# Lesson 11: MapReduce Implementation in Python

In our previous lesson, we discussed the theory behind MapReduce. Today, we're going to make it concrete by building a simple simulator in Python. This will help you understand the data flow and the roles of the `map` and `reduce` functions without the complexity of a real distributed system.

We will create a reusable "framework" that handles the simulation, and then we will write `map` and `reduce` functions to solve two different problems:
1. The classic **Word Count** problem.
2. A more complex problem from a **Lithuanian state exam**, which involves scheduling and data aggregation.

## 1. Our Simulated MapReduce Framework

We have created a Python file named `map_reduce_framework.py`. This file contains a single function, `run_map_reduce`, which simulates the entire process. Let's look at what it does:

1.  **Takes Inputs**: It accepts an input file, a mapper function, a reducer function, and an output directory.
2.  **Setup**: It creates the output directory and, inside it, a subdirectory for intermediate files (`intermediate/`).
3.  **Map Phase**: It reads the input file line by line, applies the `mapper` function to each line, and collects all the emitted `(key, value)` pairs. The result is saved to `intermediate/mapped_data.json`.
4.  **Shuffle & Sort Phase**: It takes the list of mapped pairs and groups all values by their key into a dictionary. This step is what guarantees that each reducer call sees every value for its key. The grouped data is saved to `intermediate/shuffled_data.json`.
5.  **Reduce Phase**: It iterates through the shuffled data, applying the `reducer` function to each key and its list of values. The final results are collected.
6.  **Output**: The final results are saved to `results.json` in the specified output directory.
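
The steps above can be sketched in a few lines. This is a minimal in-memory sketch, not the contents of `map_reduce_framework.py`: the name `simulate_map_reduce` is hypothetical, it takes a list of lines instead of an input file, and it skips writing the intermediate and final JSON files.

```python
from collections import defaultdict


def simulate_map_reduce(lines, mapper, reducer):
    """Hypothetical in-memory stand-in for run_map_reduce (no file I/O)."""
    # Map phase: apply the mapper to every line and collect the emitted pairs.
    mapped = []
    for line in lines:
        mapped.extend(mapper(line))

    # Shuffle & sort phase: group all values by their key.
    shuffled = defaultdict(list)
    for key, value in mapped:
        shuffled[key].append(value)

    # Reduce phase: call the reducer once per key with that key's values.
    # Keys are visited in sorted order, which assumes they are comparable.
    results = []
    for key in sorted(shuffled):
        results.extend(reducer(key, shuffled[key]))
    return results
```

Both `mapper` and `reducer` are expected to be generators (or to return iterables) of `(key, value)` pairs, matching the functions used in the examples that follow.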

**Your job as a developer is to focus only on writing the `mapper` and `reducer` functions!**

## 2. Example 1: Word Count

Let's start with the classic example. We have a file `word_count_example.py` that imports our framework and defines the logic for counting words. 

Here is the core logic:

In [None]:
def word_count_mapper(line):
    words = line.strip().lower().split()
    for word in words:
        yield (word, 1)

def word_count_reducer(key, values):
    yield (key, sum(values))

Now, let's run it from our notebook!

In [None]:
from map_reduce_framework import run_map_reduce
from word_count_example import word_count_mapper, word_count_reducer

input_file = 'data/word_count_input.txt'
output_dir = 'output/word_count_results'

print("Running Word Count example...")
final_results = run_map_reduce(input_file, word_count_mapper, word_count_reducer, output_dir)
print(f"\nWord Count finished! Results are in '{output_dir}'.")
print("Final counts:")
for key, value in sorted(final_results):
    print(f"{key}: {value}")

## 3. Example 2: Lithuanian State Exam Problem

This problem is more complex: it combines two kinds of records (room slots and friends' availability) that share a key, and then aggregates them per key.

### Problem Description (Translated and Simplified)

Saulius wants to organize a party in a gaming room. We are given two sets of data:
1.  **Available Room Times**: The days and hours when the gaming room is free.
2.  **Friends' Preferred Times**: The days and hours when each of Saulius's friends can come.

**Goal**: Find all the time slots when the room is available AND more than 3 friends can come. For each of these time slots, we need to output the day, hour, the number of friends, and an alphabetized list of their names. The final list of time slots should be sorted by the number of friends in descending order.

### Solution with MapReduce
We have implemented the solution in `exam_problem_solver.py`.

* **Mapper**: The mapper reads each line. If it's a room time, it emits `((day, hour), ('ROOM_SLOT', True))`. If it's a friend's time, it emits `((day, hour), ('FRIEND', friend_name))`. The key is the time slot tuple.

* **Reducer**: The reducer receives a time slot and a list of all associated events (e.g., `[('ROOM_SLOT', True), ('FRIEND', 'Vidas'), ('FRIEND', 'Rasa')]`). It checks if a `ROOM_SLOT` exists and counts the friends. If the count is greater than 3, it emits the final result for that time slot.
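
A minimal sketch of such a mapper and reducer follows. It is not the code in `exam_problem_solver.py`: the input line formats (`ROOM,<day>,<hour>` and `FRIEND,<name>,<day>,<hour>`), the `sketch_` names, and the shape of the emitted result are all assumptions made for illustration.

```python
def sketch_exam_mapper(line):
    # Assumed line formats (hypothetical; the real input file may differ):
    #   ROOM,<day>,<hour>
    #   FRIEND,<name>,<day>,<hour>
    parts = [p.strip() for p in line.strip().split(",")]
    if parts[0] == "ROOM" and len(parts) == 3:
        _, day, hour = parts
        yield ((day, int(hour)), ("ROOM_SLOT", True))
    elif parts[0] == "FRIEND" and len(parts) == 4:
        _, name, day, hour = parts
        yield ((day, int(hour)), ("FRIEND", name))
    # Lines in any other shape are silently skipped.


def sketch_exam_reducer(slot, events):
    # Is the room free in this time slot?
    room_free = any(kind == "ROOM_SLOT" for kind, _ in events)
    # Collect friend names, de-duplicated and alphabetized.
    friends = sorted({name for kind, name in events if kind == "FRIEND"})
    # Keep only slots where the room is free and more than 3 friends can come.
    if room_free and len(friends) > 3:
        yield (slot, (len(friends), friends))
```

Tagging each value with `'ROOM_SLOT'` or `'FRIEND'` is what lets a single reducer tell the two record types apart after they have been grouped under the same `(day, hour)` key.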

Let's run the solution.

In [None]:
from exam_problem_solver import exam_mapper, exam_reducer, format_exam_results

input_file_exam = 'data/exam_input.txt'
output_dir_exam = 'output/exam_results'

print("\nRunning Exam Problem solver...")
exam_raw_results = run_map_reduce(input_file_exam, exam_mapper, exam_reducer, output_dir_exam)

# The final formatting and sorting is done outside the MapReduce job, which is common
format_exam_results(exam_raw_results, output_dir_exam)
print(f"Exam Problem finished! Final formatted results are in '{output_dir_exam}/final_results.txt'.")
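
The sort happens after the job because each reducer call sees only one time slot, while ordering by friend count needs all slots at once. Here is a minimal sketch of such a post-processing step, assuming each record has the hypothetical shape `((day, hour), (count, names))`; the actual `format_exam_results` may work differently.

```python
def sort_slots_by_friend_count(records):
    # Each record is assumed to look like ((day, hour), (count, names)).
    # Sort by count in descending order; ties are broken by time slot,
    # which is an arbitrary choice made here for deterministic output.
    return sorted(records, key=lambda rec: (-rec[1][0], rec[0]))
```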

## 4. Additional Practice Exercises

1.  **Average Rating**: Imagine a file where each line is `movie_id,rating` (e.g., `movie123,4.5`). Write a MapReduce job to calculate the average rating for each movie.
    * **Mapper**: Emits `(movie_id, rating)`.
    * **Reducer**: Receives `(movie_id, [5.0, 4.0, 4.5, ...])` and calculates the average.

2.  **Reverse Web-Link Graph**: You have a file where each line is `source_url destination_url`, meaning `source_url` links to `destination_url`. Write a MapReduce job to find, for each URL, which URLs link to it.
    * **Mapper**: Emits `(destination_url, source_url)`.
    * **Reducer**: Receives `(url, [source1, source2, ...])` and simply emits this pair.
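
As a starting point, both exercises can be sketched with the same mapper/reducer signatures used in the Word Count example. The function names below are hypothetical, and the sketch assumes well-formed input lines in the formats described above (blank lines are skipped).

```python
def rating_mapper(line):
    # Expects "movie_id,rating", e.g. "movie123,4.5".
    if not line.strip():
        return
    movie_id, rating = line.strip().split(",")
    yield (movie_id, float(rating))


def rating_reducer(movie_id, ratings):
    # The reducer is only called for keys that have at least one value,
    # so dividing by len(ratings) is safe.
    yield (movie_id, sum(ratings) / len(ratings))


def link_mapper(line):
    # Expects "source_url destination_url", separated by whitespace.
    if not line.strip():
        return
    source, destination = line.split()
    # Swap the pair so that the destination becomes the grouping key.
    yield (destination, source)


def link_reducer(url, sources):
    # The shuffle phase has already gathered all sources for this URL.
    yield (url, sources)
```

Try extending these yourself, for example by rounding the average or by removing duplicate source URLs in the reducer.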