In [None]:
import os
import pandas as pd
import numpy as np
import random
import math

from collections import defaultdict
from typing import Dict, Set, List, Tuple, Callable
from implicit.als import AlternatingLeastSquares
from scipy import sparse

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader 

In [None]:
# =======================
# Section 1: Data Loading & Exploratory Data Analysis (EDA)
# =======================

# Root folder of the MovieLens-25M CSV dump. The components are joined
# separately so the path separator is chosen by the OS rather than hard-coded.
BASE_PATH = os.path.join('2 кейс', 'ml-25m')


def load_datasets(base_path: str) -> dict:
    """Read the MovieLens CSV tables located under *base_path*.

    Parameters
    ----------
    base_path : str
        Directory containing ``movies.csv``, ``ratings.csv``, ``tags.csv``,
        ``genome-tags.csv`` and ``genome-scores.csv``.

    Returns
    -------
    dict
        Mapping of the logical table name (``'movies'``, ``'ratings'``,
        ``'tags'``, ``'genome_tags'``, ``'genome_scores'``) to its DataFrame.
    """
    filenames = {
        'movies': 'movies.csv',
        'ratings': 'ratings.csv',
        'tags': 'tags.csv',
        'genome_tags': 'genome-tags.csv',
        'genome_scores': 'genome-scores.csv',
    }
    # Only the ratings table gets explicit narrow dtypes; every other table is
    # read with pandas' default type inference.
    ratings_dtypes = {
        'userId': 'int32',
        'movieId': 'int32',
        'rating': 'float32',
        'timestamp': 'int32',
    }

    loaded: dict[str, pd.DataFrame] = {}
    for name in filenames:
        fname = filenames[name]
        print(f"Loading {fname} …")

        read_kwargs = {'dtype': ratings_dtypes} if name == 'ratings' else {}
        frame = pd.read_csv(os.path.join(base_path, fname), **read_kwargs)

        loaded[name] = frame
        n_rows, n_cols = frame.shape
        print(f"  → {name}: {n_rows:,} rows × {n_cols} columns")
    return loaded


def basic_eda(dfs: dict) -> None:
    """Run a minimal EDA and print key stats.

    The function only reads from the supplied frames: the datetime values
    needed for the temporal-coverage report are computed on local Series
    instead of being written back as new columns, so the caller's DataFrames
    keep their original schema.

    Parameters
    ----------
    dfs : dict
        Mapping of table name to DataFrame. Must contain the keys
        ``'ratings'`` (columns ``userId``, ``movieId``, ``rating``,
        ``timestamp``), ``'movies'`` (columns ``movieId``, ``title``) and
        ``'tags'`` (columns ``movieId``, ``timestamp``).
    """

    print("\n========== Missing Values (per dataset) ==========")
    for name, df in dfs.items():
        # Compute the per-column counts once and reuse them for the detail view.
        missing_per_column = df.isna().sum()
        miss_total = int(missing_per_column.sum())
        print(f"{name:<12}: {miss_total:,} missing values")
        if miss_total:
            print(missing_per_column)

    # Ratings-specific statistics
    ratings = dfs['ratings']
    movies = dfs['movies']
    tags = dfs['tags']

    print("\n========== Ratings Distribution ==========")
    print(ratings['rating'].describe())

    print("\n========== Unique Counts ==========")
    print(f"Users          : {ratings['userId'].nunique():,}")
    print(f"Rated movies   : {ratings['movieId'].nunique():,}")
    print(f"Tagged movies  : {tags['movieId'].nunique():,}")

    print("\n========== Top-10 Most Rated Movies ==========")
    top = (ratings.groupby('movieId')
                  .size()
                  .sort_values(ascending=False)
                  .head(10)
                  .reset_index(name='num_ratings'))
    top = top.merge(movies[['movieId', 'title']], on='movieId', how='left')
    print(top[['title', 'num_ratings']])

    # Temporal coverage: timestamps are seconds since the Unix epoch. The
    # converted values live in local Series only, so the inputs are not mutated.
    ratings_dt = pd.to_datetime(ratings['timestamp'], unit='s')
    tags_dt = pd.to_datetime(tags['timestamp'], unit='s')
    print("\n========== Temporal Coverage ==========")
    print(f"Ratings : {ratings_dt.min()} → {ratings_dt.max()}")
    print(f"Tags    : {tags_dt.min()} → {tags_dt.max()}")



# Load every MovieLens table into memory once, then print the summary statistics.
datasets = load_datasets(BASE_PATH)
basic_eda(datasets)

In [None]:
def leave_one_out_split(ratings: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Hold out each user's chronologically last rating as the test set.

    The split is done with a positional boolean mask rather than by index
    label, so it stays correct when the input index contains duplicates
    (e.g. after concatenating frames without ``ignore_index``).

    Parameters
    ----------
    ratings : pd.DataFrame
        Must contain ``userId`` and ``timestamp`` columns.

    Returns
    -------
    (train_df, test_df)
        ``test_df`` holds exactly one row per user (the last one after sorting
        by ``userId`` then ``timestamp``); ``train_df`` holds all other rows.
        Both keep the sorted row order.
    """
    ratings_sorted = ratings.sort_values(['userId', 'timestamp'])
    # After sorting, the last occurrence of each userId is that user's most
    # recent interaction; `duplicated(keep='last')` flags every other row.
    is_last = ~ratings_sorted.duplicated(subset='userId', keep='last')
    test_df = ratings_sorted[is_last]
    train_df = ratings_sorted[~is_last]
    print(f"Train size: {train_df.shape[0]:,}, Test size: {test_df.shape[0]:,}")
    return train_df, test_df


# --- Per-user metric helpers ---

def precision_at_k(recommended: List[int], relevant: Set[int], k: int) -> float:
    """Fraction of the k cut-off occupied by distinct relevant items.

    The denominator is always ``k`` (not the number of items actually
    recommended); ``k == 0`` yields ``0.0``.
    """
    if k != 0:
        top_items = set(recommended[:k])
        return len(relevant & top_items) / k
    return 0.0


def recall_at_k(recommended: List[int], relevant: Set[int], k: int) -> float:
    """Share of the relevant items that appear among the first k recommendations.

    Returns ``0.0`` when there are no relevant items.
    """
    if relevant:
        found = relevant & set(recommended[:k])
        return len(found) / len(relevant)
    return 0.0


def apk(recommended: List[int], relevant: Set[int], k: int) -> float:
    """Average Precision at K.

    Sums precision@i over every rank ``i <= k`` that holds a relevant item and
    normalises by ``min(len(relevant), k)``, i.e. the best achievable number
    of hits within the cut-off.

    Returns ``0.0`` when there is nothing relevant or when ``k`` is not
    positive, mirroring ``precision_at_k``'s handling of ``k == 0`` instead of
    dividing by zero.
    """
    if not relevant or k <= 0:
        return 0.0

    score = 0.0
    hits = 0
    for rank, rec in enumerate(recommended[:k], start=1):
        if rec in relevant:
            hits += 1
            score += hits / rank  # precision at this rank
    return score / min(len(relevant), k)


def ndcg_at_k(recommended: List[int], relevant: Set[int], k: int) -> float:
    """Normalised Discounted Cumulative Gain at K with binary relevance.

    Each hit at 1-based rank ``i`` contributes ``1 / log2(i + 1)``; the total
    is divided by the DCG of an ideal ranking that places
    ``min(len(relevant), k)`` hits at the top. Returns ``0.0`` when that ideal
    DCG is not positive.
    """
    best_hits = min(len(relevant), k)
    ideal = sum(1 / math.log2(pos + 1) for pos in range(1, best_hits + 1))
    if not ideal > 0:
        return 0.0

    gain = 0.0
    for pos, item in enumerate(recommended[:k], start=1):
        if item in relevant:
            gain += 1 / math.log2(pos + 1)
    return gain / ideal


# Signature shared by every ranking metric: (recommended, relevant, k) -> score.
MetricFunc = Callable[[List[int], Set[int], int], float]


# Registry used by evaluate_model to resolve a metric name to its implementation.
METRIC_FUNCS: dict[str, MetricFunc] = dict(
    precision=precision_at_k,
    recall=recall_at_k,
    map=apk,
    ndcg=ndcg_at_k,
)


def evaluate_model(
    recommend_fn: Callable[[int, int], List[int]],
    test_user_movie: Dict[int, Set[int]],
    k: int = 10,
    metrics: Tuple[str, ...] = ('precision', 'recall', 'map', 'ndcg'),
) -> dict:
    """Average the requested ranking metrics over all test users.

    Parameters
    ----------
    recommend_fn : callable
        Function (user_id, k) -> List[movieId]; called once per test user.
    test_user_movie : dict
        Ground-truth item sets keyed by user id.
    k : int
        Cut-off rank passed to both the recommender and the metrics.
    metrics : tuple[str]
        Names to look up in ``METRIC_FUNCS``.

    Returns
    -------
    dict
        Metric name -> mean of the per-user scores (as a Python float).
    """
    per_user_scores = {name: [] for name in metrics}

    for user_id in test_user_movie:
        ground_truth = test_user_movie[user_id]
        ranked = recommend_fn(user_id, k)
        for name in metrics:
            metric = METRIC_FUNCS[name]
            per_user_scores[name].append(metric(ranked, ground_truth, k))

    return {
        name: float(pd.Series(scores).mean())
        for name, scores in per_user_scores.items()
    }

In [None]:
# =======================
# Section 2: Pre-processing Helpers
# =======================


def build_user_movie_sets(ratings: pd.DataFrame, min_ratings: int = 5) -> Dict[int, Set[int]]:
    """Group rated movie ids by user and drop users with too few distinct movies.

    Parameters
    ----------
    ratings : pd.DataFrame
        Needs ``userId`` and ``movieId`` columns.
    min_ratings : int
        Minimum number of distinct movies a user must have to be kept.
        Values ``<= 0`` disable the filter.

    Returns
    -------
    dict
        userId -> set of movieIds. A summary line is printed as a side effect.
    """
    user_movie: Dict[int, Set[int]] = defaultdict(set)
    # Only the two id columns are needed, so walk them in lockstep.
    for user_id, movie_id in zip(ratings['userId'], ratings['movieId']):
        user_movie[user_id].add(movie_id)

    # Filter sparse users
    if min_ratings > 0:
        user_movie = {
            user_id: watched
            for user_id, watched in user_movie.items()
            if len(watched) >= min_ratings
        }
    total_users = ratings['userId'].nunique()
    print(f"Kept {len(user_movie):,} users with ≥{min_ratings} ratings each (from {total_users:,} total)")
    return user_movie

### Part 2.1 - Matrix Factorization with ALS

Alternating Least Squares for Implicit Feedback
Core Idea
We implement weighted matrix factorization optimized for implicit feedback data (e.g., movie ratings treated as confidence values). The method:\
- Decomposes user-item interactions into low-dimensional latent factors
- Uses alternating optimization with regularization to prevent overfitting
- Scales linearly with the number of users and items for a fixed number of factors (unlike exact SVD)
- Handles sparse data efficiently via conjugate gradient method

Input Data Transformation\
Convert explicit ratings to implicit confidence weights:
$$ c_{ui} = 1 + \alpha \cdot r_{ui} $$

where $\alpha$ controls how strongly high ratings should be weighted

Matrix Factorization Model
Factorize the interaction matrix $R$ into:
- User factors: $X \in \mathbb{R}^{|U|\times k}$
- Item factors: $Y \in \mathbb{R}^{|M|\times k}$

Objective function with L2 regularization:\
$$ \min_{X,Y} \sum_{u,i} c_{ui}(p_{ui} - \mathbf{x}_u^T \mathbf{y}_i)^2 + \lambda\left(\|X\|_F^2 + \|Y\|_F^2\right) $$

where $p_{ui} = 1$ if user $u$ interacted with item $i$, else 0

ALS Optimization\
Alternating between:

1) Fix $Y$, solve for $X$:\
$$ x_u = (Y^T C_u Y + \lambda I)^{-1} Y^T C_u p_u $$
2) Fix $X$, solve for $Y$:\
$$ y_i = (X^T C_i X + \lambda I)^{-1} X^T C_i p_i $$


Implementation Details
- Factors: $k=64$ (tunable latent dimension)
- Regularization: $\lambda=0.05$ (controls overfitting)
- Iterations: 15 (trade-off between convergence and speed)
- Alpha: 40.0 (controls how strongly to weight observed ratings)

Recommendation Generation\
For user $u$:

1) Compute user's latent vector $x_u$
2) Score all items: $\hat{p}_{ui} = \mathbf{x}_u^T \mathbf{y}_i$
3) Filter out seen items (optional)
4) Return top-$k$ items by predicted score

Computational Complexity\
Per iteration:

- $O(k^2 N + k^3|U|)$ for user updates
- $O(k^2 N + k^3|M|)$ for item updates, where $N$ is the number of observed interactions\
Linear in number of users/items when $k$ is fixed

Advantages over Neighborhood Methods
- Better cold-start handling via shared factors
- Captures transitive relationships (A likes B, B likes C → A might like C)
- More compact representation ($k$ factors per user/item vs. full vectors)

In [None]:
# =======================
# Section 2.1: Collaborative Filtering via ALS (implicit feedback)
# =======================


class ALSRecommender:

    def __init__(
        self,
        factors: int = 64,
        regularization: float = 0.05,
        iterations: int = 15,
        alpha: float = 40.0,
        random_state: int = 42,
    ) -> None:
        self.factors = factors
        self.regularization = regularization
        self.iterations = iterations
        self.alpha = alpha
        self.random_state = random_state

        self.model: AlternatingLeastSquares | None = None
        self.user2idx: dict[int, int] = {}
        self.idx2user: dict[int, int] = {}
        self.item2idx: dict[int, int] = {}
        self.idx2item: dict[int, int] = {}
        self.user_items: sparse.csr_matrix | None = None

    @staticmethod
    def _to_implicit_weight(rating: float, alpha: float) -> float:
        return 1.0 + alpha * rating

    def _build_mappings(self, ratings: pd.DataFrame) -> None:
        users = ratings["userId"].unique()
        items = ratings["movieId"].unique()

        self.user2idx = {u: i for i, u in enumerate(users)}
        self.idx2user = {i: u for u, i in self.user2idx.items()}
        self.item2idx = {m: i for i, m in enumerate(items)}
        self.idx2item = {i: m for m, i in self.item2idx.items()}

    def _build_matrix(self, ratings: pd.DataFrame) -> sparse.csr_matrix:
        row = ratings["userId"].map(self.user2idx).to_numpy()
        col = ratings["movieId"].map(self.item2idx).to_numpy()
        data = self._to_implicit_weight(ratings["rating"].astype(float), self.alpha)

        mat = sparse.coo_matrix(
            (data, (row, col)),
            shape=(len(self.user2idx), len(self.item2idx)),
            dtype=np.float32,
        )
        return mat.tocsr()

    def fit(self, ratings_df: pd.DataFrame) -> None:
        # 1. Copy Id → index
        self._build_mappings(ratings_df)

        # 2. make a matrix
        self.user_items = self._build_matrix(ratings_df)

        # 3. train ALS
        self.model = AlternatingLeastSquares(
            factors=self.factors,
            regularization=self.regularization,
            iterations=self.iterations,
            random_state=self.random_state,
        )
        # implicit awaits (items × users) — transpond
        self.model.fit(self.user_items.T)

    def recommend(
        self,
        uid: int,
        seen: set[int] | None = None,
        top_k: int = 10,
    ) -> list[int]:
        if self.model is None or self.user_items is None:
            raise RuntimeError("Model not trained yet.")
        if uid not in self.user2idx:
            return []

        seen = seen or set()
        uidx = self.user2idx[uid]

        # ① берём ровно одну строку (1 × num_items) в формате CSR
        user_row = self.user_items[uidx]

        # ② получаем рекомендации
        recs, _ = self.model.recommend(
            uidx,                 # userid позиционно
            user_row,             # одна строка
            N=top_k + len(seen),
            filter_items=[self.item2idx.get(m, m) for m in seen],  # защитимся тем же способом
        )

        # ③ переводим в movieId, учитывая оба возможных случая
        rec_movie_ids: list[int] = []
        for i in recs:
            mid = self.idx2item.get(int(i), int(i))  # если нет — значит i уже movieId
            if mid not in seen:
                rec_movie_ids.append(mid)

        return rec_movie_ids[:top_k]

### Part 2.2 - Content-based filtering

#### Core idea
Let $e_i \in \mathbb{R}^n$ be a content embedding vector for item $i$. For an item the user has not rated yet, we can estimate its rating from its similarity (dot product or cosine similarity) to the items the user has already rated:
$$
\hat{r}_{ui} = \max_{j \in I_u, \, r_{uj} > \alpha} \rho(e_i, e_j) \, r_{uj},
$$
where $\rho$ is the dot product or cosine similarity between two vectors, $I_u$ is the set of movies rated by user $u$, and $\alpha$ is a rating-threshold hyperparameter.\
(just a simple ranking model)

In [None]:
# =======================
# Section 2.2: Simple Genre-Based Content Filtering
# =======================


def parse_movie_genres(movies: pd.DataFrame) -> Dict[int, Set[str]]:
    """Map each movieId to the set of its genres.

    The ``genres`` column is a ``|``-separated string; the placeholder
    ``"(no genres listed)"`` is translated to an empty set.
    """
    return {
        movie_id: set() if raw == "(no genres listed)" else set(raw.split("|"))
        for movie_id, raw in zip(movies['movieId'], movies['genres'])
    }


def content_recommend(uid: int, user_movie: Dict[int, Set[int]], movie_genres: Dict[int, Set[str]], top_k: int = 10) -> List[int]:
    """Recommend unseen movies whose genres overlap most with the user's history.

    The user profile counts how many seen movies carry each genre; a candidate's
    score is the sum of those counts over its own genres. Candidates with a
    zero score are dropped, and ties keep the iteration order of
    ``movie_genres``.

    Returns an empty list when the user has no history or when none of the
    seen movies has a known genre.
    """
    watched = user_movie.get(uid, set())
    if not watched:
        return []

    # Build user genre profile
    profile: Dict[str, int] = defaultdict(int)
    for movie in watched:
        for genre in movie_genres.get(movie, set()):
            profile[genre] += 1

    if not profile:
        return []

    # Score candidate movies by summed genre counts
    scored = []
    for candidate, genres in movie_genres.items():
        if candidate in watched:
            continue
        total = sum(profile.get(genre, 0) for genre in genres)
        if total:
            scored.append((candidate, total))

    # list.sort is stable, so equal scores keep their original relative order.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [candidate for candidate, _ in scored[:top_k]]

### Part 2.3 - Hybrid approach (DSSM)

Both described approaches are good but have some problems.\
The biggest problem for a recommendation model is the filter bubble. With the collaborative approach we also have no information about new users, so we cannot recommend anything to them (the cold-start problem). With content-based filtering the filter bubble is even stronger, so we need a more sophisticated model to address these problems.

In this solution we will use a `DSSM` model:\
`DSSM` learns a nonlinear mapping of two discrete entities (typically queries and documents) into a shared latent semantic space, where relevance is measured by cosine similarity.

Given:\
Query $q$ and document $d$\
Their embeddings $y(q)$ and $y(d)$ in $\mathbb{R}^k$\
Goal: Maximize the relevance score $R(q,d)$ for relevant pairs, where $y_q = y(q)$, $y_d = y(d)$ and $$ R(q, d) = \text{sim}(y_q, y_d) = \cos(y_q, y_d) = \frac{y_{q}^{T}y_d}{\|y_q\| \cdot \|y_d\|} $$

For this task we will choose cross-entropy with negative sampling

Cross-entropy formula: $$ \mathcal{L}(q, d^+) = -\log \big( P(d^+ \mid q) \big) $$

The problem is the cost of computing the gradient of $ \mathcal{L}(q, d^+) $: the denominator of $P(d \mid q)$ requires the click probability of every movie for every query. The solution is negative sampling. Note that among the documents $d$ in the denominator of $P(d \mid q)$, only one is typically clicked (the positive example), while the remaining thousands or millions serve as negative examples. Rather than computing the full summation over all documents at each optimization step, it is computationally efficient to consider only a small sampled subset.\
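So the sampled approximation of the click probability is: $$ P(d^{+} \mid q) \approx \frac{\exp \big( b_0 R(q, d^{+}) \big)}{\exp \big( b_0 R(q, d^{+}) \big) + \sum_{i=1}^{k} \exp \big( b_0 R(q, d_i^{-}) \big)}, $$ where $b_0$ is a smoothing factor and $d_{1}^{-},...,d_{k}^{-}$ are the negative samples for the query $q$.\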
To generate these samples we draw movies uniformly at random from the ones the user has not rated. In the original DSSM article, the recommended ratio of negative to positive examples is 4:1.

In [None]:
# =======================
# Section 2.3: Hybrid DSSM Model (PyTorch)
# =======================


class _PairDataset(Dataset):
    """Dataset yielding (user_idx, pos_item_idx, neg_item_idxs).

    One sample per user: a random positive item of that user plus
    ``neg_ratio`` item indices drawn uniformly from ``[0, num_items)`` that are
    not among the user's positives (negatives may repeat).

    Parameters
    ----------
    user_pos : dict
        user index -> set of positive item indices.
    num_items : int
        Size of the item index space negatives are sampled from.
    neg_ratio : int
        Number of negatives returned per sample.
    """

    def __init__(
        self,
        user_pos: Dict[int, Set[int]],
        num_items: int,
        neg_ratio: int = 4,
    ) -> None:
        self.user_indices = list(user_pos.keys())
        self.user_pos = user_pos
        self.num_items = num_items
        self.neg_ratio = neg_ratio

    def __len__(self):
        return len(self.user_indices)

    def __getitem__(self, idx):
        uid = self.user_indices[idx]
        positives = self.user_pos[uid]
        pos_items = list(positives)

        # Rejection sampling below never terminates if every index in
        # [0, num_items) is a positive, so fail loudly instead of hanging.
        if self.neg_ratio > 0:
            covered = sum(1 for p in pos_items if 0 <= p < self.num_items)
            if covered >= self.num_items:
                raise ValueError(
                    f"Cannot sample negatives for user index {uid}: "
                    f"all {self.num_items} items are positives."
                )

        pos_item = random.choice(pos_items)
        neg_items = []
        # sample negatives distinct from positives
        while len(neg_items) < self.neg_ratio:
            neg = random.randint(0, self.num_items - 1)
            if neg not in positives:
                neg_items.append(neg)
        return uid, pos_item, torch.tensor(neg_items, dtype=torch.long)  # type: ignore[name-defined]


class DSSMRecommender:
    """Hybrid DSSM with separate user & item towers.

    The simplest variant: both towers are embedding layers optionally followed by MLP.
    """

    def __init__(
        self,
        embedding_dim: int = 64,
        hidden_dims: Tuple[int, ...] = (),  # type: ignore[valid-type]
        neg_ratio: int = 4,
        epochs: int = 5,
        batch_size: int = 1024,
        lr: float = 1e-3,
        rating_threshold: float = 3.5,
        device: str | None = None,
    ) -> None:
        if torch is None:
            raise ImportError("PyTorch is required for DSSM model. Install via `pip install torch`. ")
        self.embedding_dim = embedding_dim
        self.hidden_dims = hidden_dims
        self.neg_ratio = neg_ratio
        self.epochs = epochs
        self.batch_size = batch_size
        self.lr = lr
        self.rating_threshold = rating_threshold
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")

        # Will be initialised in fit
        self.user_embedding: nn.Module | None = None  # type: ignore[valid-type]
        self.item_embedding: nn.Module | None = None  # type: ignore[valid-type]
        self.user_mlp: nn.Module | None = None  # type: ignore[valid-type]
        self.item_mlp: nn.Module | None = None  # type: ignore[valid-type]
        self.user2idx: Dict[int, int] = {}
        self.idx2item: Dict[int, int] = {}
        self.item_vecs: torch.Tensor | None = None  # cached item representations

    # ----------------- Utility builders -----------------

    @staticmethod
    def _build_mlp(input_dim: int, hidden_dims: Tuple[int, ...]) -> nn.Module:  # type: ignore[name-defined]
        layers: list[nn.Module] = []
        prev_dim = input_dim
        for dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, dim))  # type: ignore[attr-defined]
            layers.append(nn.ReLU())  # type: ignore[attr-defined]
            prev_dim = dim
        return nn.Sequential(*layers)  # type: ignore[attr-defined]

    # ----------------- Training -----------------

    def fit(self, ratings: pd.DataFrame):
        """Train DSSM on implicit feedback derived from *ratings* DataFrame."""
        import numpy as np  # noqa: F401
        import random  # noqa: F401

        # Map ids to contiguous indices
        unique_users = ratings['userId'].unique()
        unique_items = ratings['movieId'].unique()
        self.user2idx = {uid: i for i, uid in enumerate(unique_users)}
        item2idx = {mid: i for i, mid in enumerate(unique_items)}
        self.idx2item = {i: mid for mid, i in item2idx.items()}

        num_users = len(unique_users)
        num_items = len(unique_items)

        # Build implicit feedback sets
        user_pos: Dict[int, Set[int]] = defaultdict(set)
        for row in ratings.itertuples(index=False):
            if row.rating >= self.rating_threshold:  # type: ignore[attr-defined]
                u_idx = self.user2idx[row.userId]  # type: ignore[attr-defined]
                i_idx = item2idx[row.movieId]  # type: ignore[attr-defined]
                user_pos[u_idx].add(i_idx)

        # Filter users with no positives
        user_pos = {u: items for u, items in user_pos.items() if items}

        dataset = _PairDataset(user_pos, num_items, neg_ratio=self.neg_ratio)
        loader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True, drop_last=True)

        # Build model components
        self.user_embedding = nn.Embedding(num_users, self.embedding_dim).to(self.device)  # type: ignore[attr-defined]
        self.item_embedding = nn.Embedding(num_items, self.embedding_dim).to(self.device)  # type: ignore[attr-defined]
        self.user_mlp = self._build_mlp(self.embedding_dim, self.hidden_dims).to(self.device)
        self.item_mlp = self._build_mlp(self.embedding_dim, self.hidden_dims).to(self.device)

        optimizer = torch.optim.Adam(  # type: ignore[attr-defined]
            list(self.user_embedding.parameters())  # type: ignore[union-attr]
            + list(self.item_embedding.parameters())  # type: ignore[union-attr]
            + list(self.user_mlp.parameters())  # type: ignore[union-attr]
            + list(self.item_mlp.parameters()),  # type: ignore[union-attr]
            lr=self.lr,
        )

        for epoch in range(1, self.epochs + 1):
            total_loss = 0.0
            for u_idxs, pos_i_idxs, neg_i_idxs in loader:
                u_idxs = u_idxs.to(self.device)
                pos_i_idxs = pos_i_idxs.to(self.device)
                neg_i_idxs = neg_i_idxs.to(self.device)

                # Forward pass
                user_vec = self.user_mlp(self.user_embedding(u_idxs))  # (B, d)
                pos_vec = self.item_mlp(self.item_embedding(pos_i_idxs))  # (B, d)
                neg_vec = self.item_mlp(self.item_embedding(neg_i_idxs))  # (B, neg, d)

                # Normalize
                user_vec = nn.functional.normalize(user_vec, dim=1)
                pos_vec = nn.functional.normalize(pos_vec, dim=1)
                neg_vec = nn.functional.normalize(neg_vec, dim=2)

                # Positive scores (B, 1)
                pos_scores = (user_vec * pos_vec).sum(dim=1, keepdim=True)
                # Negative scores (B, neg)
                neg_scores = torch.bmm(neg_vec, user_vec.unsqueeze(2)).squeeze(2)  # (B, neg)

                logits = torch.cat([pos_scores, neg_scores], dim=1)  # (B, 1+neg)
                labels = torch.zeros(logits.size(0), dtype=torch.long, device=self.device)
                loss = nn.functional.cross_entropy(logits, labels)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

            avg_loss = total_loss / len(loader)
            print(f"Epoch {epoch}/{self.epochs} - loss {avg_loss:.4f}")

        # Cache item vectors for fast inference
        with torch.no_grad():
            all_item_idx = torch.arange(num_items, device=self.device)
            item_vecs = self.item_mlp(self.item_embedding(all_item_idx))
            self.item_vecs = nn.functional.normalize(item_vecs, dim=1).cpu()

    # ----------------- Recommendation -----------------

    def _get_user_vec(self, uid: int) -> torch.Tensor:
        if self.user_embedding is None:
            raise RuntimeError("Model not trained.")
        if uid not in self.user2idx:
            raise ValueError("Unknown user id")
        self.user_embedding.eval()
        with torch.no_grad():
            idx = torch.tensor([self.user2idx[uid]], device=self.device)
            vec = self.user_mlp(self.user_embedding(idx))
            return nn.functional.normalize(vec, dim=1).cpu().squeeze(0)

    def recommend(self, uid: int, seen: Set[int], top_k: int = 10) -> List[int]:
        """Return top_k recommended movieIds not in *seen*."""
        import numpy as np
        if self.item_vecs is None:
            raise RuntimeError("Model not trained.")
        user_vec = self._get_user_vec(uid)
        scores = torch.mv(self.item_vecs, user_vec)  # (num_items,)
        scores_numpy = scores.numpy()
        # Mask seen
        for mid in seen:
            if mid in self.idx2item.values():
                idx = list(self.idx2item.keys())[list(self.idx2item.values()).index(mid)]
                scores_numpy[idx] = -np.inf
        top_indices = scores_numpy.argsort()[-top_k:][::-1]
        return [self.idx2item[i] for i in top_indices]


In [None]:
# Fix every RNG involved (Python for negative sampling, NumPy, PyTorch for
# weight initialisation and batch shuffling) so the reported metrics are
# reproducible across Restart & Run All.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Cut-off rank shared by all three evaluations.
TOP_K = 10

# leave-one-out split
train_df, test_df = leave_one_out_split(datasets['ratings'])

# sets of rated films
train_user_movies = build_user_movie_sets(train_df)
# The test split holds exactly one rating per user, so the threshold must be 1;
# a higher value would discard every user.
test_user_movies  = build_user_movie_sets(test_df, min_ratings=1)

# genres for content filtering
movie_genres_map = parse_movie_genres(datasets['movies'])

# =======================
# 1. Collaborative: ALS
# =======================
als_rec = ALSRecommender(factors=100, iterations=20, alpha=40)
als_rec.fit(train_df)

als_results = evaluate_model(
    recommend_fn=lambda u, k: als_rec.recommend(
        u, seen=train_user_movies.get(u, set()), top_k=k
    ),
    test_user_movie=test_user_movies,
    k=TOP_K,
    metrics=('precision', 'recall', 'map', 'ndcg'),
)

# =======================
# 2. Content: genres
# =======================
content_results = evaluate_model(
    recommend_fn=lambda u, k: content_recommend(
        u, user_movie=train_user_movies, movie_genres=movie_genres_map, top_k=k
    ),
    test_user_movie=test_user_movies,
    k=TOP_K,
)

# =======================
# 3. Hybrid: DSSM
# =======================
dssm = DSSMRecommender(embedding_dim=64, epochs=3)
dssm.fit(train_df)

dssm_results = evaluate_model(
    recommend_fn=lambda u, k: dssm.recommend(
        u, seen=train_user_movies.get(u, set()), top_k=k
    ),
    test_user_movie=test_user_movies,
    k=TOP_K,
)

# =======================
# 4. Metrics
# =======================
results_df = pd.DataFrame(
    [als_results, content_results, dssm_results],
    index=['ALS (collab)', 'Content (genres)', 'DSSM (hybrid)'],
).round(4)

display(results_df)