# Lab (TP): Simplified MapReduce

In this scenario, imagine you’re working for a popular e-commerce site.

Understanding when traffic is at its peak helps inform decisions about server scaling, marketing strategies, and customer support. The MapReduce job here processes large volumes of server logs to extract hourly traffic counts, providing insights into which times see the most user activity.

By analyzing page visit counts, the company can optimize popular pages, adjust advertising placements, and improve user experience on high-traffic pages. This MapReduce job counts the number of times each URL is accessed, revealing the most visited pages.

Understanding user engagement helps tailor site features and marketing efforts. The goal is to calculate the mean session duration for each unique user, indicating how long users are staying and engaging with the website’s content.

By tracking error response codes (like 404 or 500) associated with URLs, the company can prioritize bug fixes and improve user experience on critical pages. This analysis supports backend optimization by isolating pages where users encounter issues.


1. "Using the traffic data, at what time does the website experience its peak load, and how can this insight guide server capacity planning?"

Mapper: Extracts the hour from each timestamp and emits a (hour, 1) pair.
Reducer: Aggregates the counts by hour, outputting {hour: request_count, ...} to show peak traffic times.
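The reducer only produces counts. Answering "at what time" still needs one more step: picking the hour with the largest count. The sketch below is a simplified variant of job 1 that takes bare ISO-8601 timestamp strings rather than full log lines. The names `hour_mapper` and `count_reducer` and the sample timestamps are hypothetical.

```python
from collections import Counter
from datetime import datetime

def hour_mapper(timestamp):
    """Map one ISO-8601 timestamp (e.g. '2023-07-17T00:21:08') to an (hour, 1) pair."""
    yield (datetime.fromisoformat(timestamp).hour, 1)

def count_reducer(pairs):
    """Sum the values emitted for each key."""
    counts = Counter()
    for key, value in pairs:
        counts[key] += value
    return dict(counts)

# Hypothetical timestamps; two of them fall in the 18:00 hour
timestamps = ["2023-07-17T00:21:08", "2023-05-20T18:44:14", "2023-06-01T18:05:00", "2023-06-02T09:30:59"]
hour_counts = count_reducer(pair for ts in timestamps for pair in hour_mapper(ts))
peak_hour = max(hour_counts, key=hour_counts.get)  # key with the largest count
print(hour_counts, peak_hour)  # {0: 1, 18: 2, 9: 1} 18
```

If several hours tie, `max` returns the first one it encounters. For capacity planning, it may be more useful to look at the whole sorted distribution than at a single peak.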

2. "What are the top 3 most-visited pages on the site, and how could this information affect design decisions?"

Mapper: Maps each URL from a log entry to (url, 1).
Reducer: Aggregates counts per URL, resulting in {url: visit_count, ...} for a popularity ranking of pages.
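The reducer's `{url: visit_count}` dictionary is unordered with respect to popularity, so the "top 3" question needs a ranking step after the reduce. A minimal sketch follows, using `heapq.nlargest`, which is designed for picking the k largest items without hand-writing a sort. The counts are copied from the "Most Visited Pages" output that this notebook prints later.

```python
import heapq

# Visit counts copied from the "Most Visited Pages" output printed by this notebook's run
url_counts = {"/cart": 1638, "/home": 1676, "/checkout": 1614,
              "/products": 1734, "/contact": 1678, "/about": 1660}

# Rank (url, count) pairs by count and keep the three largest
top3 = heapq.nlargest(3, url_counts.items(), key=lambda item: item[1])
print(top3)  # [('/products', 1734), ('/contact', 1678), ('/home', 1676)]
```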

3. "Which users show the highest average session durations, and what might this suggest about their engagement levels?"

Mapper: Maps each user ID to (user_id, session_duration) from each log entry.
Reducer: Aggregates the durations by user, providing {user_id: average_duration, ...} for understanding user engagement patterns.
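Ranking users by their mean duration can be misleading when some users have only one or two sessions, because a single long visit puts them at the top. The sketch below adds a minimum-session filter before ranking. The threshold `min_sessions`, the helper name `top_users_by_avg` and the sample records are assumptions for illustration, not part of the exercise.

```python
from collections import defaultdict

def top_users_by_avg(records, k, min_sessions):
    """Rank users by mean session duration, ignoring users with fewer than `min_sessions` sessions."""
    durations = defaultdict(list)
    for user_id, duration in records:  # map + shuffle: group durations by user
        durations[user_id].append(duration)
    # reduce: mean per user, keeping only users with enough sessions
    averages = {u: sum(d) / len(d) for u, d in durations.items() if len(d) >= min_sessions}
    # highest mean first; ties broken alphabetically by user_id
    return sorted(averages.items(), key=lambda kv: (-kv[1], kv[0]))[:k]

# Hypothetical (user_id, session_duration) records
records = [("user_001", 120), ("user_002", 300), ("user_001", 60), ("user_003", 290),
           ("user_002", 200), ("user_001", 90)]
print(top_users_by_avg(records, k=2, min_sessions=2))  # [('user_002', 250.0), ('user_001', 90.0)]
```

With `min_sessions=1`, `user_003` ranks first on the strength of a single 290-second session. This is the kind of noise the filter is meant to suppress.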

4. "Which URLs have the highest error rates, and how should the engineering team prioritize fixes?"

Mapper: Maps URLs with error response codes (>=400) to (url, 1).
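Reducer: Counts errors per URL, yielding {url: error_count, ...} to identify problematic pages needing attention. To turn these counts into error rates, divide each one by that URL's total visit count (the output of job 2).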
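Because the error mapper emits pairs only for failed requests, its reducer can count errors but cannot divide by traffic. A busy page with many errors may still have a lower error rate than a rarely visited one. One way to compute rates in a single job is to emit an `(is_error, 1)` pair for every request and have the reducer sum both components. The sketch below assumes records are `(url, response_code)` tuples. The function names are hypothetical.

```python
from collections import defaultdict

def error_rate_mapper(record):
    """Emit (url, (is_error, 1)) for every request, not only failed ones."""
    url, response_code = record
    yield (url, (1 if response_code >= 400 else 0, 1))

def error_rate_reducer(pairs):
    totals = defaultdict(lambda: [0, 0])  # url -> [error_count, request_count]
    for url, (is_error, n) in pairs:
        totals[url][0] += is_error
        totals[url][1] += n
    return {url: errors / requests for url, (errors, requests) in totals.items()}

# Hypothetical (url, response_code) records
records = [("/cart", 200), ("/cart", 500), ("/home", 200), ("/home", 200),
           ("/home", 404), ("/cart", 404), ("/cart", 200), ("/home", 200)]
rates = error_rate_reducer(p for rec in records for p in error_rate_mapper(rec))
print(rates)  # {'/cart': 0.5, '/home': 0.25}
```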

Since we’re using Google Colab with limited resources, we simulate the MapReduce operations locally: the mapper and reducer logic runs in plain Python over the generated log, and the results are printed.
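On a real cluster, a shuffle phase sits between map and reduce and groups every emitted value by key before any reducer runs. The notebook's simulation folds that grouping into each reducer. Below is a minimal single-process sketch of all three phases. `run_mapreduce` is a hypothetical helper. Unlike the notebook's reducers, which receive the full list of pairs, its reducer is called once per key with that key's values, in the style of Hadoop-like frameworks.

```python
from collections import defaultdict
from itertools import chain

def run_mapreduce(records, mapper, reducer):
    """Simulate one MapReduce job in a single process.

    map:     apply `mapper` to every record; each call yields (key, value) pairs
    shuffle: group all values by key (done over the network on a real cluster)
    reduce:  call `reducer(key, values)` once per key, in sorted key order
    """
    mapped = chain.from_iterable(mapper(r) for r in records)
    groups = defaultdict(list)
    for key, value in mapped:
        groups[key].append(value)
    return {key: reducer(key, values) for key, values in sorted(groups.items())}

# Usage: count visits per URL on CSV lines taken from the df.head() rows shown later
lines = [
    "2023-07-17T00:21:08,user_027,192.168.1.5,/cart,500,90,Chrome/90.0",
    "2023-05-20T18:44:14,user_007,192.168.1.13,/cart,200,143,Safari/537.36",
    "2023-07-16T14:41:39,user_035,192.168.1.49,/home,404,298,Chrome/90.0",
]
visits = run_mapreduce(lines,
                       mapper=lambda line: [(line.split(",")[3], 1)],  # field 3 is the URL
                       reducer=lambda url, ones: sum(ones))
print(visits)  # {'/cart': 2, '/home': 1}
```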

In [1]:
import random
from datetime import datetime, timedelta
import pandas as pd

# Parameters
num_records = 10000
user_ids = [f"user_{i:03}" for i in range(1, 101)]
urls = ["/home", "/products", "/contact", "/about", "/cart", "/checkout"]
response_codes = [200, 200, 200, 404, 500]  # Higher chance of 200 for realistic data
ip_addresses = [f"192.168.1.{i}" for i in range(1, 101)]
user_agents = ["Mozilla/5.0", "Chrome/90.0", "Safari/537.36", "Edge/85.0"]

# Function to generate random timestamp
def random_timestamp(start, end):
    return (start + timedelta(seconds=random.randint(0, int((end - start).total_seconds())))).isoformat()

# Data generation
data = []
start_time = datetime(2023, 1, 1)
end_time = datetime(2023, 10, 1)

for _ in range(num_records):
    log_entry = {
        "timestamp": random_timestamp(start_time, end_time),
        "user_id": random.choice(user_ids),
        "ip_address": random.choice(ip_addresses),
        "url": random.choice(urls),
        "response_code": random.choice(response_codes),
        "session_duration": random.randint(10, 300),  # in seconds
        "user_agent": random.choice(user_agents)
    }
    data.append(log_entry)
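The generation cell does not seed the random number generator, so every execution produces different counts from the outputs shown in this notebook. If reproducible results are wanted, one option is a dedicated `random.Random(seed)` instance. The sketch below is a trimmed, hypothetical `generate_logs` helper (the seed value, the helper name and the omission of `ip_address` and `user_agent` are all illustrative choices) that follows the same field rules as the cell above.

```python
import random
from datetime import datetime, timedelta

def generate_logs(num_records, seed):
    """Generate synthetic log entries from a seeded RNG so runs are reproducible."""
    rng = random.Random(seed)  # independent of the global `random` state
    start, end = datetime(2023, 1, 1), datetime(2023, 10, 1)
    span = int((end - start).total_seconds())
    logs = []
    for _ in range(num_records):
        logs.append({
            "timestamp": (start + timedelta(seconds=rng.randint(0, span))).isoformat(),
            "user_id": f"user_{rng.randint(1, 100):03}",
            "url": rng.choice(["/home", "/products", "/contact", "/about", "/cart", "/checkout"]),
            "response_code": rng.choice([200, 200, 200, 404, 500]),
            "session_duration": rng.randint(10, 300),  # in seconds
        })
    return logs

# Same seed -> identical data, so MapReduce results can be compared across runs
assert generate_logs(5, seed=42) == generate_logs(5, seed=42)
```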


TODO: convert the "data" list to a pandas DataFrame.

In [2]:
df = pd.DataFrame(data)
df.head()

|   | timestamp | user_id | ip_address | url | response_code | session_duration | user_agent |
|---|---|---|---|---|---|---|---|
| 0 | 2023-07-17T00:21:08 | user_027 | 192.168.1.5 | /cart | 500 | 90 | Chrome/90.0 |
| 1 | 2023-05-20T18:44:14 | user_007 | 192.168.1.13 | /cart | 200 | 143 | Safari/537.36 |
| 2 | 2023-07-16T14:41:39 | user_035 | 192.168.1.49 | /home | 404 | 298 | Chrome/90.0 |
| 3 | 2023-06-30T23:23:47 | user_029 | 192.168.1.95 | /checkout | 200 | 143 | Edge/85.0 |
| 4 | 2023-05-31T09:01:29 | user_059 | 192.168.1.10 | /home | 404 | 297 | Safari/537.36 |
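Once the data is in a DataFrame, pandas can compute the same four aggregations directly. This gives a reference answer to check the MapReduce jobs against. The sketch below rebuilds the five rows shown above in a frame named `sample` (to avoid overwriting the notebook's `df`) and uses the `ref_` prefix for its results. Both names are illustrative.

```python
import pandas as pd

# The five rows shown by df.head() above (ip_address and user_agent omitted)
sample = pd.DataFrame({
    "timestamp": ["2023-07-17T00:21:08", "2023-05-20T18:44:14", "2023-07-16T14:41:39",
                  "2023-06-30T23:23:47", "2023-05-31T09:01:29"],
    "user_id": ["user_027", "user_007", "user_035", "user_029", "user_059"],
    "url": ["/cart", "/cart", "/home", "/checkout", "/home"],
    "response_code": [500, 200, 404, 200, 404],
    "session_duration": [90, 143, 298, 143, 297],
})

ref_hours = pd.to_datetime(sample["timestamp"]).dt.hour.value_counts().to_dict()      # job 1
ref_visits = sample["url"].value_counts().to_dict()                                   # job 2
ref_avg = sample.groupby("user_id")["session_duration"].mean().to_dict()              # job 3
ref_errors = sample.loc[sample["response_code"] >= 400, "url"].value_counts().to_dict()  # job 4
print(ref_visits, ref_errors)
```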


TODO:

Peak Traffic Analysis: Identify the busiest hours.

Most Visited Pages: Count visits per URL.

Average Session Duration: Calculate average session duration per user.

Error Rate: Track and analyze URLs with high error response rates (e.g., 404 or 500).

Fill in the blanks in the code below.

In [3]:
from datetime import datetime
from collections import defaultdict

def traffic_mapper(line):
    fields = line.strip().split(",")
    timestamp = fields[0]
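    # Timestamps come from isoformat(), e.g. "2023-07-17T00:21:08" (no fraction, no "Z"),
    # so the original "%Y-%m-%dT%H:%M:%S.%fZ" pattern would raise ValueError
    hour = datetime.fromisoformat(timestamp).hour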
    yield (hour, 1)

def traffic_reducer(mapped_data):
    hour_counts = defaultdict(int)
    for hour, count in mapped_data:
        hour_counts[hour] += count
    return hour_counts
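The generated timestamps have neither fractional seconds nor a trailing `Z`. A strict `strptime` pattern such as `"%Y-%m-%dT%H:%M:%S.%fZ"` therefore raises `ValueError` on them. If real logs might arrive in either form, a tolerant parser is one option. `extract_hour` below is a hypothetical helper. It rewrites a trailing `Z` as `+00:00` because `datetime.fromisoformat` only accepts `Z` from Python 3.11 onward.

```python
from datetime import datetime

def extract_hour(timestamp):
    """Return the hour of an ISO-8601 timestamp, with or without microseconds or a trailing 'Z'."""
    # Normalise 'Z' (UTC) to an explicit offset so older Python versions can parse it
    if timestamp.endswith("Z"):
        timestamp = timestamp[:-1] + "+00:00"
    return datetime.fromisoformat(timestamp).hour

print(extract_hour("2023-07-17T00:21:08"), extract_hour("2023-05-20T18:44:14.123456Z"))  # 0 18
```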


In [4]:
def url_mapper(line):
    fields = line.strip().split(",")
    url = fields[3]
    yield (url, 1)

def url_reducer(mapped_data):
    url_counts = defaultdict(int)
    for url, count in mapped_data:
        url_counts[url] += count
    return url_counts
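In a real cluster, shipping one `(url, 1)` pair per request to the reducers is wasteful. Hadoop-style frameworks let a job register a combiner that pre-aggregates on the map side. Because `url_reducer` only sums, and addition is associative and commutative, the same summation can safely run first on each partition and then once more to merge the partial results. The helper names and the two-way split of the `df.head()` rows below are hypothetical.

```python
from collections import Counter

def url_combiner(partition):
    """Map-side pre-aggregation: turn one partition's CSV lines into {url: local_count}."""
    return Counter(line.split(",")[3] for line in partition)  # field 3 is the URL

def merge_counts(partials):
    """Reduce step: add the per-partition counts together."""
    total = Counter()
    for partial in partials:
        total.update(partial)  # Counter.update adds counts instead of overwriting them
    return dict(total)

# Hypothetical split of the five df.head() rows across two mappers
partition_a = [
    "2023-07-17T00:21:08,user_027,192.168.1.5,/cart,500,90,Chrome/90.0",
    "2023-05-20T18:44:14,user_007,192.168.1.13,/cart,200,143,Safari/537.36",
    "2023-07-16T14:41:39,user_035,192.168.1.49,/home,404,298,Chrome/90.0",
]
partition_b = [
    "2023-06-30T23:23:47,user_029,192.168.1.95,/checkout,200,143,Edge/85.0",
    "2023-05-31T09:01:29,user_059,192.168.1.10,/home,404,297,Safari/537.36",
]
visit_counts = merge_counts([url_combiner(partition_a), url_combiner(partition_b)])
print(visit_counts)  # {'/cart': 2, '/home': 2, '/checkout': 1}
```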

In [5]:
def session_mapper(line):
    fields = line.strip().split(",")
    user_id = fields[1]
    session_duration = int(fields[5])
    yield (user_id, session_duration)

def session_reducer(mapped_data):
    user_sessions = defaultdict(list)
    for user_id, duration in mapped_data:
        user_sessions[user_id].append(duration)
    # Calculate average per user
    user_avg_duration = {}
    for user_id, durations in user_sessions.items():
        user_avg_duration[user_id] = sum(durations) / len(durations)
    return user_avg_duration
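`session_reducer` keeps every duration in a list, which works here but requires all of a user's values to reach one reducer. Unlike sums, averages cannot simply be combined across partitions: the mean of per-partition means is generally not the overall mean. Carrying `(sum, count)` pairs instead fixes this. The sketch below uses two hypothetical partitions holding sessions for the same user. `partial_sums` and `merge_to_averages` are illustrative names.

```python
# Two hypothetical map-side partitions holding sessions for the same user
sessions_a = [("user_001", 10), ("user_001", 20), ("user_001", 30)]
sessions_b = [("user_001", 100)]

def partial_sums(partition):
    """Combiner-style pre-aggregation: user_id -> (sum_of_durations, session_count)."""
    acc = {}
    for user_id, duration in partition:
        s, n = acc.get(user_id, (0, 0))
        acc[user_id] = (s + duration, n + 1)
    return acc

def merge_to_averages(partials):
    """Reduce step: add sums and counts per user, then divide once at the end."""
    totals = {}
    for partial in partials:
        for user_id, (s, n) in partial.items():
            ts, tn = totals.get(user_id, (0, 0))
            totals[user_id] = (ts + s, tn + n)
    return {user_id: s / n for user_id, (s, n) in totals.items()}

correct = merge_to_averages([partial_sums(sessions_a), partial_sums(sessions_b)])
# Averaging the two partition means gives a different (wrong) answer
naive = (sum(d for _, d in sessions_a) / len(sessions_a)
         + sum(d for _, d in sessions_b) / len(sessions_b)) / 2
print(correct, naive)  # {'user_001': 40.0} 60.0
```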


In [6]:
def error_mapper(line):
    fields = line.strip().split(",")
    url = fields[3]
    response_code = int(fields[4])
    if response_code >= 400:  # Considering 400 and 500 series as errors
        yield (url, 1)

def error_reducer(mapped_data):
    error_counts = defaultdict(int)
    for url, count in mapped_data:
        error_counts[url] += count
    return error_counts

In [8]:
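# Run each MapReduce job
# The map step is written inline over DataFrame columns; it emits the same
# (key, value) pairs as the *_mapper functions defined above.

# Peak Traffic Analysis (vectorized timestamp parsing instead of a per-row apply)
mapped_data = [(hour, 1) for hour in pd.to_datetime(df['timestamp']).dt.hour]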
peak_traffic = traffic_reducer(mapped_data)
print("Peak Traffic per Hour:", peak_traffic)

# Most Visited Pages
mapped_data = [(url, 1) for url in df['url']]
most_visited = url_reducer(mapped_data)
print("Most Visited Pages:", most_visited)

# Average Session Duration per User
mapped_data = [(user_id, duration) for user_id, duration in zip(df['user_id'], df['session_duration'])]
avg_session = session_reducer(mapped_data)
print("Average Session Duration per User:", avg_session)

# Error Rate Analysis
mapped_data = [(url, 1) for url, response_code in zip(df['url'], df['response_code']) if response_code >= 400]
error_rates = error_reducer(mapped_data)
print("Error Rate per URL:", error_rates)

Peak Traffic per Hour: defaultdict(<class 'int'>, {0: 397, 18: 417, 14: 428, 23: 410, 9: 434, 13: 414, 8: 478, 11: 427, 21: 418, 17: 454, 7: 425, 6: 392, 3: 419, 1: 403, 19: 400, 20: 433, 5: 410, 16: 405, 15: 389, 22: 425, 4: 387, 2: 406, 12: 395, 10: 434})
Most Visited Pages: defaultdict(<class 'int'>, {'/cart': 1638, '/home': 1676, '/checkout': 1614, '/products': 1734, '/contact': 1678, '/about': 1660})
Average Session Duration per User: {'user_027': 156.6547619047619, 'user_007': 148.16831683168317, 'user_035': 162.85576923076923, 'user_029': 137.03773584905662, 'user_059': 178.7625, 'user_056': 152.8181818181818, 'user_088': 169.83168316831683, 'user_075': 150.26470588235293, 'user_034': 148.45192307692307, 'user_063': 141.51136363636363, 'user_015': 150.525, 'user_018': 154.45192307692307, 'user_079': 146.13095238095238, 'user_092': 149.72222222222223, 'user_069': 150.64, 'user_060': 160.62244897959184, 'user_023': 155.5252525252525, 'user_039': 139.16504854368932, 'user_030': 146