In [4]:
import pandas as pd
import numpy as np
from typing import List, Callable, Dict, Tuple, Set

# 1. Main Methods

In [5]:
def calc_diversity(
        recommendation: List[int],
        dissimilarity: Callable[[int, int], float]
) -> float:
    """
    Intra-list diversity: mean dissimilarity over all unordered item pairs.

    Parameters:
    - recommendation: List[int]
        Recommended item ids, in any order.
    - dissimilarity: Callable[[int, int], float]
        Function returning the dissimilarity of two item ids. It is called
        once per unordered pair (i < j), so it is expected to be symmetric.

    Returns:
    - float
        sum_{i<j} dissimilarity(i, j) / (n * (n - 1) / 2), or 0.0 when the
        list has fewer than two items (no pair exists).
    """
    n = len(recommendation)
    if n < 2:
        return 0.0

    total = 0.0
    for i in range(n - 1):
        for j in range(i + 1, n):
            total += dissimilarity(recommendation[i], recommendation[j])

    # Each unordered pair was visited exactly once, so the mean is simply
    # total / number_of_pairs. (The factor 2 of the textbook formula
    # 2 / (n * (n - 1)) is already contained in the pair count.)
    pair_count = n * (n - 1) // 2
    return total / pair_count

def calc_novelty(
        recommendation: List[int],
        popularity_score: Callable[[int], float]
) -> float:
    """
    Average "unpopularity" (1 - popularity score) of the recommended items.

    Parameters:
    - recommendation: List[int]
        Recommended item ids.
    - popularity_score: Callable[[int], float]
        Function returning the popularity score of an item id.

    Returns:
    - float
        Mean of 1 - popularity_score(item) over the list; 0.0 for an
        empty list.
    """
    if not recommendation:
        return 0.0
    unpopularity = [1 - popularity_score(item) for item in recommendation]
    return sum(unpopularity) / len(unpopularity)

def calc_serendipity(
    recommendation: List[int],
    liked_items: Set[int],
    user_history: Set[int],
    similarity: Callable[[int, int], float],
    threshold: float = 0.5
) -> float:
    """
    Fraction of recommended items that are both relevant and unexpected.

    An item is relevant when it is a member of `liked_items`, and unexpected
    when its similarity to every item in `user_history` is strictly below
    `threshold` (an empty history makes every item unexpected).

    Returns:
    - float
        Number of relevant-and-unexpected items divided by the list length;
        0.0 for an empty list.
    """
    if not recommendation:
        return 0.0

    def is_unexpected(item: int) -> bool:
        # Stop at the first history item that is not strictly below the threshold.
        for seen in user_history:
            if not (similarity(item, seen) < threshold):
                return False
        return True

    hits = 0
    for item in recommendation:
        relevant = item in liked_items
        unexpected = is_unexpected(item)
        hits += int(relevant and unexpected)
    return hits / len(recommendation)

# 2. Helper Functions

### 2.1 Genre Based

In [6]:
def load_genre_vectors(
    movies_csv: str
) -> Dict[int, np.ndarray]:
    """
    Read a MovieLens-style movies.csv and encode each movie's genres as a
    multi-hot vector.

    Parameters:
    - movies_csv: str
        Path to the movies.csv file (anything `pd.read_csv` accepts). The
        file must provide `movieId` and `genres` columns, with genres
        separated by '|'.

    Returns:
    - Dict[int, np.ndarray]
        Mapping from movieId to a float vector of length G, where G is the
        number of distinct non-empty genre strings in the file. Positions
        follow the alphabetically sorted genre names; a movie with a missing
        genres field gets an all-zero vector.
    """
    movies = pd.read_csv(movies_csv)

    # A missing genres cell becomes '' so that splitting yields [''] rather than NaN.
    genre_lists = movies['genres'].fillna('').str.split('|')

    vocabulary = sorted({name for names in genre_lists for name in names if name})
    column_of = {name: col for col, name in enumerate(vocabulary)}
    width = len(vocabulary)

    result = {}
    for movie_id, names in zip(movies['movieId'], genre_lists):
        vec = np.zeros(width, dtype=float)
        for name in names:
            col = column_of.get(name)
            # The empty string is never in the vocabulary, so it is skipped here.
            if col is not None:
                vec[col] = 1.0
        result[int(movie_id)] = vec
    return result

