# MapReduce example (PySpark-style map → shuffle → reduce, emulated with pandas and plain Python)

## Sample JSON data file

In [None]:
import json
import random

# Value pools the synthetic log records are drawn from.
actions = ['click', 'view', 'purchase']
user_ids = [*range(1, 21)]  # 20 users, ids 1..20
item_ids = list(map(str, range(100, 200)))  # item ids "100".."199" as strings

# Emit 500 records as newline-delimited JSON (one object per line).
with open("user_logs.json", mode="w") as f:
    for _ in range(500):
        # Draw order (user, action, item) is kept so a seeded run is reproducible.
        record = {
            "user_id": random.choice(user_ids),
            "timestamp": "2024-06-01T10:00:00Z",
            "action": random.choice(actions),
            "item_id": random.choice(item_ids),
        }
        f.write(json.dumps(record) + "\n")


## 1. Load Data

In [None]:
import pandas as pd
import numpy as np
from collections import Counter
import json

# The log is newline-delimited JSON (one object per line), hence lines=True.
df = pd.read_json(path_or_buf="user_logs.json", lines=True)

## 2. Map Phase

In [None]:
## Map Phase

# Keep only the three fields the job needs, as a list of [user_id, action, item_id] rows.
mapped = df.loc[:, ['user_id', 'action', 'item_id']].to_numpy().tolist()

# Emit one (key, value) pair per row: the user_id is the key,
# and the (action, item_id) tuple is the value.
mapped_pairs = [(row[0], (row[1], row[2])) for row in mapped]
mapped_pairs

## 3. Shuffle Phase

In [None]:
# Shuffle the mapped pairs

from collections import defaultdict

# Group the emitted values by key: each user_id maps to the list of
# (action, item_id) values produced for that user, in emission order.
shuffled = defaultdict(list)
for pair in mapped_pairs:
    user_id, value = pair
    shuffled[user_id].append(value)

shuffled

## 4. Reduce Phase

In [None]:
# Reduce Phase


def _summarize_user(values):
    """Collapse one user's shuffled values into summary statistics.

    Args:
        values: Sequence of ``(action, item_id)`` tuples collected for a
            single user during the shuffle phase.

    Returns:
        A dict with three keys:
            ``total_actions``: number of records for the user.
            ``most_common_action``: the action seen most often (the first
                one encountered wins a tie), or ``None`` if ``values`` is
                empty.
            ``unique_items``: number of distinct item ids touched.
    """
    action_counts = Counter(action for action, _ in values)
    distinct_items = {item for _, item in values}
    # most_common(1) returns [] for an empty group; guard instead of indexing blindly.
    top = action_counts.most_common(1)
    return {
        "total_actions": len(values),
        "most_common_action": top[0][0] if top else None,
        "unique_items": len(distinct_items),
    }


# The per-user work lives in a helper so its temporaries stay local. The
# notebook-level `actions` list used by the data-generation cell is no longer
# overwritten with the last user's actions.
results = {
    user_id: _summarize_user(values) for user_id, values in shuffled.items()
}

results

In [None]:
# Tabulate the reduce output: one row per user_id (the dict keys become the
# index), one column per summary metric.
summary_df = pd.DataFrame.from_dict(data=results, orient="index")
print(summary_df)