# Part B: MapReduce-Style Architecture Sentiment Analyzer

This notebook implements sentiment analysis using a **MapReduce-style architecture**.
The processing follows the Map → Shuffle/Group → Reduce pattern to simulate distributed data processing.

**Architectural characteristics:**
- Parallel processing model (simulated here in a single process)
- Data partitioning and grouping by key
- Separate Map, Shuffle, and Reduce phases
- Design that scales out by adding mappers and reducers

## 0) Dataset Path (edit if needed)

In [None]:
DATA_PATH = 'data/sample_us_posts.txt'  # TODO: change path if needed
print('Using dataset:', DATA_PATH)

## 1) Keywords - Load from CSV

In [None]:
import csv

# Load keywords from CSV file
def load_keywords():
    pos_keywords = set()
    neg_keywords = set()
    
    with open('keywords.csv', 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            # Strip stray whitespace so ' happy ' and 'happy' are the same keyword
            keyword = row['keyword'].strip().lower()
            sentiment = row['sentiment'].strip().lower()
            
            if sentiment == 'positive':
                pos_keywords.add(keyword)
            elif sentiment == 'negative':
                neg_keywords.add(keyword)
    
    return pos_keywords, neg_keywords

POS, NEG = load_keywords()
print('POS:', POS, '\nNEG:', NEG)
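
The loader above expects `keywords.csv` to have a header row with `keyword` and `sentiment` columns. A minimal sketch of that layout, parsed from an in-memory string so it runs without the file. The sample rows and the `parse_keywords` helper are illustrative assumptions, not the contents of the real file:

```python
import csv
import io

# Hypothetical sample rows; the real keywords.csv may contain different words.
SAMPLE_CSV = """keyword,sentiment
happy,positive
love,positive
sad,negative
upset,negative
"""

def parse_keywords(text):
    """Parse CSV text with 'keyword' and 'sentiment' columns into two sets."""
    pos, neg = set(), set()
    for row in csv.DictReader(io.StringIO(text)):
        keyword = row['keyword'].strip().lower()
        sentiment = row['sentiment'].strip().lower()
        if sentiment == 'positive':
            pos.add(keyword)
        elif sentiment == 'negative':
            neg.add(keyword)
    return pos, neg

sample_pos, sample_neg = parse_keywords(SAMPLE_CSV)
print(sorted(sample_pos), sorted(sample_neg))
```

Rows whose `sentiment` is neither `positive` nor `negative` are silently ignored, which matches the behavior of `load_keywords`.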

## 2) Map Phase - Implement map_post Function

The **Map** phase processes each line independently and emits key-value pairs.
Each mapper classifies one post and emits `(sentiment_label, 1)`.

In [None]:
import re

def map_post(line: str):
    """Map function: classify one line and emit (label, 1)
    
    This simulates a distributed mapper that processes one post
    and emits a key-value pair for the shuffle phase.
    """
    # 1) Tokenize: lowercase and extract word tokens, dropping punctuation
    #    so that "depressed." still matches the keyword "depressed"
    words = set(re.findall(r"\w+(?:'\w+)?", line.lower()))
    
    # 2) Check keyword presence
    has_positive = bool(words & POS)
    has_negative = bool(words & NEG)
    
    # 3) Classify based on rules and emit (label, count)
    if has_positive and has_negative:
        return [('Mixed', 1)]
    elif has_positive:
        return [('Positive', 1)]
    elif has_negative:
        return [('Negative', 1)]
    else:
        return [('Neutral', 1)]

# Test the map function
test_cases = [
    "I am so happy today!",
    "I feel sad and depressed.",
    "I love this but I'm also upset.",
    "The weather is nice."
]

print("Testing map_post function:")
for test in test_cases:
    result = map_post(test)
    print(f'  "{test}" -> {result}')
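
Tokenization decides what the mapper can match: splitting on whitespace alone keeps punctuation attached to words, so `'depressed.'` is a different token from `'depressed'`. The sketch below compares whitespace splitting with a regex tokenizer. The `tokenize` helper and its pattern are assumptions chosen for illustration:

```python
import re

def tokenize(line):
    """Lowercase and extract word tokens, keeping inner apostrophes (e.g. "i'm")."""
    return set(re.findall(r"\w+(?:'\w+)?", line.lower()))

sample = "I feel sad and depressed."
whitespace_tokens = set(sample.lower().split())
regex_tokens = tokenize(sample)

print(whitespace_tokens)  # includes the token with its trailing period
print(regex_tokens)       # includes the bare word
```

Single-word keywords are assumed here. Multi-word phrases would need a different matching strategy.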

## 3) Driver: Map Over Lines

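The driver applies `map_post` to every non-empty line and collects the emitted key-value pairs.
The loop below runs sequentially in one process. Because each call depends only on its own line, a real MapReduce system could distribute the lines across many mappers running in parallel.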

In [None]:
# Map phase: process all lines and collect (label, 1) pairs
mapped = []
line_count = 0

print(f"Map phase: Processing {DATA_PATH}...")

with open(DATA_PATH, 'r', encoding='utf-8') as f:
    for line in f:
        line = line.strip()
        if line:  # Skip empty lines
            pairs = map_post(line)
            mapped.extend(pairs)
            line_count += 1

print(f"Map phase completed: {line_count} lines processed")
print(f"Total mapped pairs: {len(mapped)}")
print('First few mapped items:', mapped[:10])
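
Because each mapper call depends only on its own line, the loop above can be spread across workers. A minimal sketch using `concurrent.futures.ThreadPoolExecutor`. The stand-in mapper `toy_map`, its keyword sets, and the sample lines are hypothetical so the block runs on its own. Threads show the structure only; for CPU-bound mappers a process pool would be the usual substitution:

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical keyword sets and mapper, standing in for POS, NEG and map_post.
TOY_POS = {'happy', 'love'}
TOY_NEG = {'sad', 'upset'}

def toy_map(line):
    """Classify one line and emit a list with a single (label, 1) pair."""
    words = set(line.lower().split())
    has_pos, has_neg = bool(words & TOY_POS), bool(words & TOY_NEG)
    if has_pos and has_neg:
        return [('Mixed', 1)]
    if has_pos:
        return [('Positive', 1)]
    if has_neg:
        return [('Negative', 1)]
    return [('Neutral', 1)]

def parallel_map(lines, workers=4):
    """Apply toy_map to every line using a pool of worker threads."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map preserves input order, so the output matches a sequential run.
        results = pool.map(toy_map, lines)
        return [pair for pairs in results for pair in pairs]

sample_lines = ['so happy today', 'sad news', 'happy but upset', 'just a post']
print(parallel_map(sample_lines))
```

Order does not matter to the later phases, since the shuffle regroups pairs by key anyway.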

## 4) Shuffle/Group Phase

The **Shuffle** phase groups all values by their keys (sentiment labels).
This simulates the distributed grouping that happens between Map and Reduce phases.

In [None]:
# Shuffle/Group: group values by label
print("Shuffle phase: Grouping mapped pairs by sentiment label...")

groups = {}
for (label, value) in mapped:
    if label not in groups:
        groups[label] = []
    groups[label].append(value)

print('Shuffle phase completed')
print('Groups (label -> count of values):', {k: len(v) for k, v in groups.items()})
print('Sample from groups:')
for label, values in groups.items():
    print(f'  {label}: {values[:5]}...' if len(values) > 5 else f'  {label}: {values}')
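
Real MapReduce frameworks typically also sort intermediate pairs by key during the shuffle, before they reach the reducers. A sketch of that sort-then-group variant using `itertools.groupby`. The `shuffle_sorted` name and the sample pairs are assumptions:

```python
from itertools import groupby
from operator import itemgetter

def shuffle_sorted(pairs):
    """Sort (key, value) pairs by key, then group the values under each key."""
    ordered = sorted(pairs, key=itemgetter(0))
    return {key: [value for _, value in group]
            for key, group in groupby(ordered, key=itemgetter(0))}

sample_pairs = [('Positive', 1), ('Neutral', 1), ('Positive', 1), ('Mixed', 1)]
sample_groups = shuffle_sorted(sample_pairs)
print(sample_groups)
```

`groupby` only merges adjacent items, so the sort is what guarantees one group per key.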

## 5) Reduce Phase

The **Reduce** phase aggregates values for each key (sentiment label).
Each reducer sums up all the counts for its assigned sentiment category.

In [None]:
# Reduce: sum values for each label
print("Reduce phase: Aggregating counts for each sentiment label...")

totals = {}
for label, values in groups.items():
    totals[label] = sum(values)
    print(f"  Reducer for '{label}': {len(values)} values -> total = {totals[label]}")

print('Reduce phase completed')
print('Final totals:', totals)
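
Summation works as a reducer because addition is associative and commutative: partial sums can be computed in any order and merged later. A sketch that writes the reducer as a standalone function. `reduce_counts` and the sample groups are hypothetical:

```python
from functools import reduce
from operator import add

def reduce_counts(label, values):
    """Fold all values for one key into a single (label, total) pair."""
    return label, reduce(add, values, 0)

sample_groups = {'Positive': [1, 1, 1], 'Negative': [1, 1]}
sample_totals = dict(reduce_counts(k, v) for k, v in sample_groups.items())
print(sample_totals)

# Partial sums merged later give the same total as one pass over all values.
left, right = [1, 1], [1]
merged = reduce_counts('Positive', [sum(left), sum(right)])
print(merged)
```

This property is what lets a combiner pre-aggregate on the mapper side without changing the final result.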

## 6) Print Counts and Verdict

Generate the final output in the required format and compute the overall verdict.

In [None]:
# Print counts and compute verdict
positive = totals.get('Positive', 0)
negative = totals.get('Negative', 0)
mixed = totals.get('Mixed', 0)
neutral = totals.get('Neutral', 0)

print("\n" + "="*50)
print("MAPREDUCE RESULTS")
print("="*50)

# Required output format
print(f"Positive={positive} Negative={negative} Mixed={mixed} Neutral={neutral}")

# Compute verdict
if positive > negative:
    verdict = 'Happier'
elif negative > positive:
    verdict = 'Sadder'
else:
    verdict = 'Tied'

print(f"Verdict: {verdict}")

## 7) Optional: Visualization with Charts

In [None]:
try:
    import matplotlib.pyplot as plt
    
    # Chart 1: Sentiment Counts
    labels = ['Positive', 'Negative', 'Mixed', 'Neutral']
    values = [totals.get(label, 0) for label in labels]
    colors = ['green', 'red', 'orange', 'gray']
    
    plt.figure(figsize=(15, 5))
    
    # Subplot 1: Bar chart of counts
    plt.subplot(1, 2, 1)
    bars = plt.bar(labels, values, color=colors, alpha=0.7)
    plt.title('Sentiment Counts (MapReduce Architecture)', fontweight='bold')
    plt.xlabel('Sentiment Category')
    plt.ylabel('Count')
    
    # Add value labels on bars
    for bar, value in zip(bars, values):
        if value > 0:
            plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.5,
                    str(value), ha='center', va='bottom', fontweight='bold')
    
    plt.grid(axis='y', alpha=0.3)
    
    # Subplot 2: Positive vs Negative percentage
    plt.subplot(1, 2, 2)
    pos_neg_total = positive + negative
    if pos_neg_total > 0:
        pos_pct = (positive / pos_neg_total) * 100
        neg_pct = (negative / pos_neg_total) * 100
        
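        # pie() accepts neither an `alpha` keyword nor an empty autopct string;
        # wedge transparency is set through wedgeprops instead.
        plt.pie([pos_pct, neg_pct], 
                labels=[f'Positive\n{positive} ({pos_pct:.1f}%)', 
                       f'Negative\n{negative} ({neg_pct:.1f}%)'],
                colors=['green', 'red'], 
                startangle=90,
                wedgeprops={'alpha': 0.7})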
        plt.title('Positive vs Negative Distribution', fontweight='bold')
    else:
        plt.text(0.5, 0.5, 'No Positive/Negative\nsentiments found', 
                ha='center', va='center', transform=plt.gca().transAxes,
                fontsize=14, bbox=dict(boxstyle='round', facecolor='lightgray'))
        plt.title('Positive vs Negative Distribution', fontweight='bold')
    
    plt.tight_layout()
    plt.show()
    
except Exception as e:
    print('Chart skipped:', e)

## 8) Configuration Options (Advanced)

MapReduce systems often include configuration knobs for optimization:

In [None]:
# Configuration knobs for MapReduce optimization
USE_COMBINER = True  # Enable local aggregation before shuffle
NUM_REDUCERS = 4     # Simulate multiple reducers (for partitioning)

print("Configuration:")
print(f"  USE_COMBINER: {USE_COMBINER}")
print(f"  NUM_REDUCERS: {NUM_REDUCERS}")

if USE_COMBINER:
    print("\n[Combiner Phase - Local Aggregation]")
    print("In a real MapReduce system, combiners would:")
    print("- Reduce network traffic by pre-aggregating locally")
    print("- Run the same logic as reducers but on each mapper node")
    print("- Example: (Positive,1),(Positive,1),(Positive,1) -> (Positive,3)")

if NUM_REDUCERS > 1:
    import zlib  # stable hash: built-in hash() of a str changes between runs

    print(f"\n[Partitioning - {NUM_REDUCERS} Reducers]")
    print("In a real MapReduce system with multiple reducers:")
    all_labels = ['Positive', 'Negative', 'Mixed', 'Neutral']
    for i in range(NUM_REDUCERS):
        labels_for_reducer = [label for label in all_labels
                              if zlib.crc32(label.encode('utf-8')) % NUM_REDUCERS == i]
        if labels_for_reducer:
            print(f"  Reducer {i}: handles {labels_for_reducer}")
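
The two knobs above are only described in print statements. A minimal sketch of what they could do in this notebook's setting: each simulated mapper pre-aggregates its own pairs with a combiner, and a partitioner assigns each label to a reducer. The names `combine` and `partition` and the sample splits are assumptions. `zlib.crc32` is used so the assignment is the same on every run:

```python
import zlib
from collections import Counter

def combine(pairs):
    """Combiner: locally sum the (label, 1) pairs emitted by one mapper."""
    local = Counter()
    for label, value in pairs:
        local[label] += value
    return list(local.items())

def partition(label, num_reducers):
    """Partitioner: map a label to a reducer index with a stable hash."""
    return zlib.crc32(label.encode('utf-8')) % num_reducers

# Hypothetical output of two mappers, each handling its own input split.
split_a = [('Positive', 1), ('Positive', 1), ('Negative', 1)]
split_b = [('Positive', 1), ('Neutral', 1), ('Neutral', 1)]

num_reducers = 2
reducer_inputs = [[] for _ in range(num_reducers)]
for split in (split_a, split_b):
    for label, partial in combine(split):      # 3 pairs shrink to 2 per split
        reducer_inputs[partition(label, num_reducers)].append((label, partial))

combined_totals = Counter()
for pairs in reducer_inputs:                    # each reducer sums its own keys
    for label, partial in pairs:
        combined_totals[label] += partial
print(dict(combined_totals))
```

With only four distinct labels, more than four reducers would leave some idle. The partitioner matters more when the key space is large.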

## 9) Test with Different Dataset

In [None]:
# Test with mixed dataset
MIXED_DATA_PATH = 'data/sample_us_posts_mixed.txt'

def run_mapreduce_pipeline(data_path):
    """Run the complete MapReduce pipeline on a dataset"""
    print("\n" + "="*50)
    print(f"RUNNING MAPREDUCE ON: {data_path}")
    print("="*50)
    
    # Map phase
    mapped = []
    with open(data_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                pairs = map_post(line)
                mapped.extend(pairs)
    
    # Shuffle phase
    groups = {}
    for (label, value) in mapped:
        if label not in groups:
            groups[label] = []
        groups[label].append(value)
    
    # Reduce phase
    totals = {}
    for label, values in groups.items():
        totals[label] = sum(values)
    
    # Output
    positive = totals.get('Positive', 0)
    negative = totals.get('Negative', 0)
    mixed = totals.get('Mixed', 0)
    neutral = totals.get('Neutral', 0)
    
    print(f"Positive={positive} Negative={negative} Mixed={mixed} Neutral={neutral}")
    
    if positive > negative:
        verdict = 'Happier'
    elif negative > positive:
        verdict = 'Sadder'
    else:
        verdict = 'Tied'
    
    print(f"Verdict: {verdict}")
    return totals

# Run on mixed dataset if available
try:
    mixed_totals = run_mapreduce_pipeline(MIXED_DATA_PATH)
except FileNotFoundError:
    print(f"Mixed dataset {MIXED_DATA_PATH} not found. Skipping...")

## 10) MapReduce Architecture Summary

**MapReduce Architecture Characteristics:**

### Structure & Responsibilities
- **Distributed design**: Separate Map, Shuffle, and Reduce phases
- **Parallel processing**: Multiple mappers and reducers can run concurrently
- **Data partitioning**: Data is split and processed independently
- **Fault tolerance**: Failed tasks can be restarted on other nodes
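
The fault-tolerance point can be illustrated on a single machine: when a map task is a pure function of its input split, a failed task can simply be run again on the same split. A sketch under that assumption. `run_with_retry`, `flaky_task`, and the failure pattern are hypothetical and only mimic what a real scheduler does across nodes:

```python
def run_with_retry(task, split, max_attempts=3):
    """Re-run a map task on the same input split until it succeeds."""
    for attempt in range(1, max_attempts + 1):
        try:
            return task(split), attempt
        except RuntimeError:
            if attempt == max_attempts:
                raise  # give up after the last allowed attempt

calls = {'count': 0}

def flaky_task(split):
    """Hypothetical mapper that fails on its first call, then succeeds."""
    calls['count'] += 1
    if calls['count'] == 1:
        raise RuntimeError('simulated node failure')
    return [('Neutral', 1) for _ in split]

output, attempts = run_with_retry(flaky_task, ['post one', 'post two'])
print(output, 'after', attempts, 'attempts')
```

Re-running is safe only because the task has no side effects. A mapper that wrote to shared state would need extra care.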

### MapReduce Phases
1. **Map**: Process input records independently, emit key-value pairs
2. **Shuffle/Group**: Sort and group intermediate data by key
3. **Reduce**: Aggregate values for each key to produce final results

### Advantages
- **Scalability**: Can handle massive datasets across many machines
- **Parallelism**: Utilizes multiple cores/machines effectively
- **Fault tolerance**: Automatic handling of node failures
- **Flexibility**: Easy to add new processing logic in map/reduce functions

### Limitations
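- **Complexity**: Harder to understand and debug than a single sequential script
- **Overhead**: Network communication and coordination costs between phases
- **Latency**: Reducers cannot finish until all map output has been shuffled, so results arrive in batches rather than incrementally
- **Overkill**: Unnecessary for small datasets that fit comfortably on one machine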

### Use Cases
- Big data processing (datasets too large to store or process on a single machine)
- Distributed computing environments
- Batch processing jobs
- Analytics on large datasets

### Checklist Completed ✓
- [x] Filled POS/NEG keywords from CSV
- [x] Implemented `map_post` function
- [x] Grouped mapped output into `groups` (Shuffle phase)
- [x] Reduced to `totals` (Reduce phase)
- [x] Printed totals and verdict
- [x] Added charts and visualizations
- [x] Tested with multiple datasets
- [x] Added configuration options (combiner, multiple reducers)