<a href="https://colab.research.google.com/github/BHARATH077/Real-Time-Data-Processing-for-Recommendation-Engine-/blob/main/Notebook/RealTime_RecEngine_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Project: Real-Time Data Processing for Recommendation Engine (Simulated)

In [9]:
# =======================
# Step 1: Setup & Historical Data
# =======================

import pandas as pd
import numpy as np
import random
import os

# Create directories for data
os.makedirs("data", exist_ok=True)

# Create sample users and items
users = [f"user_{i}" for i in range(1, 101)]    # 100 users
items = [f"item_{i}" for i in range(1, 501)]   # 500 items

# Generate historical purchases
historical = pd.DataFrame({
    "user_id": [random.choice(users) for _ in range(500)],
    "item_id": [random.choice(items) for _ in range(500)],
    "purchase_amount": [random.randint(10, 500) for _ in range(500)],
    "purchase_date": pd.to_datetime("2023-01-01") +
                     pd.to_timedelta(np.random.randint(0, 365, 500), unit='D')
})

# Save dataset
historical.to_parquet("data/historical_purchases.parquet", index=False)

print("✅ Historical dataset created with shape:", historical.shape)
historical.head()


✅ Historical dataset created with shape: (500, 4)


Unnamed: 0,user_id,item_id,purchase_amount,purchase_date
0,user_56,item_205,272,2023-08-15
1,user_45,item_475,21,2023-11-05
2,user_61,item_113,491,2023-06-17
3,user_86,item_150,148,2023-08-04
4,user_91,item_35,128,2023-08-02


In [10]:
# =======================
# Step 2: Simulate Data Sources
# =======================
import os
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta

# Make sure folders exist
os.makedirs("data/stream", exist_ok=True)
os.makedirs("data/batch", exist_ok=True)

# Define users and items (same as Day 1)
users = [f"user_{i}" for i in range(1, 101)]
items = [f"item_{i}" for i in range(1, 21)]

# ------------------------
# 1. Simulate Streaming Logs
# ------------------------
events = ["view", "click", "add_to_cart"]

stream_data = pd.DataFrame({
    "user_id": [random.choice(users) for _ in range(200)],
    "item_id": [random.choice(items) for _ in range(200)],
    "event_type": [random.choice(events) for _ in range(200)],
    "timestamp": [datetime(2023,12,1) + timedelta(seconds=i*60) for i in range(200)]
})

stream_file = "data/stream/user_activity_day1.csv"
stream_data.to_csv(stream_file, index=False)

print(f"✅ Simulated stream data saved at {stream_file} with shape {stream_data.shape}")
print(stream_data.head())

# ------------------------
# 2. Simulate Batch Purchases
# ------------------------
purchase_data = pd.DataFrame({
    "user_id": [random.choice(users) for _ in range(100)],
    "item_id": [random.choice(items) for _ in range(100)],
    "purchase_amount": [round(random.uniform(10, 200), 2) for _ in range(100)],
    "timestamp": [datetime(2023,11,1) + timedelta(days=random.randint(0,30)) for _ in range(100)]
})

purchase_file = "data/batch/purchases.csv"
purchase_data.to_csv(purchase_file, index=False)

print(f"✅ Purchases dataset saved at {purchase_file} with shape {purchase_data.shape}")
print(purchase_data.head())


✅ Simulated stream data saved at data/stream/user_activity_day1.csv with shape (200, 4)
   user_id  item_id   event_type           timestamp
0   user_7  item_14  add_to_cart 2023-12-01 00:00:00
1  user_85  item_15         view 2023-12-01 00:01:00
2  user_58   item_7        click 2023-12-01 00:02:00
3  user_18  item_20  add_to_cart 2023-12-01 00:03:00
4  user_62  item_17        click 2023-12-01 00:04:00
✅ Purchases dataset saved at data/batch/purchases.csv with shape (100, 4)
   user_id  item_id  purchase_amount  timestamp
0  user_49  item_19           165.50 2023-11-26
1  user_40  item_19           126.96 2023-11-16
2  user_58  item_20            88.04 2023-11-25
3  user_18  item_13            52.52 2023-11-20
4  user_55   item_6           136.88 2023-11-03


In [11]:
# =======================
# Step 3: Mini Streaming Pipeline (First Join)
# =======================

import duckdb

# Load historical data
historical = pd.read_parquet("data/historical_purchases.parquet")

# Load stream data (from Day 2)
stream_data = pd.read_csv("data/stream/user_activity_day1.csv")

print("Historical Purchases:", historical.shape)
print("Stream Data:", stream_data.shape)