def jaccard_similarity(
        a: int,
        b: int,
        genre_vectors: Dict[int, np.ndarray]
) -> float:
    """
    Jaccard similarity of two movies' genre vectors.

    Both ids are looked up in `genre_vectors` (a missing id raises KeyError).
    Non-zero entries count as "genre present"; the result is
    |present in both| / |present in either|, or 0.0 when neither movie has
    any genre.
    """
    first, second = genre_vectors[a], genre_vectors[b]
    either = np.logical_or(first, second).sum()
    if either == 0:
        # Neither movie has a genre: define the similarity as 0 instead of 0/0.
        return 0.0
    both = np.logical_and(first, second).sum()
    return both / either

def cosine_similarity_genre(
        a: int,
        b: int,
        genre_vectors: Dict[int, np.ndarray]
) -> float:
    """
    Compute Cosine similarity between two movies based on genre vectors.

    Parameters:
    - a, b: int
        Movie ids; both must be keys of `genre_vectors` (KeyError otherwise).
    - genre_vectors: Dict[int, np.ndarray]
        Mapping from movieId to its genre vector.

    Returns:
    - float
        dot(a, b) / (|a| * |b|), clamped to [-1, 1]; 0.0 when either vector
        has zero norm.
    """
    vec_a = genre_vectors[a]
    vec_b = genre_vectors[b]
    denom = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    if denom == 0:
        return 0.0
    cosine = float(np.dot(vec_a, vec_b)) / denom
    # Rounding in sqrt/multiply can push the ratio just past 1 (e.g. a vector
    # with three active genres compared with itself gives 1.0000000000000002),
    # which would turn 1 - similarity into a negative dissimilarity.
    # np.clip keeps NaN as NaN, so invalid inputs are not silently masked.
    return float(np.clip(cosine, -1.0, 1.0))

def dissimilarity_from_similarity(
    similarity_fn: Callable[[int, int], float]
) -> Callable[[int, int], float]:
    """
    Turn a similarity function into its complement.

    The returned callable takes the same two item ids, forwards them to
    `similarity_fn`, and returns 1.0 minus that result.
    """
    def complement(a: int, b: int) -> float:
        similarity = similarity_fn(a, b)
        return 1.0 - similarity
    return complement


### 2.2 Popularity Based

In [7]:
def load_ratings_popularity(
        ratings_csv: str
) -> Tuple[Dict[int, int], int]:
    """
    Read a MovieLens-style ratings.csv and derive per-movie popularity counts.

    Parameters:
    - ratings_csv: str
        Path to the ratings.csv file (anything `pd.read_csv` accepts); it
        must provide `userId` and `movieId` columns.

    Returns:
    - movie_counts: Dict[int, int] mapping movieId to the number of distinct
      users who rated it.
    - total_users: int total number of distinct users in the dataset.
    """
    ratings = pd.read_csv(ratings_csv)
    raters = ratings['userId']
    # Distinct users per movie: repeated ratings by the same user count once.
    users_per_movie = raters.groupby(ratings['movieId']).nunique()
    return users_per_movie.to_dict(), raters.nunique()

def popularity_score_max_norm(
    movie_id: int,
    movie_counts: Dict[int, int]
) -> float:
    """
    Normalize popularity by the max count in the catalog.

    Parameters:
    - movie_id: int
        Item to score; ids absent from `movie_counts` count as 0.
    - movie_counts: Dict[int, int]
        Mapping from movieId to its popularity count.

    Returns:
    - float
        count(movie_id) / max(all counts). Returns 0.0 when the catalog is
        empty or its largest count is not positive, instead of dividing by
        zero.

    Note: the maximum is recomputed from `movie_counts` on every call.
    """
    if not movie_counts:
        return 0.0
    max_count = max(movie_counts.values())
    if max_count <= 0:
        # Mirrors the non-positive-denominator guard used when normalizing by
        # user count: nothing meaningful to normalize against.
        return 0.0
    return movie_counts.get(movie_id, 0) / max_count

def popularity_score_user_norm(
    movie_id: int,
    movie_counts: Dict[int, int],
    total_users: int
) -> float:
    """
    Popularity of a movie as a share of all users.

    Returns the movie's count (0 when the id is absent from `movie_counts`)
    divided by `total_users`, or 0.0 when `total_users` is zero or negative.
    """
    return (
        0.0
        if total_users <= 0
        else movie_counts.get(movie_id, 0) / total_users
    )

### 2.3 Embedding Based

