# MapReduce Implementation with Redis Backend

This notebook demonstrates a **distributed MapReduce implementation** using Redis as the backend for job coordination and data storage.

## Learning Objectives
- Understand MapReduce paradigm fundamentals
- Implement distributed processing with Redis
- Compare sequential vs parallel execution
- Analyze performance improvements

## Use Case: Word Frequency Analysis
We'll analyze Dante's Divine Comedy to count word frequencies across the three canticles (Inferno, Purgatorio, Paradiso).
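
Before any Redis is involved, the paradigm fits in a few lines of plain Python. The sketch below is illustrative only: `map_phase`, `shuffle`, `reduce_phase`, `tokenize` and `sum_counts` are hypothetical names and are not part of this notebook. It maps two verses of Inferno III to `(word, 1)` pairs, groups the values by key, and sums each group.

```python
from collections import defaultdict

def map_phase(records, map_fn):
    """Apply map_fn to every record and collect the emitted (key, value) pairs."""
    return [pair for record in records for pair in map_fn(record)]

def shuffle(pairs):
    """Group values by key (the notebook's framework keeps one Redis list per key)."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups, reduce_fn):
    """Apply reduce_fn to each key and its grouped values."""
    return [out for key, values in groups.items() for out in reduce_fn(key, values)]

def tokenize(record):
    # Toy mapper: emit (word, 1) for each whitespace-separated, lowercased word
    for word in record.lower().split():
        yield word, 1

def sum_counts(key, values):
    # Toy reducer: total occurrences of one word
    yield key, sum(values)

lines = ["per me si va ne la città dolente", "per me si va ne l'etterno dolore"]
counts = dict(reduce_phase(shuffle(map_phase(lines, tokenize)), sum_counts))
print(counts["per"], counts["dolore"])  # 2 1
```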

---

## 1. Environment Setup

First, let's install required dependencies and establish our Redis connection.

In [1]:
%%capture
# Install required packages
%pip install redis rich tqdm beautifulsoup4 requests

In [2]:
# Core imports for our MapReduce implementation
import redis
import json
import time
from multiprocessing import Process

# Data fetching and processing
import requests
from bs4 import BeautifulSoup

# Utilities for better output
from rich.pretty import pprint
from rich.console import Console
from tqdm import tqdm

console = Console()

In [3]:
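# Redis Cluster connection
# Note: make sure your Jupyter server is attached to the Redis Docker network:
#   docker network connect redis_default jupyter-jupyter-1

try:
    r = redis.RedisCluster(host='master', port=6379)
    console.print("✅ Redis cluster connection established", style="green")
except Exception as e:
    console.print(f"❌ Redis cluster connection failed: {e}", style="red")
    # Fall back to a local single-node Redis if the cluster is not available.
    # map_task, reduce_task and collect_task still open their own cluster
    # connection to 'master' when no connection is passed, so the parallel
    # run in section 7 needs the cluster.
    r = redis.Redis(host='localhost', port=6379, decode_responses=False)
    r.ping()  # redis.Redis connects lazily; fail here if localhost is unreachable too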

## 2. Data Loading and Preparation

Let's fetch Dante's Divine Comedy from the web and prepare it for processing.

In [4]:
# Helper functions for Redis operations
def store(key, record):
    """Store a record in a Redis set as JSON (identical records are stored only once)"""
    r.sadd(key, json.dumps(record))

def fetch(key):
    """Atomically remove and return a random record from a Redis set, or None if it is empty"""
    raw = r.spop(key)
    return json.loads(raw) if raw is not None else None

def cleanup_keys(pattern):
    """Delete all Redis keys matching a glob-style pattern and return how many were deleted"""
    keys_to_delete = list(r.scan_iter(match=pattern))
    for key in keys_to_delete:
        r.delete(key)
    return len(keys_to_delete)
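
Two properties of these helpers are easy to miss. `store` uses `SADD`, so a Redis set keeps each distinct JSON string once and an exact duplicate record is silently dropped. `fetch` uses `SPOP`, which removes and returns a *random* member, so records are not processed in insertion order. The hypothetical `InMemorySet` class below mimics both behaviours with a Python set so they can be checked without a server; it is a stand-in, not the redis-py API.

```python
import json
import random

class InMemorySet:
    """Hypothetical stand-in for one Redis set key, mimicking SADD and SPOP."""

    def __init__(self):
        self.members = set()

    def sadd(self, member):
        # Like SADD: 1 if the member is new, 0 if it was already present
        if member in self.members:
            return 0
        self.members.add(member)
        return 1

    def spop(self):
        # Like SPOP: remove and return a random member, or None when empty
        if not self.members:
            return None
        member = random.choice(tuple(self.members))
        self.members.remove(member)
        return member

pending = InMemorySet()
record = {"cantica": "inferno", "text": "Nel mezzo del cammin di nostra vita"}
added = [pending.sadd(json.dumps(record)) for _ in range(2)]
print(added)  # [1, 0]: the identical second record is dropped

drained = []
raw = pending.spop()
while raw is not None:
    drained.append(json.loads(raw))
    raw = pending.spop()
print(len(drained))  # 1
```

If duplicate records must be counted separately, a Redis list (`RPUSH`/`LPOP`) or a unique id inside each record would preserve them.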

In [5]:
def load_divine_comedy():
    """
    Load Dante's Divine Comedy from online source
    Splits text into individual cantos for processing
    """
    console.print("🧹 Cleaning up previous data...")
    
    # Clean up any existing data, including results from a previous run
    r.delete("dante_comedy:cantos")
    r.delete("dante_comedy:max_count")
    cleanup_keys("dante_comedy:max_count:step_*")
    
    console.print("📚 Loading Dante's Divine Comedy...")
    
    # URLs for the three canticles
    urls = {
        "inferno": "https://www.liberliber.eu/mediateca/libri/a/alighieri/la_divina_commedia/html/testo_01.htm",
        "purgatorio": "https://www.liberliber.eu/mediateca/libri/a/alighieri/la_divina_commedia/html/testo_02.htm",
        "paradiso": "https://www.liberliber.eu/mediateca/libri/a/alighieri/la_divina_commedia/html/testo_03.htm"
    }
    
    total_cantos = 0
    
    for cantica_name, url in urls.items():
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()  # surface HTTP errors instead of parsing an error page
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # Extract text from paragraphs with class 'rientrato'
            cantos = soup.select("p.rientrato")
            
            for canto in cantos:
                if canto.text.strip():  # Only store non-empty cantos
                    store("dante_comedy:cantos", {
                        "cantica": cantica_name,
                        "text": canto.text.strip()
                    })
                    total_cantos += 1
                    
        except Exception as e:
            console.print(f"❌ Error loading {cantica_name}: {e}", style="red")
    
    console.print(f"✅ Data loaded successfully! Total cantos: {total_cantos}")
    return total_cantos

# Load the data
canto_count = load_divine_comedy()

## 3. MapReduce Functions

Now let's define our **Map** and **Reduce** functions for word frequency analysis.

In [6]:
def map_words(record):
    """
    MAP FUNCTION: Extract words from text and emit (word-cantica, 1) pairs
    
    Input: {'cantica': 'inferno', 'text': 'Nel mezzo del cammin...'}
    Output: Generator of (key, value) pairs
    """
    text = record["text"]
    cantica = record["cantica"]
    
    # Clean text: remove punctuation
    punctuation = ",.«»!?:;\"'()"
    clean_text = "".join(c for c in text if c not in punctuation)
    
    # Emit (word-cantica, 1) for each word
    for word in clean_text.split():
        if word.strip():  # Skip empty strings
            yield f"{word.lower()}-{cantica}", 1

def count_word(key, records):
    """
    REDUCE FUNCTION: Sum up counts for each word
    
    Input: key='nel-inferno', records=[1, 1, 1, ...]
    Output: Generator of (word, total_count) pairs
    """
    # Split on the last hyphen so a hyphen inside a word is not mistaken for the separator
    word, _cantica = key.rsplit("-", 1)
    total_count = sum(records)
    yield word, total_count

def max_count(key, records):
    """
    REDUCE FUNCTION: Find max and total counts across canticles
    
    Input: key='nel', records=[45, 32, 28]
    Output: Generator of (word, {max, total}) pairs
    """
    yield key, {
        "max": max(records),
        "total": sum(records)
    }
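
To see what each stage hands to the next, the sketch below chains copies of the three functions over an in-memory `defaultdict` instead of Redis. The `toy_` prefixes, `run_stage`, and the two verse fragments are illustrative assumptions; the grouping mirrors what the framework does with one Redis list per key.

```python
from collections import defaultdict

PUNCTUATION = ",.«»!?:;\"'()"

def toy_map_words(record):
    # Same logic as map_words: strip punctuation, emit ("word-cantica", 1)
    clean = "".join(c for c in record["text"] if c not in PUNCTUATION)
    for word in clean.split():
        yield f"{word.lower()}-{record['cantica']}", 1

def toy_count_word(key, records):
    # rsplit keeps a hyphen inside a word from being mistaken for the separator
    word, _cantica = key.rsplit("-", 1)
    yield word, sum(records)

def toy_max_count(key, records):
    yield key, {"max": max(records), "total": sum(records)}

def run_stage(groups, reduce_fn):
    """Apply reduce_fn to every key group and regroup its outputs by key."""
    out = defaultdict(list)
    for key, values in groups.items():
        for new_key, new_value in reduce_fn(key, values):
            out[new_key].append(new_value)
    return out

# Two short verse fragments, used only as toy input
toy_records = [
    {"cantica": "inferno", "text": "Per me si va ne la città dolente, per me si va"},
    {"cantica": "purgatorio", "text": "Per correr miglior acque alza le vele"},
]

step1 = defaultdict(list)
for toy_record in toy_records:
    for key, value in toy_map_words(toy_record):
        step1[key].append(value)

step2 = run_stage(step1, toy_count_word)
step3 = run_stage(step2, toy_max_count)
print(step1["per-inferno"])  # [1, 1]
print(step2["per"])          # [2, 1]
print(step3["per"])          # [{'max': 2, 'total': 3}]
```

In the notebook, `LPUSH` stores values in reverse order, which does not matter here because `sum` and `max` ignore order.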

## 4. MapReduce Framework Implementation

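Here's our distributed MapReduce framework using Redis for coordination. Each phase reads from keys under one prefix and writes to keys under the next: a `<prefix>__index` set lists the output keys, and each `<prefix><key>` list holds the values emitted for that key.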

In [7]:
def map_task(source, map_function, dest, r=None):
    """
    MAP TASK: Process records from source queue using map_function
    
    Args:
        source: Redis key containing input records
        map_function: Function to apply to each record
        dest: Prefix for output keys
        r: Redis connection (creates new if None)
    """
    if r is None:
        r = redis.RedisCluster(host='master', port=6379)
    
    processed_count = 0
    
    # Process records until source is empty.
    # Pop with the local connection `r` (not the module-level client used by fetch())
    # so each worker process uses its own connection.
    raw = r.spop(source)
    while raw is not None:
        record = json.loads(raw)
        # Apply map function to record
        for key, value in map_function(record):
            dest_key = dest + key
            # Store mapped result in a Redis list
            r.lpush(dest_key, json.dumps(value))
            # Keep track of output keys for reduce phase
            r.sadd(dest + "__index", dest_key)
        # Move to the next record
        processed_count += 1
        raw = r.spop(source)

    # Now the source set is empty:
    # <dest>__index set contains the names of all mapped keys
    # <dest><key_xyz> list contains the values emitted for key xyz
    
    return processed_count

def reduce_task(source, reduce_function, dest, r=None):
    """
    REDUCE TASK: Aggregate values for each key using reduce_function
    
    Args:
        source: Prefix of keys containing intermediate results
        reduce_function: Function to aggregate values
        dest: Prefix for output keys
        r: Redis connection (creates new if None)
    """
    if r is None:
        r = redis.RedisCluster(host='master', port=6379)
    
    processed_count = 0
    
    # Process all keys from the index
    item_key = r.spop(source + "__index")
    while item_key is not None:
        # Get all values for this key
        records = r.lrange(item_key, 0, -1)
        records = [json.loads(record) for record in records]
        
        # Extract the actual key by removing the prefix: <source><key> -> key, e.g.
        # dante_comedy:max_count:step_1:colui-inferno -> colui-inferno
        if isinstance(item_key, bytes):
            item_key = item_key.decode()
        key = item_key[len(source):]
        
        # Apply reduce function
        for reduce_key, reduce_value in reduce_function(key, records):
            dest_key = dest + reduce_key
            # Store reduced result in a Redis list
            r.lpush(dest_key, json.dumps(reduce_value))
            # Keep track of output keys for next reduce phase if any
            r.sadd(dest + "__index", dest_key)
        
        # Clean up processed key
        r.delete(item_key)
        processed_count += 1

        # Move to the next record
        item_key = r.spop(source + "__index")

    # Now the source index set and the mapped key lists are both empty,
    # <dest>__index set contains the reduced keys
    # <dest><key_xyz> contains the records reduced into the xyz key
    
    return processed_count

def collect_task(source, dest, r=None):
    """
    COLLECT TASK: Gather final results into a hash for easy access
    
    Args:
        source: Prefix of keys containing final results
        dest: Hash key to store final results
        r: Redis connection (creates new if None)
    """
    if r is None:
        r = redis.RedisCluster(host='master', port=6379)
    
    collected_count = 0

    # Process all keys from the index
    item_key = r.spop(source + "__index")
    while item_key is not None:
        # Extract key and records (remove prefix)
        key = item_key[len(source):].decode() if isinstance(item_key, bytes) else item_key[len(source):]
        records = r.lrange(item_key, 0, -1)
        records = [json.loads(record) for record in records]
        
        # Store single value or list in final hash
        # ["result"] -> "result"
        # ["resultA","resultB"] remains ["resultA","resultB"]
        final_value = records[0] if len(records) == 1 else records
        r.hset(dest, key, json.dumps(final_value))
        
        # Remove the processed intermediate list
        r.delete(item_key)
        collected_count += 1

        # Move to the next record
        item_key = r.spop(source + "__index")

    # Now the source index set and the mapped key lists are both empty,
    # the <dest> Redis Hash contains the collected results by key 
    # as computed in the last reduce step
    
    return collected_count
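
The key layout is easier to follow when the loops run against something inspectable. The sketch below replays simplified map and reduce loops against `FakeRedis`, a hypothetical dict-backed stand-in that models only `SADD`, `SPOP`, `LPUSH`, `LRANGE` and `DEL` with `str` keys (the real client returns `bytes` unless `decode_responses=True`). `map_step` and `reduce_step` are illustrative names, not the notebook's functions.

```python
import json
from collections import defaultdict

class FakeRedis:
    """Hypothetical dict-backed stand-in for the Redis commands the framework uses."""

    def __init__(self):
        self.sets = defaultdict(set)
        self.lists = defaultdict(list)

    def sadd(self, key, member):
        self.sets[key].add(member)

    def spop(self, key):
        return self.sets[key].pop() if self.sets[key] else None

    def lpush(self, key, value):
        self.lists[key].insert(0, value)

    def lrange(self, key, start, end):
        # Only the full-range call used by the framework, lrange(key, 0, -1), is modelled
        return list(self.lists[key])

    def delete(self, key):
        self.sets.pop(key, None)
        self.lists.pop(key, None)


def map_step(conn, source, map_fn, dest):
    raw = conn.spop(source)
    while raw is not None:
        for key, value in map_fn(json.loads(raw)):
            conn.lpush(dest + key, json.dumps(value))
            conn.sadd(dest + "__index", dest + key)
        raw = conn.spop(source)


def reduce_step(conn, source, reduce_fn, dest):
    item_key = conn.spop(source + "__index")
    while item_key is not None:
        values = [json.loads(v) for v in conn.lrange(item_key, 0, -1)]
        for key, value in reduce_fn(item_key[len(source):], values):
            conn.lpush(dest + key, json.dumps(value))
            conn.sadd(dest + "__index", dest + key)
        conn.delete(item_key)
        item_key = conn.spop(source + "__index")


fake = FakeRedis()
fake.sadd("demo:input", json.dumps("a b a"))
map_step(fake, "demo:input", lambda text: ((w, 1) for w in text.split()), "demo:step_1:")
print(sorted(fake.sets["demo:step_1:__index"]))  # ['demo:step_1:a', 'demo:step_1:b']
print(fake.lists["demo:step_1:a"])               # ['1', '1']

reduce_step(fake, "demo:step_1:", lambda key, values: [(key, sum(values))], "demo:step_2:")
print(fake.lists["demo:step_2:a"])               # ['2']
```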

## 5. Parallel Execution Framework

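Utility function to run multiple worker processes in parallel. Every worker receives the same arguments and competes for items in the shared Redis keys, so the input does not need to be partitioned explicitly. Because the targets are functions defined in the notebook, this relies on the `fork` start method of `multiprocessing` (historically the default on Linux); with `spawn` (the default on Windows and macOS) or `forkserver`, the worker functions must be importable from a module instead.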

In [8]:
def run_parallel(worker_count, function, *args):
    """
    Run a function in parallel with multiple workers
    
    Args:
        worker_count: Number of parallel workers to spawn
        function: Function to execute
        *args: Arguments to pass to the function
    """
    processes = []
    
    # Start worker processes
    for i in range(worker_count):
        process = Process(target=function, args=args)
        processes.append(process)
        process.start()
    
    # Wait for all processes to complete
    for process in processes:
        process.join()
    
    console.print(f"✅ Completed parallel execution with {worker_count} workers")
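
`run_parallel` never partitions the input: correctness rests on `SPOP` being atomic, so each item is handed to exactly one worker. The sketch below illustrates that property with threads instead of processes and a `threading.Lock` standing in for Redis's atomic command execution; `drain` and the integer items are illustrative only.

```python
import threading

def drain(pool, lock, seen, worker_id):
    """Hypothetical worker: pop items until the shared pool is empty."""
    while True:
        with lock:  # plays the role of Redis executing SPOP atomically
            if not pool:
                return
            item = pool.pop()
        seen.append((worker_id, item))  # "process" the item outside the lock

pool = set(range(1000))
lock = threading.Lock()
seen = []
threads = [threading.Thread(target=drain, args=(pool, lock, seen, i)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

items = [item for _, item in seen]
print(len(items), len(set(items)))  # 1000 1000: every item handled exactly once
```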

## 6. Sequential Execution (Baseline)

Let's first run our MapReduce job sequentially to establish a baseline.

In [9]:
# Reload data for fresh start
load_divine_comedy()

console.print("🔄 Running MapReduce sequentially...")

# Time the sequential execution
sequential_start = time.time()

# Step 1: Map words to (word-cantica, count) pairs
map_task("dante_comedy:cantos", map_words, "dante_comedy:max_count:step_1:", r)

# Step 2: Reduce to word counts per cantica
reduce_task("dante_comedy:max_count:step_1:", count_word, "dante_comedy:max_count:step_2:", r)

# Step 3: Reduce to max/total counts per word
reduce_task("dante_comedy:max_count:step_2:", max_count, "dante_comedy:max_count:step_3:", r)

# Step 4: Collect final results
collect_task("dante_comedy:max_count:step_3:", "dante_comedy:max_count", r)

sequential_time = time.time() - sequential_start
console.print(f"⏱️  Sequential execution completed in {sequential_time:.2f} seconds")
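
`time.time()` reads the system clock, which can be adjusted while a job runs; `time.perf_counter()` is the standard library's clock for measuring durations. A small hypothetical `timed` context manager keeps the measurements in one place:

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label, results):
    """Hypothetical helper: store the elapsed time of the with-block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start

timings = {}
with timed("toy_sum", timings):
    sum(range(100_000))
print(f"toy_sum took {timings['toy_sum']:.4f} s")
```

For example, wrapping the four steps above in `with timed("sequential", timings):` would record the baseline in `timings["sequential"]`.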

## 7. Parallel Execution (Distributed)

Now let's run the same job with multiple parallel workers and compare performance.

In [25]:
# Reload data for fresh parallel run
load_divine_comedy()

console.print("🔄 Running MapReduce with 10 parallel workers...")

# Time the parallel execution
parallel_start = time.time()

# Run each phase with 10 parallel workers
WORKER_COUNT = 10

# Step 1: Parallel mapping
run_parallel(WORKER_COUNT, map_task, "dante_comedy:cantos", map_words, "dante_comedy:max_count:step_1:")

# Step 2: Parallel reduce (word counts)
run_parallel(WORKER_COUNT, reduce_task, "dante_comedy:max_count:step_1:", count_word, "dante_comedy:max_count:step_2:")

# Step 3: Parallel reduce (max/total counts)
run_parallel(WORKER_COUNT, reduce_task, "dante_comedy:max_count:step_2:", max_count, "dante_comedy:max_count:step_3:")

# Step 4: Parallel collection
run_parallel(WORKER_COUNT, collect_task, "dante_comedy:max_count:step_3:", "dante_comedy:max_count")

parallel_time = time.time() - parallel_start
speedup = sequential_time / parallel_time

console.print(f"⏱️  Parallel execution completed in {parallel_time:.2f} seconds")
console.print()
console.print("📊 Performance Comparison:")
console.print(f"   Sequential: {sequential_time:.2f} seconds")
console.print(f"   Parallel ({WORKER_COUNT} workers): {parallel_time:.2f} seconds")
console.print(f"   🚀 Speedup: {speedup:.2f}x faster!", style="bold green")

## 8. Results Analysis

Let's examine the results of our word frequency analysis.

In [26]:
def analyze_results():
    """
    Analyze and display the word frequency results
    """
    # Get all results from Redis hash
    all_results = r.hgetall("dante_comedy:max_count")
    
    # Parse results and convert to list of tuples
    word_stats = []
    total_occurrences = 0
    
    for word_bytes, stats_bytes in all_results.items():
        word = word_bytes.decode() if isinstance(word_bytes, bytes) else word_bytes
        stats = json.loads(stats_bytes)
        
        word_stats.append((word, stats['total'], stats['max']))
        total_occurrences += stats['total']
    
    # Sort by total frequency
    word_stats.sort(key=lambda x: x[1], reverse=True)
    
    console.print("📈 Word Frequency Analysis Results")
    console.print("🏆 Top 10 Most Frequent Words:")
    
    # max_in_cantica avoids shadowing the max_count reduce function
    for i, (word, total, max_in_cantica) in enumerate(word_stats[:10], 1):
        console.print(f"{i:2d}. {word}: {total:n} total occurrences (max: {max_in_cantica:n} in a single cantica)")
    
    console.print("📊 Analysis Summary:")
    console.print(f"   Total unique words: {len(word_stats):n}")
    console.print(f"   Total word occurrences: {total_occurrences:n}")
    if word_stats:  # avoid dividing by zero when no results were collected
        console.print(f"   Average occurrences per word: {total_occurrences/len(word_stats):.2f}")
    
    return word_stats

results = analyze_results()
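
Only the top ten entries are displayed, so a full sort is not strictly needed; `heapq.nlargest` selects the top `n` directly. The sketch below assumes the raw `hgetall` shape used by this notebook (byte field names and JSON values, since `decode_responses` is off). `top_words` and the sample counts are made up for illustration and are not results.

```python
import heapq
import json

def top_words(raw_hash, n=10):
    """Return the n (word, stats) pairs with the highest total, largest first."""
    stats = {
        (field.decode() if isinstance(field, bytes) else field): json.loads(value)
        for field, value in raw_hash.items()
    }
    return heapq.nlargest(n, stats.items(), key=lambda item: item[1]["total"])

# Made-up sample in the shape hgetall() returns without decode_responses
sample = {
    b"amor": json.dumps({"max": 4, "total": 9}).encode(),
    b"stelle": json.dumps({"max": 2, "total": 3}).encode(),
    b"vele": json.dumps({"max": 1, "total": 1}).encode(),
}
top = top_words(sample, n=2)
print([word for word, _ in top])  # ['amor', 'stelle']
```

Against the real data this would be `top_words(r.hgetall("dante_comedy:max_count"))`.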

## 9. Key Takeaways

### MapReduce Paradigm Benefits

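1. **Scalability**: More workers can be added without changing the map or reduce functions
2. **Fault Tolerance**: Intermediate state lives in Redis rather than in worker memory, so a crashed worker does not lose completed work; however, `SPOP` removes a record before it is processed, so a record in flight when a worker dies is lost (see Exercise 3)
3. **Simplicity**: Complex data processing is broken down into simple map/reduce operations
4. **Distributed**: Workers pull work from shared Redis keys, so no explicit partitioning is needed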

### Performance Insights

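- **Sequential**: One process runs every phase
- **Parallel**: Multiple worker processes run each phase concurrently
- **Speedup**: Nearly 6x with 10 workers in the run reported here; actual figures depend on the machine, the network, and the Redis setup
- **Efficiency**: About 60% (speedup divided by the number of workers); every emitted word costs separate Redis round-trips, so workers likely spend much of their time waiting on the network, which parallelism helps overlap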
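
The speedup figure can be read through Amdahl's law, `S = 1 / ((1 - p) + p / N)`, where `p` is the parallelisable fraction of the work and `N` the number of workers. The sketch below inverts it to estimate the `p` implied by a 6x speedup on 10 workers. It is a rough model: it assumes the parallel part scales perfectly and ignores contention on Redis itself, so treat the extrapolation as an optimistic guess.

```python
def parallel_efficiency(speedup, workers):
    return speedup / workers

def amdahl_parallel_fraction(speedup, workers):
    """Parallel fraction p implied by Amdahl's law S = 1 / ((1 - p) + p / N)."""
    return (1 - 1 / speedup) / (1 - 1 / workers)

def amdahl_speedup(p, workers):
    return 1 / ((1 - p) + p / workers)

p = amdahl_parallel_fraction(6.0, 10)
print(f"efficiency={parallel_efficiency(6.0, 10):.2f}, p={p:.3f}")  # efficiency=0.60, p=0.926
print(f"predicted speedup with 20 workers: {amdahl_speedup(p, 20):.1f}x")  # 8.3x
```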

### Redis as MapReduce Backend

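- **Job Queue**: Redis sets and lists distribute the work
- **Coordination**: Workers compete for items with atomic `SPOP`, so each item is handed to exactly one worker
- **Persistence**: Results live in Redis, so they survive worker crashes (and survive Redis restarts only if RDB or AOF persistence is enabled)
- **Scalability**: Dataset size is limited mainly by Redis memory, which a cluster spreads across nodes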

## 10. Advanced Exercises

Try these exercises to deepen your understanding:

### Exercise 1: Different Aggregations
Modify the reduce functions to compute:
- Average word length per cantica
- Most frequent words that appear in all three canticles
- Words unique to each cantica
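
One way to approach the last two aggregations is to keep the cantica instead of the count in step 2, then classify each word by the set of canticles it appears in. The two reducers below are a hypothetical sketch that follows the same `(key, records)` generator contract as `count_word` and `max_count`; the sample keys are made up.

```python
CANTICLES = ("inferno", "purgatorio", "paradiso")

def word_presence(key, records):
    """Hypothetical step-2 reducer: 'word-cantica' -> (word, cantica)."""
    word, cantica = key.rsplit("-", 1)
    yield word, cantica

def classify_word(key, records):
    """Hypothetical step-3 reducer: key is a word, records are the canticles it occurs in."""
    present = set(records)
    if present == set(CANTICLES):
        yield "in_all_three", key
    elif len(present) == 1:
        yield "unique_to_" + next(iter(present)), key

# Local check, with dicts standing in for the Redis lists between stages
step1_keys = ["amor-inferno", "amor-purgatorio", "amor-paradiso",
              "stelle-paradiso", "vele-purgatorio", "vele-paradiso"]
by_word = {}
for step1_key in step1_keys:
    for word, cantica in word_presence(step1_key, [1]):
        by_word.setdefault(word, []).append(cantica)

labels = {}
for word, canticles in by_word.items():
    for label, w in classify_word(word, canticles):
        labels.setdefault(label, []).append(w)
print(labels)  # {'in_all_three': ['amor'], 'unique_to_paradiso': ['stelle']}
```

In the notebook these would replace `count_word` and `max_count` in the two `reduce_task` calls. Note that `collect_task` stores a single-element list as a bare value, so a label matching only one word would come back as a string rather than a list.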

### Exercise 2: Performance Tuning
Experiment with:
- Different numbers of workers (1, 5, 10, 20)
- Batch processing (pop several records per round-trip with `SPOP key count` and pipeline the writes)
- Different Redis data structures
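
Each `SPOP` and each `LPUSH`/`SADD` in `map_task` is a separate network round-trip. A possible batching sketch, assuming a redis-py client: pop several records at once with `spop(name, count)` (Redis 3.2 or later) and buffer the writes in a `pipeline()`. `fetch_batch`, `map_task_batched` and the test doubles are hypothetical names; the fakes exist only so the block runs without a server.

```python
import json

def fetch_batch(conn, key, batch_size=100):
    """Pop up to batch_size JSON records in one round-trip (SPOP with a count)."""
    # With a count, redis-py returns a list of members (empty when the set is empty)
    raw_items = conn.spop(key, batch_size) or []
    return [json.loads(raw) for raw in raw_items]

def map_task_batched(conn, source, map_function, dest, batch_size=100):
    """Hypothetical batched variant of map_task that pipelines its writes."""
    processed = 0
    batch = fetch_batch(conn, source, batch_size)
    while batch:
        pipe = conn.pipeline()  # buffer commands client-side, send them together
        for record in batch:
            for key, value in map_function(record):
                pipe.lpush(dest + key, json.dumps(value))
                pipe.sadd(dest + "__index", dest + key)
        pipe.execute()
        processed += len(batch)
        batch = fetch_batch(conn, source, batch_size)
    return processed


# Minimal in-memory test doubles for the client methods used above (illustration only)
class FakePipeline:
    def __init__(self, conn):
        self.conn, self.ops = conn, []

    def lpush(self, key, value):
        self.ops.append(lambda: self.conn.lists.setdefault(key, []).insert(0, value))

    def sadd(self, key, member):
        self.ops.append(lambda: self.conn.sets.setdefault(key, set()).add(member))

    def execute(self):
        for op in self.ops:
            op()
        self.ops = []


class FakeConn:
    def __init__(self):
        self.sets, self.lists = {}, {}

    def spop(self, key, count=1):
        members = self.sets.get(key, set())
        return [members.pop() for _ in range(min(count, len(members)))]

    def pipeline(self):
        return FakePipeline(self)


conn = FakeConn()
conn.sets["demo:input"] = {json.dumps({"id": i, "text": "a b"}) for i in range(250)}
processed = map_task_batched(
    conn, "demo:input", lambda rec: ((w, 1) for w in rec["text"].split()), "demo:step_1:"
)
print(processed, len(conn.lists["demo:step_1:a"]))  # 250 250
```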

### Exercise 3: Fault Tolerance
Implement:
- Worker health checks
- Automatic retry for failed tasks
- Progress monitoring and reporting
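
The weak spot for fault tolerance is that `SPOP` deletes a record before it is processed, so a worker that dies mid-record loses it. One common pattern is to move the record into a *processing* set with `SMOVE`, delete it with `SREM` only after its results are written, and let a recovery step move leftovers back. `SMOVE` is atomic and reports whether it moved the member, so two workers that pick the same random member cannot both claim it. In Redis Cluster, `SMOVE` touches two keys, which must share a hash slot; a hash tag such as `{dante}` in both (hypothetical) key names ensures that. The functions and the `FakeSets` double below are an illustrative sketch, not part of the notebook.

```python
import random

def claim(conn, source, processing):
    """Atomically move one member from source to processing; None if source is empty."""
    while True:
        member = conn.srandmember(source)
        if member is None:
            return None
        if conn.smove(source, processing, member):
            return member
        # Another worker moved this member first; retry with a new random pick

def ack(conn, processing, member):
    """Forget a member once its results have been written."""
    conn.srem(processing, member)

def requeue_stale(conn, processing, source):
    """Recovery step: move everything left in processing back to source."""
    return sum(int(conn.smove(processing, source, m)) for m in conn.smembers(processing))


class FakeSets:
    """In-memory test double for the set commands used above (illustration only)."""

    def __init__(self):
        self.data = {}

    def srandmember(self, key):
        members = self.data.get(key)
        return random.choice(sorted(members)) if members else None

    def smove(self, src, dst, member):
        if member not in self.data.get(src, set()):
            return False
        self.data[src].remove(member)
        self.data.setdefault(dst, set()).add(member)
        return True

    def srem(self, key, member):
        self.data.get(key, set()).discard(member)

    def smembers(self, key):
        return set(self.data.get(key, set()))


conn = FakeSets()
conn.data["{dante}:cantos"] = {"c1", "c2", "c3"}
first = claim(conn, "{dante}:cantos", "{dante}:processing")
ack(conn, "{dante}:processing", first)                        # processed normally
second = claim(conn, "{dante}:cantos", "{dante}:processing")  # worker "crashes": no ack
print(requeue_stale(conn, "{dante}:processing", "{dante}:cantos"))  # 1
```

`requeue_stale` should only run when no workers are active; otherwise it would hand back records that are still being processed.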

## 11. Cleanup

Run this cell to clean up Redis keys when you're done:

In [12]:
def cleanup_all():
    """Clean up all Redis keys created during this demo"""
    console.print("🧹 Cleaning up Redis keys...")
    
    # Delete all our keys
    patterns_to_clean = [
        "dante_comedy:*"
    ]
    
    total_deleted = 0
    for pattern in patterns_to_clean:
        deleted = cleanup_keys(pattern)
        total_deleted += deleted
    
    console.print(f"✅ Cleanup completed! Deleted {total_deleted} keys.")

# Delete every dante_comedy:* key created by this notebook
cleanup_all()