# Use DuckDB for SQL-style join
con = duckdb.connect()

con.register("hist", historical)
con.register("stream", stream_data)

# Join to enrich stream with purchase info
enriched_query = """
SELECT
    s.user_id,
    s.item_id,
    s.event_type,
    s.timestamp,
    AVG(h.purchase_amount) AS avg_purchase_amount,
    COUNT(h.item_id) AS past_purchases
FROM stream s
LEFT JOIN hist h
ON s.user_id = h.user_id
GROUP BY s.user_id, s.item_id, s.event_type, s.timestamp
ORDER BY s.timestamp
"""

enriched = con.execute(enriched_query).fetchdf()

# Save enriched stream output
os.makedirs("data/enriched", exist_ok=True)
enriched_file = "data/enriched/enriched_day1.csv"
enriched.to_csv(enriched_file, index=False)

print(f"✅ Enriched stream written to {enriched_file} with shape {enriched.shape}")
enriched.head()


Historical Purchases: (500, 4)
Stream Data: (200, 4)
✅ Enriched stream written to data/enriched/enriched_day1.csv with shape (200, 6)


Unnamed: 0,user_id,item_id,event_type,timestamp,avg_purchase_amount,past_purchases
0,user_7,item_14,add_to_cart,2023-12-01 00:00:00,176.75,4
1,user_85,item_15,view,2023-12-01 00:01:00,158.0,10
2,user_58,item_7,click,2023-12-01 00:02:00,303.0,5
3,user_18,item_20,add_to_cart,2023-12-01 00:03:00,187.714286,7
4,user_62,item_17,click,2023-12-01 00:04:00,307.6,5


In [12]:
# =======================
# Step 4: Simulated Chunked Streaming
# =======================

# Load historical data again
historical = pd.read_parquet("data/historical_purchases.parquet")

# Register historical purchases in DuckDB
con = duckdb.connect()
con.register("hist", historical)

# File to simulate streaming
stream_file = "data/stream/user_activity_day1.csv"

# Output enriched file
os.makedirs("data/enriched", exist_ok=True)
chunked_output = "data/enriched/enriched_day1_chunked.csv"

# Ensure the output file is empty before appending
if os.path.exists(chunked_output):
    os.remove(chunked_output)

# Read in chunks of 50 rows (like mini-streams)
chunksize = 50
batch_num = 1

for chunk in pd.read_csv(stream_file, chunksize=chunksize):
    print(f"\n🔹 Processing batch {batch_num} with {chunk.shape[0]} events...")

    # Register chunk as stream
    con.register("stream_chunk", chunk)

    # Enrich with historical purchases
    query = """
    SELECT
        s.user_id,
        s.item_id,
        s.event_type,
        s.timestamp,
        AVG(h.purchase_amount) AS avg_purchase_amount,
        COUNT(h.item_id) AS past_purchases
    FROM stream_chunk s
    LEFT JOIN hist h
    ON s.user_id = h.user_id
    GROUP BY s.user_id, s.item_id, s.event_type, s.timestamp
    ORDER BY s.timestamp
    """
    enriched_chunk = con.execute(query).fetchdf()

    # Append to final file
    enriched_chunk.to_csv(chunked_output, mode='a', index=False, header=not os.path.exists(chunked_output))

    batch_num += 1

print(f"\n✅ Streaming simulation complete. Enriched file saved at {chunked_output}")



🔹 Processing batch 1 with 50 events...

🔹 Processing batch 2 with 50 events...

🔹 Processing batch 3 with 50 events...

🔹 Processing batch 4 with 50 events...

✅ Streaming simulation complete. Enriched file saved at data/enriched/enriched_day1_chunked.csv


In [13]:
# =======================
# Step 5: Session-Level Features
# =======================

# Load enriched data from Day 4
enriched_file = "data/enriched/enriched_day1_chunked.csv"
df = pd.read_csv(enriched_file, parse_dates=["timestamp"])

# Sort by user and timestamp
df = df.sort_values(by=["user_id", "timestamp"])

# Create sessions:
# For simplicity, define a new session if gap > 30 minutes
df["time_diff"] = df.groupby("user_id")["timestamp"].diff().dt.total_seconds().fillna(0)
df["new_session"] = (df["time_diff"] > 1800).astype(int)
df["session_id"] = df.groupby("user_id")["new_session"].cumsum()