In [8]:
def cosine_similarity_embeddings(
    a: int,
    b: int,
    embeddings: Dict[int, np.ndarray]
) -> float:
    """
    Cosine similarity between precomputed item embeddings.

    Parameters:
    - a, b: int
        Item ids to compare.
    - embeddings: Dict[int, np.ndarray] mapping movieId to embedding vector.

    Returns:
    - float
        dot(a, b) / (|a| * |b|), clamped to [-1, 1]. Returns 0.0 when either
        id has no embedding or either embedding has zero norm.
    """
    vec_a = embeddings.get(a)
    vec_b = embeddings.get(b)
    if vec_a is None or vec_b is None:
        # Unknown items are treated as unrelated rather than raising.
        return 0.0
    denom = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    if denom == 0:
        return 0.0
    cosine = float(np.dot(vec_a, vec_b)) / denom
    # Rounding can leave the ratio a hair outside [-1, 1] for (anti-)parallel
    # vectors, which would make 1 - similarity negative; np.clip keeps NaN
    # as NaN so invalid inputs are not silently masked.
    return float(np.clip(cosine, -1.0, 1.0))

# Example usage:
# genres = load_genre_vectors('movies.csv')
# diss_jaccard = dissimilarity_from_similarity(lambda x, y: jaccard_similarity(x, y, genres))
# movie_counts, total_users = load_ratings_popularity('ratings.csv')
# pop_max = lambda i: popularity_score_max_norm(i, movie_counts)
# pop_user = lambda i: popularity_score_user_norm(i, movie_counts, total_users)


In [13]:
import pandas as pd
import numpy as np
from io import StringIO

# --- Definitions (updated) ---

# --- Tests ---

# 1. Test load_genre_vectors
# In-memory CSV with three movies; movie 3 has an empty genres field.
movies_csv = StringIO("""movieId,title,genres
1,M1,Action|Comedy
2,M2,Action
3,M3,
""")
genre_vectors = load_genre_vectors(movies_csv)
# Every movie, including the genre-less one, must get a vector.
assert set(genre_vectors.keys()) == {1, 2, 3}

# 2. Test genre similarities
# Movies 1 and 2 share Action out of {Action, Comedy} -> Jaccard 1/2.
j12 = jaccard_similarity(1, 2, genre_vectors)
assert abs(j12 - 0.5) < 1e-6
# Cosine is checked against the formula evaluated directly on the vectors.
c12 = cosine_similarity_genre(1, 2, genre_vectors)
exp_c12 = np.dot(genre_vectors[1], genre_vectors[2]) / (np.linalg.norm(genre_vectors[1]) * np.linalg.norm(genre_vectors[2]))
assert abs(c12 - exp_c12) < 1e-6

# 3. Test dissimilarity_from_similarity
# A constant similarity of 0.4 must yield a dissimilarity of 0.6.
sim_const = lambda a, b: 0.4
diss = dissimilarity_from_similarity(sim_const)
assert abs(diss(0, 0) - 0.6) < 1e-6

# 4. Test load_ratings_popularity
# Two users: movie 1 is rated by both, movie 2 only by user 1.
ratings_csv = StringIO("""userId,movieId,rating,timestamp
1,1,5.0,100
2,1,4.0,101
1,2,3.0,102
""")
movie_counts, total_users = load_ratings_popularity(ratings_csv)
assert movie_counts == {1: 2, 2: 1}
assert total_users == 2

# 5. Test popularity scores
# Movie 1 has the maximum count (2 of max 2) and was rated by all users (2 of 2).
assert abs(popularity_score_max_norm(1, movie_counts) - 1.0) < 1e-6
assert abs(popularity_score_user_norm(1, movie_counts, total_users) - 1.0) < 1e-6

# 6. Test cosine_similarity_embeddings
# Orthogonal unit vectors -> similarity 0.
embs = {1: np.array([1, 0]), 2: np.array([0, 1])}
assert abs(cosine_similarity_embeddings(1, 2, embs) - 0.0) < 1e-6

# 7. Test calc_diversity
# With a constant pairwise dissimilarity of 1.0 the expected diversity is 1.0.
div = calc_diversity([1, 2, 3], lambda a, b: 1.0)
assert abs(div - 1.0) < 1e-6

# 8. Test calc_novelty
# Popularities 0.25 and 0.75 -> mean of (0.75, 0.25) = 0.5.
nov = calc_novelty([1, 2], lambda i: 0.25 if i == 1 else 0.75)
assert abs(nov - 0.5) < 1e-6

# 9. Test calc_serendipity
# Similarity 0.0 is below the 0.5 threshold, so every item is unexpected;
# items 1 and 3 are liked -> 2 of 3 recommendations are serendipitous.
ser = calc_serendipity([1, 2, 3], {1, 3}, {2}, lambda a, b: 0.0, threshold=0.5)
assert abs(ser - (2/3)) < 1e-6

print("All tests passed!")


All tests passed!
