In [1]:
import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.decomposition import TruncatedSVD
from sklearn.model_selection import train_test_split
import zipfile
import os
from typing import List, Dict, Any

In [8]:
import zipfile, os

zip_path = "/content/ml-100k.zip"
extract_path = "/content/"

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

RATINGS_PATH = "/content/ml-100k/u.data"
MOVIES_PATH  = "/content/ml-100k/u.item"


In [18]:
ratings = pd.read_csv(
    RATINGS_PATH, sep="\t",
    names=["userId", "movieId", "rating", "timestamp"],
    engine="python"
)

movies = pd.read_csv(
    MOVIES_PATH, sep="|", encoding="latin-1",
    names=["movieId", "title"] + [f"col{i}" for i in range(22)],
    usecols=["movieId", "title"],
    engine="python"
)

movie_title = dict(zip(movies["movieId"], movies["title"]))

def pretty_titles(movie_ids):
    return [movie_title.get(mid, str(mid)) for mid in movie_ids]

print(ratings.head())
print(movies.head())

   userId  movieId  rating  timestamp
0     196      242       3  881250949
1     186      302       3  891717742
2      22      377       1  878887116
3     244       51       2  880606923
4     166      346       1  886397596
   movieId              title
0        1   Toy Story (1995)
1        2   GoldenEye (1995)
2        3  Four Rooms (1995)
3        4  Get Shorty (1995)
4        5     Copycat (1995)


In [49]:
def per_user_split(df, test_size=0.2, min_items=5):
    trains, tests = [], []
    for uid, g in df.groupby("userId"):
        if len(g) < min_items:
            continue
        tr, te = train_test_split(g, test_size=test_size)
        trains.append(tr)
        tests.append(te)
    return pd.concat(trains, ignore_index=True), pd.concat(tests, ignore_index=True)

train_ratings, test_ratings = per_user_split(ratings, test_size=0.2, min_items=5)
user_item_train = train_ratings.pivot_table(index="userId", columns="movieId", values="rating")
user_item_train_filled = user_item_train.fillna(0.0)

users  = user_item_train.index.tolist()
items  = user_item_train.columns.tolist()
u2i    = {u:i for i,u in enumerate(users)}
i2pos  = {m:j for j,m in enumerate(items)}

def seen_items_train(uid:int) -> set:
    if uid not in user_item_train.index:
        return set()
    return set(user_item_train.loc[uid].dropna().index.tolist())


In [51]:
user_sim = cosine_similarity(user_item_train_filled.values)         # (U x U)
item_sim = cosine_similarity(user_item_train_filled.values.T)       # (I x I)

user_sim_row_sums = user_sim.sum(axis=1, keepdims=True)
user_sim_row_sums[user_sim_row_sums==0] = 1.0
user_sim_norm = user_sim / user_sim_row_sums

item_sim_row_sums = item_sim.sum(axis=1, keepdims=True)
item_sim_row_sums[item_sim_row_sums==0] = 1.0
item_sim_norm = item_sim / item_sim_row_sums

In [53]:
def recommend_usercf(uid:int, top_k:int=10) -> list:
    if uid not in u2i:
        return []
    uidx = u2i[uid]
    scores = user_sim_norm[uidx] @ user_item_train_filled.values
    seen = seen_items_train(uid)
    recs = [(m, scores[i2pos[m]]) for m in items if m not in seen]
    recs.sort(key=lambda x: x[1], reverse=True)
    return [m for m,_ in recs[:top_k]]

def recommend_itemcf(uid:int, top_k:int=10) -> list:
    if uid not in u2i:
        return []
    uidx  = u2i[uid]
    uvec  = user_item_train_filled.values [uidx]
    scores = uvec @ item_sim_norm
    seen = seen_items_train(uid)
    recs = [(m, scores[i2pos[m]]) for m in items if m not in seen]
    recs.sort(key=lambda x: x[1], reverse=True)
    return [m for m,_ in recs[:top_k]]


In [54]:
def recommend_usercf(uid:int, top_k:int=10) -> list:
    if uid not in u2i:
        return []
    uidx = u2i[uid]
    scores = user_sim_norm[uidx] @ user_item_train_filled.values
    seen = seen_items_train(uid)
    recs = [(m, scores[i2pos[m]]) for m in items if m not in seen]
    recs.sort(key=lambda x: x[1], reverse=True)
    return [m for m,_ in recs[:top_k]]