# Session-level aggregation
session_features = df.groupby(["user_id", "session_id"]).agg(
    num_clicks = ("event_type", lambda x: (x=="click").sum()),
    num_views = ("event_type", lambda x: (x=="view").sum()),
    session_length = ("time_diff", "sum"),
    avg_time_gap = ("time_diff", "mean"),
    last_event = ("event_type", "last"),
    avg_purchase_amount = ("avg_purchase_amount", "mean"),
    past_purchases = ("past_purchases", "max")
).reset_index()

# Save session-level dataset
os.makedirs("data/sessions", exist_ok=True)
session_file = "data/sessions/session_features_day1.csv"
session_features.to_csv(session_file, index=False)

print(f"✅ Session-level dataset saved at {session_file}")
print(session_features.head())


✅ Session-level dataset saved at data/sessions/session_features_day1.csv
    user_id  session_id  num_clicks  num_views  session_length  avg_time_gap  \
0   user_10           0           0          0             0.0           0.0   
1  user_100           0           0          0             0.0           0.0   
2   user_11           0           0          1             0.0           0.0   
3   user_11           1           1          0          2820.0        2820.0   
4   user_12           0           1          0             0.0           0.0   

    last_event  avg_purchase_amount  past_purchases  
0  add_to_cart           375.571429               7  
1  add_to_cart           477.000000               1  
2         view           216.500000               8  
3        click           216.500000               8  
4        click           387.000000               1  


In [14]:
# =======================
# Day 6: User Profile Aggregation
# =======================

# Load session dataset
session_file = "data/sessions/session_features_day1.csv"
sessions_df = pd.read_csv(session_file)

# Aggregate into user-level features
user_profiles = sessions_df.groupby("user_id").agg(
    total_sessions = ("session_id", "nunique"),
    avg_clicks_per_session = ("num_clicks", "mean"),
    avg_views_per_session = ("num_views", "mean"),
    avg_session_length = ("session_length", "mean"),
    avg_time_gap = ("avg_time_gap", "mean"),
    avg_purchase_amount = ("avg_purchase_amount", "mean"),
    max_past_purchases = ("past_purchases", "max")
).reset_index()

# Add "last session" stats (recency signal)
last_sessions = sessions_df.sort_values(["user_id", "session_id"]).groupby("user_id").tail(1)
last_sessions = last_sessions[["user_id", "num_clicks", "num_views", "session_length", "last_event"]]
last_sessions = last_sessions.rename(columns={
    "num_clicks": "last_session_clicks",
    "num_views": "last_session_views",
    "session_length": "last_session_length",
    "last_event": "last_session_event"
})

# Merge with user profiles
user_profiles = pd.merge(user_profiles, last_sessions, on="user_id", how="left")

# Save user profiles
os.makedirs("data/users", exist_ok=True)
user_file = "data/users/user_profiles_day1.csv"
user_profiles.to_csv(user_file, index=False)

print(f"✅ User profile dataset saved at {user_file}")
print(user_profiles.head())


✅ User profile dataset saved at data/users/user_profiles_day1.csv
    user_id  total_sessions  avg_clicks_per_session  avg_views_per_session  \
0   user_10               1                     0.0                    0.0   
1  user_100               1                     0.0                    0.0   
2   user_11               2                     0.5                    0.5   
3   user_12               1                     1.0                    0.0   
4   user_13               2                     0.5                    1.5   

   avg_session_length  avg_time_gap  avg_purchase_amount  max_past_purchases  \
0                 0.0           0.0           375.571429                   7   
1                 0.0           0.0           477.000000                   1   
2              1410.0        1410.0           216.500000                   8   
3                 0.0           0.0           387.000000                   1   
4              3540.0        2900.0           244.000000         

In [18]:
# =======================
# Day 7: User-Item Interaction Matrix
# =======================

# Load user profiles (Day 6) and purchase history (Day 2)
user_profiles = pd.read_csv("data/users/user_profiles_day1.csv")
purchases_df = pd.read_csv("data/batch/purchases.csv")   # created in Day 2
clicks_df = pd.read_csv("data/stream/user_activity_day1.csv")   # created in Day 2

# Step 1: Aggregate purchase interactions
purchase_interactions = purchases_df.groupby(["user_id", "item_id"]).agg(
    purchase_count=("purchase_amount", "count"),
    total_spent=("purchase_amount", "sum")
).reset_index()

# Step 2: Aggregate clickstream interactions (views + clicks)
click_interactions = clicks_df.groupby(["user_id", "item_id"]).agg(
    num_clicks=("event_type", lambda x: (x=="click").sum()),
    num_views=("event_type", lambda x: (x=="view").sum())
).reset_index()