def recommend_itemcf(uid:int, top_k:int=10) -> list:
    if uid not in u2i:
        return []
    uidx  = u2i[uid]
    uvec  = user_item_train_filled.values[uidx]
    scores = uvec @ item_sim_norm
    seen = seen_items_train(uid)
    recs = [(m, scores[i2pos[m]]) for m in items if m not in seen]
    recs.sort(key=lambda x: x[1], reverse=True)
    return [m for m,_ in recs[:top_k]]


In [55]:
#BONUS - matrix factorization (SVD)

user_means = user_item_train.apply(lambda row: row.mean(), axis=1).fillna(0.0).values
R_centered = user_item_train.subtract(user_means, axis=0).fillna(0.0).values  # (U x I)

rank = 50
svd = TruncatedSVD(n_components=rank)
U = svd.fit_transform(R_centered)       # (U x k)
S = svd.singular_values_
VT = svd.components_                    # (k x I)
R_hat_centered = U @ np.diag(S) @ VT    # (U x I)
R_hat = R_hat_centered + user_means.reshape(-1, 1)

def recommend_svd(uid:int, top_k:int=10) -> list:
    if uid not in u2i:
        return []
    uidx = u2i[uid]
    scores = R_hat[uidx]
    seen = seen_items_train(uid)
    recs = [(m, scores[i2pos[m]]) for m in items if m not in seen]
    recs.sort(key=lambda x: x[1], reverse=True)
    return [m for m,_ in recs[:top_k]]


In [59]:
#Evaluation: Precision@K
def precision_at_k(recommended:list, relevant:set, k:int=10) -> float:
    if not recommended:
        return 0.0
    cut = recommended[:k]
    return len(set(cut) & relevant) / k

def evaluate_all(users_subset:list, k:int=10) -> dict:
    out = {"UserCF": [], "ItemCF": [], "SVD": []}
    test_rel = (
        test_ratings.loc[test_ratings.rating >= 4, ["userId","movieId"]]
        .groupby("userId")["movieId"].apply(set)
    )
    for uid in users_subset:
        if uid not in u2i or uid not in test_rel.index:
            continue
        relevant = test_rel.loc[uid]
        if not relevant:
            continue
        out["UserCF"].append(precision_at_k(recommend_usercf(uid, k), relevant, k))
        out["ItemCF"].append(precision_at_k(recommend_itemcf(uid, k), relevant, k))
        out["SVD"].append(precision_at_k(recommend_svd(uid, k), relevant, k))
    return {k_: (float(np.mean(v)) if len(v)>0 else 0.0) for k_, v in out.items()}


In [60]:
#sample recommendations
def pretty_titles(movie_ids:list) -> list:
    return [movie_title.get(mid, str(mid)) for mid in movie_ids]

K = 10
sample_user = users[0]
print("Sample user:", sample_user)

print("\nUserCF recommendations:")
print(*pretty_titles(recommend_usercf(sample_user, top_k=K)), sep="\n- ")

print("\nItemCF recommendations:")
print(*pretty_titles(recommend_itemcf(sample_user, top_k=K)), sep="\n- ")

print("\nSVD recommendations:")
print(*pretty_titles(recommend_svd(sample_user, top_k=K)), sep="\n- ")

subset_size = min(200, len(users))
eval_users = list(np.random.choice(users, size=subset_size, replace=False))

results = evaluate_all(eval_users, k=K)
print(f"\nPrecision @ {K} on {len(eval_users)} users")
for k_, v in results.items():
    print(f"{k_}: {v:.4f}")


Sample user: 1

UserCF recommendations:
Raiders of the Lost Ark (1981)
- Silence of the Lambs, The (1991)
- Fargo (1996)
- Pulp Fiction (1994)
- Schindler's List (1993)
- E.T. the Extra-Terrestrial (1982)
- Terminator, The (1984)
- Rock, The (1996)
- Amadeus (1984)
- One Flew Over the Cuckoo's Nest (1975)

ItemCF recommendations:
Silence of the Lambs, The (1991)
- Pulp Fiction (1994)
- Raiders of the Lost Ark (1981)
- E.T. the Extra-Terrestrial (1982)
- Terminator, The (1984)
- Fish Called Wanda, A (1988)
- Stand by Me (1986)
- Fargo (1996)
- Get Shorty (1995)
- Amadeus (1984)

SVD recommendations:
Silence of the Lambs, The (1991)
- Raiders of the Lost Ark (1981)
- Fargo (1996)
- Secrets & Lies (1996)
- Close Shave, A (1995)
- Hunt for Red October, The (1990)
- Titanic (1997)
- Terminator, The (1984)
- Glory (1989)
- Rear Window (1954)

Precision @ 10 on 200 users
UserCF: 0.2035
ItemCF: 0.2490
SVD: 0.1758