# Step 3: Merge purchases + clicks into user-item interactions
user_item = pd.merge(purchase_interactions, click_interactions,
                     on=["user_id", "item_id"], how="outer").fillna(0)

# Step 4: Create interaction score
# Example: clicks*0.2 + views*0.1 + purchases*1.0
user_item["interaction_score"] = (
    user_item["purchase_count"]*1.0 +
    user_item["num_clicks"]*0.2 +
    user_item["num_views"]*0.1
)

# Step 5: Save dataset
os.makedirs("data/reco", exist_ok=True)
user_item_file = "data/reco/user_item_matrix.csv"
user_item.to_csv(user_item_file, index=False)

print(f"✅ User-item interaction dataset saved at {user_item_file}")
print(user_item.head())


✅ User-item interaction dataset saved at data/reco/user_item_matrix.csv
    user_id  item_id  purchase_count  total_spent  num_clicks  num_views  \
0   user_10   item_9             0.0          0.0         0.0        0.0   
1  user_100  item_12             0.0          0.0         0.0        0.0   
2   user_11  item_12             0.0          0.0         0.0        1.0   
3   user_11  item_14             0.0          0.0         1.0        0.0   
4   user_12  item_15             0.0          0.0         1.0        0.0   

   interaction_score  
0                0.0  
1                0.0  
2                0.1  
3                0.2  
4                0.2  


In [19]:
# =======================
# Step 8: Feature Engineering for Recommendations
# =======================


import pandas as pd
import numpy as np
import os

# Load batch and stream data
batch_file = "data/batch/purchases.csv"
stream_file = "data/stream/user_activity_day1.csv"

batch_df = pd.read_csv(batch_file, parse_dates=["timestamp"])
stream_df = pd.read_csv(stream_file, parse_dates=["timestamp"])

print("Batch Purchases:", batch_df.shape)
print("Stream Events:", stream_df.shape)

# -------------------------------
# 1. Aggregate user purchase behavior
# -------------------------------
user_purchase_features = (
    batch_df.groupby("user_id")
    .agg(
        total_purchases=("item_id", "count"),
        total_spent=("purchase_amount", "sum"),
        avg_spent=("purchase_amount", "mean")
    )
    .reset_index()
)

# -------------------------------
# 2. Aggregate item popularity
# -------------------------------
item_purchase_features = (
    batch_df.groupby("item_id")
    .agg(
        total_sales=("user_id", "count"),
        revenue=("purchase_amount", "sum"),
        avg_price=("purchase_amount", "mean")
    )
    .reset_index()
)

# -------------------------------
# 3. Stream-based engagement features
# -------------------------------
event_counts = (
    stream_df.groupby(["user_id", "event_type"])
    .size()
    .unstack(fill_value=0)
    .reset_index()
)

# Ensure all event columns exist
for ev in ["view", "click", "add_to_cart"]:
    if ev not in event_counts.columns:
        event_counts[ev] = 0

# -------------------------------
# 4. Combine user features
# -------------------------------
user_features = pd.merge(user_purchase_features, event_counts, on="user_id", how="outer").fillna(0)

# -------------------------------
# 5. Save engineered features
# -------------------------------
os.makedirs("data/features", exist_ok=True)

user_features_file = "data/features/user_features.csv"
item_features_file = "data/features/item_features.csv"

user_features.to_csv(user_features_file, index=False)
item_purchase_features.to_csv(item_features_file, index=False)

print(f"✅ Saved user features to {user_features_file} with shape {user_features.shape}")
print(f"✅ Saved item features to {item_features_file} with shape {item_purchase_features.shape}")

user_features.head(), item_purchase_features.head()


Batch Purchases: (100, 4)
Stream Events: (200, 4)
✅ Saved user features to data/features/user_features.csv with shape (94, 7)
✅ Saved item features to data/features/item_features.csv with shape (20, 4)


(    user_id  total_purchases  total_spent  avg_spent  add_to_cart  click  view
 0   user_10              0.0          0.0        0.0          1.0    0.0   0.0
 1  user_100              0.0          0.0        0.0          1.0    0.0   0.0
 2   user_11              0.0          0.0        0.0          0.0    1.0   1.0
 3   user_12              0.0          0.0        0.0          0.0    1.0   0.0
 4   user_13              1.0         11.9       11.9          0.0    1.0   3.0,
    item_id  total_sales  revenue   avg_price
 0   item_1            5   375.77   75.154000
 1  item_10            5   657.36  131.472000
 2  item_11            7   560.37   80.052857
 3  item_12            2   148.48   74.240000
 4  item_13            7   715.99  102.284286)