# Simple Recommender System

In this notebook we will see a simple implementation of a Recommender System based on *Collaborative Filtering*. We will use the MovieLens dataset:

https://grouplens.org/datasets/movielens/

Since we will store the full utility matrix, we consider a small dataset, and in particular the one recommended for education and development (small version). Such a dataset contains approximately 100,000 ratings to 9,000 films made by 600 users.

Users and items (hereinafter, we will use the term "item" instead of "film") are identified by integers, and the rating is a number bewteen 1 and 5. A sample of the file, which contains as also the timestamps, is:

```text
userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
1,70,3.0,964982400
1,101,5.0,964980868
```

## Loading the data


We first define the function to load the data. As usual, we need to adapt such a function to the specific file input format.

In particular, we are going to assign to users and items progressive numbers, so their identifications will be also the indexes of the utility matrix.

In [None]:
def load_data(filename: str) -> tuple[list[list[float]], int, int]:
    """Loads and processes rating data from a CSV file into indexed format.

    Args:
        filename: Path to the input CSV file. The file should contain
            rows in the format: user_id, item_id, rating (with a header row).

    Returns:
        A tuple containing:
            - A list of [user_idx, item_idx, rating] entries.
            - Total number of unique users.
            - Total number of unique items.
    """
    input_lines = []
    users = {}
    num_users = 0
    items = {}
    num_items = 0
    raw_lines = open(filename, "r").read().splitlines()
    # Remove the first line
    del raw_lines[0]
    for line in raw_lines:
        line_content = line.split(",")
        user_id = int(line_content[0])
        item_id = int(line_content[1])
        rating = float(line_content[2])
        if user_id not in users:
            users[user_id] = num_users
            num_users += 1
        if item_id not in items:
            items[item_id] = num_items
            num_items += 1
        input_lines.append([users[user_id], items[item_id], rating])
    return input_lines, num_users, num_items

Why do we use the following if statements?

```python
        if user_id not in users:
            users[user_id] = num_users
            num_users += 1
        if item_id not in items:
            items[item_id] = num_items
            num_items += 1
```

Let's explain it with an example:

| userId | movieId | rating |
| :----: | :-----: | :----: |
|   10   |   100   |   4.0  |
|   10   |   300   |   5.0  |
|   25   |   100   |   3.0  |

1. Process the first line (10,100,4.0):

- `user_id = 10` is new ⇒ assign `users[10] = 0` and increment `num_users` to 1.

- `item_id = 100` is new ⇒ assign `items[100] = 0` and increment `num_items` to 1.

Append `[ users[10], items[100], 4.0 ]` ⇒ `[0, 0, 4.0]`

2. Process the second line (10,300,5.0):

- `user_id = 10` already seen ⇒ `users[10]` stays 0.

- `item_id = 300` is new ⇒ assign `items[300] = 1` and increment `num_items` to 2.

Append `[0, 1, 5.0]`

And so on, until we obtain:

```python
users  = {10: 0, 25: 1}
items  = {100: 0, 300: 1}
input_lines = [
  [0, 0, 4.0],
  [0, 1, 5.0],
  [1, 0, 3.0]
]
num_users = 2
num_items = 2
```

**Note**: We remap the original `user_id` and `item_id` to consecutive zero-based integers (e.g., from 10 to 0, and from 25 to 1). This makes the data easier to store in dense matrix or tensor formats, speeds up computation, and avoids dealing with sparse or large original IDs.

The input file containing the dataset is called "3-ratings.csv".

On Colab, remember to mount your Drive
```python
from google.colab import drive
drive.mount('/content/drive')
input_file = "/content/drive/My Drive/..."
```
Let's load our dataset and see its initial content:

In [None]:
input_file = "./data/3-ratings.csv"

input_ratings, num_users, num_items = load_data(input_file)

print(
    "\nThe dataset contains",
    num_users,
    "users,",
    num_items,
    "items, and",
    len(input_ratings),
    "ratings.\n",
)
print("The first five ratings are:", input_ratings[:5], "\n")

## Utility matrix

Building and maintaining in memory the utility matrix is inefficient, since the matrix is sparse. But working with a matrix is more intuitive, therefore we will use such an approach. 

The best way to handle a matrix and the operations associated to it is to use Numpy arrays. If you are not familiar with Numpy, you can find different tutorials online. See for instance:

https://www.kaggle.com/saptarsi/numpy-tutorial-notebook-sg  
https://cs231n.github.io/python-numpy-tutorial/

In [None]:
import numpy as np


def create_utility_matrix(
    input_ratings: list[list[float]], num_users: int, num_items: int
) -> np.ndarray:
    # In the utils.py file we log the sparsity
    # While here we print it for simplicity
    """Builds the user-item utility matrix and logs its sparsity.

    This function is designed to work directly with the output of `load_data()`.
    It takes the remapped user/item indices and constructs a dense matrix where
    each cell [i, j] contains the rating assigned by user i to item j.

    Args:
        input_ratings: List of [user_idx, item_idx, rating] triples,
            as returned by `load_data()`.
        num_users: Total number of users (also from `load_data()`).
        num_items: Total number of items (also from `load_data()`).

    Returns:
        A 2D NumPy array representing the utility matrix.
    """
    # Create an NxI matrix of zeros,
    # where N = number of users
    # and I = number of items
    ratings = np.zeros((num_users, num_items))

    # Fill the matrix with the ratings
    # NOTE: input_ratings: list[list]
    for row in input_ratings:
        ratings[int(row[0]), int(row[1])] = row[2]

    # Compute the "sparsity", i.e., percentage of non-zero cells
    sparsity = 100 * float(np.count_nonzero(ratings)) / float(num_users * num_items)
    print("Sparsity: %.2f%%" % sparsity)

    return ratings

In [None]:
ratings = create_utility_matrix(input_ratings, num_users, num_items)

ratings

1.70% sparsity means:
- Only 1.7% of user–item pairs have known ratings.
- 98.3% of the matrix is unknown → needs to be predicted.

## Preliminary analysis

In the following, we suggest a set of preliminary analysis on the dataset that can be carried out with simple operations on the matrix:


### Question  Q1
<div class="alert alert-info">
For a given user id, find the number of rated items by that user, and the average rating.  
    
- **Hint**: Compute these values once for all users, and store them in another Nx2 matrix.
</div>

In [None]:
# your answer here
def count_and_avg_rating_per_user(utility_matrix: np.ndarray) -> np.ndarray:
    """Computes the number of rated items and average rating per user.

    Args:
        utility_matrix: A 2D NumPy array of shape (num_users, num_items),
            where each cell [i, j] contains the rating from user i for item j,
            or 0 if no rating was given.

    Returns:
        A NumPy array of shape (num_users, 2), where:
            - [:, 0] contains the count of rated items per user.
            - [:, 1] contains the average rating per user (0 if no ratings).
    """
    counts = np.count_nonzero(utility_matrix, axis=1)
    sums = utility_matrix.sum(axis=1)
    averages = np.zeros_like(sums, dtype=float)
    mask = counts > 0
    averages[mask] = sums[mask] / counts[mask]
    return np.vstack((counts, averages)).T


users_info = count_and_avg_rating_per_user(ratings)
users_info[:20]

### Question  Q2
<div class="alert alert-info">
Find the top-k viewers, i.e., users that rated the highest number of items.
    
- *Variation*: find the bottom-k viewers.
</div>

In [None]:
# your answer here
def top_bottom_users_by_count(
    user_info: np.ndarray, k: int, bottom_k: bool = False
) -> np.ndarray:
    """
    Returns the top or bottom k users based on the number of rated items, including their indices.

    By default, the function returns the top k users with the highest number of rated items.
    If `bottom_k` is True, it returns the bottom k users with the lowest number of rated items.

    Args:
        user_info (np.ndarray): NumPy array of shape (num_users, 2), where:
            - [:, 0] contains the count of rated items per user.
            - [:, 1] contains the average rating per user.
        k (int): Number of users to return.
        bottom_k (bool, optional): If True, returns the bottom k users instead of the top. Defaults to False.

    Returns:
        np.ndarray: Array of shape (k, 3), where each row contains:
            [user_index, count, average_rating].
    """
    if not bottom_k:
        indices = np.argsort(user_info[:, 0])[::-1][:k]
    else:
        indices = np.argsort(user_info[:, 0])[:k]

    return np.column_stack((indices, user_info[indices]))


top_10_users_by_count = top_bottom_users_by_count(users_info, 10)

# Print results without scientific notation
np.set_printoptions(suppress=True)
print(top_10_users_by_count)

### Question  Q3
<div class="alert alert-info">
For a given item id, find the number of users that rated it, and its average rating.
    
- **Hint**: Compute these values once for all items, and store them in another Ix2 matrix.
</div>

In [None]:
# your answer here
def count_and_avg_ratings_per_item(utility_matrix: np.ndarray) -> np.ndarray:
    """Computes the number of ratings and average rating per item.

    Args:
        utility_matrix: A 2D NumPy array of shape (num_users, num_items),
            where each cell [i, j] contains the rating from user i for item j,
            or 0 if no rating was given.

    Returns:
        A NumPy array of shape (num_items, 2), where:
            - [:, 0] contains the count of ratings per item.
            - [:, 1] contains the average rating per item (0 if unrated).
    """
    counts = np.count_nonzero(utility_matrix, axis=0)
    sums = utility_matrix.sum(axis=0)
    averages = np.zeros_like(sums, dtype=float)
    mask = counts > 0
    averages[mask] = sums[mask] / counts[mask]
    return np.vstack((counts, averages)).T


items_info = count_and_avg_ratings_per_item(ratings)
items_info[:20]

### Question  Q4
<div class="alert alert-info">
Find the top-k items with at least v views, i.e., the k items with the highest average rate that has been rated by at least v users.
</div>

In [None]:
# your answer here
def top_items_by_avg_and_count(
    item_info: np.ndarray, min_count: int, top_k: int
) -> np.ndarray:
    """Returns the top-k items with the highest average rating among those with sufficient ratings.

    Args:
        item_info: NumPy array of shape (num_items, 2), where:
            - [:, 0] contains the count of ratings per item.
            - [:, 1] contains the average rating per item.
        min_count: Minimum number of ratings required to include an item.
        top_k: Number of top items to return (default is 10).

    Returns:
        A NumPy array of shape (top_k, 3), where each row contains:
            [item_index, num_ratings, avg_rating].
    """
    mask = item_info[:, 0] >= min_count
    filtered = item_info[mask]
    indices = np.where(mask)[0]

    sorted_idx = np.argsort(filtered[:, 1])[::-1][:top_k]
    return np.column_stack((indices[sorted_idx], filtered[sorted_idx]))


top_10_items_with_100_ratings = top_items_by_avg_and_count(
    items_info, min_count=100, top_k=10
)
np.set_printoptions(suppress=True)
print(top_10_items_with_100_ratings)

### Question  Q5
<div class="alert alert-info">
Normalize the utility matrix by subtracting from each non-zero cell at row i the average rating of the user i.
</div>

In [None]:
# your answer here
def normalize_utility_matrix_by_user(utility_matrix: np.ndarray) -> np.ndarray:
    """Normalizes the utility matrix by subtracting each user's average rating.

    Each nonzero entry is replaced with (rating - user_avg). Unrated entries remain 0.

    Args:
        utility_matrix: A 2D NumPy array of shape (num_users, num_items),
            where each cell [i, j] contains the rating from user i for item j,
            or 0 if no rating was given.

    Returns:
        A normalized utility matrix of the same shape as the input,
        with zero-mean rows (for nonzero entries).
    """
    user_info = count_and_avg_rating_per_user(utility_matrix)
    avg_ratings = user_info[:, 1][:, None]
    return np.where(utility_matrix != 0, utility_matrix - avg_ratings, 0)


ratings_norm_u = normalize_utility_matrix_by_user(ratings)
print(ratings_norm_u[:10])

In [None]:
def normalize_utility_matrix_by_item(utility_matrix: np.ndarray) -> np.ndarray:
    """Normalizes the utility matrix by subtracting each item's average rating.

    Each nonzero entry is replaced with (rating - item_avg). Unrated entries remain 0.

    Args:
        utility_matrix: A 2D NumPy array of shape (num_users, num_items),
            where each cell [i, j] contains the rating from user i for item j,
            or 0 if no rating was given.

    Returns:
        A normalized utility matrix of the same shape as the input,
        with zero-mean columns (for nonzero entries).
    """
    item_info = count_and_avg_ratings_per_item(utility_matrix)
    avg_ratings = item_info[:, 1][None, :]
    return np.where(utility_matrix != 0, utility_matrix - avg_ratings, 0)


ratings_norm_i = normalize_utility_matrix_by_item(ratings)
print(ratings_norm_i[:10])

### Use of `[:, None]`:

```python
import numpy as np

a = np.array([10, 20, 30])
print(a.shape)          # (3,)

b = a[:, None]
print(b.shape)          # (3, 1)
print(b)
# Output:
# [[10]
#  [20]
#  [30]]
```

### Explanation of `np.where(condition, x, y)`

```python
if condition:
    use x
else:
    use y
```

## Splitting the utility matrix

We want to separate the values of utility matrix into two sets, train and test:

- The train set is used to compute the similarity between users or items;
- Using the similarity, we will then predict the ratings;
- We compare the prediction with the values in the test set to compute the prediction error.

**Note**: `ratings` simply represents the utility matrix (num_users, num_items). We computed it at the beginning of the notebook.

In [None]:
## previous definition
def train_test_split(
    ratings: np.ndarray, sample_per_user: int = 10
) -> tuple[np.ndarray, np.ndarray]:
    """Splits the utility matrix into train and test sets by sampling a fixed number of ratings per user.

    For each user, exactly `sample_per_user` rated items are randomly selected and moved to the test set.
    The train and test matrices are guaranteed to be disjoint.

    Args:
        ratings: A 2D NumPy array of shape (num_users, num_items),
            containing the full utility matrix with ratings.
        sample_per_user: Number of ratings to sample per user for the test set.

    Returns:
        A tuple of two NumPy arrays (train, test), each of shape (num_users, num_items).
    """
    test = np.zeros(ratings.shape)
    train = ratings.copy()
    for user in range(ratings.shape[0]):
        test_ratings = np.random.choice(
            ratings[user, :].nonzero()[0],
            size=sample_per_user,
            replace=False,
        )
        train[user, test_ratings] = 0.0
        test[user, test_ratings] = ratings[user, test_ratings]

    assert np.all((train * test) == 0)
    return train, test

Example above function usage:

```python
import numpy as np

# Utility matrix (2, 5)
ratings = np.array([
    [5, 0, 3, 0, 4],  # user 0
    [0, 2, 0, 0, 1],  # user 1
])

for user in range(ratings.shape[0]):
    test_ratings = np.random.choice(ratings[user, :].nonzero()[0], 
                                    size=sample_per_user, 
                                    replace=False)

```

What happens inside the for loop (**User 0**):
- `ratings[0, :]` = `[5, 0, 3, 0, 4]`
- `ratings[0, :].nonzero()[0]` → `np.array([0, 2, 4])` → indexes of non-zero ratings
- `np.random.choice([0, 2, 4], size=2, replace=False)` → randomly selects 2 indices from `[0, 2, 4]`, e.g., `[2, 4]`

User 1:
- `ratings[1, :]` = `[0, 2, 0, 0, 1]`
- Non-zero indices: `[1, 4]`
- `np.random.choice([1, 4], size=2, replace=False)` → `[4, 1]` (**Note**: order doesn’t matter)

**Note**: The sampled indices will be zeroed out in train and copied into test.

Finally, we use assure that train and test are truly dijoint with the line:
```python
assert np.all((train * test) == 0)
```

If any position has a value in both matrices, the product will be nonzero. 
`np.all(...) == 0` ensures all products are zero, i.e., no overlap anywhere.

assert syntax:
```python
assert condition, "Optional error message"
```

If the condition is:
- `True` → nothing happens, code continues.
- `False` → an `AssertionError` is raised and execution stops.

**Note**: Assertions are mainly used for debugging and internal sanity checks. don’t rely on them for critical validation in production code.

In [None]:
def train_test_split_v2(
    ratings: np.ndarray, sample_per_user: int = 10, seed: int = 123425536
) -> tuple[np.ndarray, np.ndarray]:
    """Splits the utility matrix into train and test sets for evaluation.

    For each user, a percentage of their ratings is randomly selected and moved to the test set.
    The resulting train and test matrices are disjoint.

    Args:
        ratings: A 2D NumPy array of shape (num_users, num_items),
            containing the full utility matrix with ratings.
        sample_per_user: Percentage of each user's ratings to sample for the test set.
        seed: Random seed for reproducibility.

    Returns:
        A tuple of two NumPy arrays (train, test), each of shape (num_users, num_items).
    """
    test = np.zeros(ratings.shape)
    train = ratings.copy()
    np.random.seed(seed)
    for user in range(ratings.shape[0]):
        num_ratings = len(ratings[user, :].nonzero()[0])
        if num_ratings == 0:
            continue
        actual_sample = int(num_ratings * sample_per_user / 100)
        test_ratings = np.random.choice(
            ratings[user, :].nonzero()[0],
            size=actual_sample,
            replace=False,
        )
        train[user, test_ratings] = 0.0
        test[user, test_ratings] = ratings[user, test_ratings]

    assert np.all((train * test) == 0)
    return train, test


Main differences with the prior `train_test_split` function:
1. `sample_per_user` is now interpreted as a percentage, not as an absolute count anymore.
```python 
actual_sample = int(num_ratings * sample_per_user / 100)
```
2. Random seed added.

3. Users with no ratings are skipped safely.
```python
if num_ratings == 0:
    continue
```

We run the above function by using 15 samples from each user for the test set:

In [None]:
train, test = train_test_split_v2(ratings, sample_per_user=10)

print("Non-zero elements in ratings ", np.count_nonzero(ratings))
print("Non-zero elements in train ", np.count_nonzero(train))
print("Non-zero elements in test ", np.count_nonzero(test))

In [None]:
# this version can be used after having answered to Q5

train_u, test_u = train_test_split_v2(ratings_norm_u, sample_per_user=10)
train_i, test_i = train_test_split_v2(ratings_norm_i, sample_per_user=10)

## Computing the similarity matrix 

Using the train set, we compute the user-user similarity matrix (NxN) and the item-item similairyt matrix (IxI). From the mathematical point of view, we have for users x and y:

$$
sim(x,y) = \cos(r_x, r_y) = \frac{\sum_i r_{xi} r_{yi}}{\sqrt{\sum_i r_{xi}^2}\sqrt{\sum_i r_{yi}^2}}
$$

A similar computation can be done for the item-item similarity.

Each matrix is computed using matrix operations.

In [None]:
# Cosine similarity (alternatively, Pearson correlation) is used in memory-based collaborative filtering
# (e.g., user-user or item-item CF).
# It is not applicable for rGLSVD and sGLSVD, which require clustering methods to assign users into fixed subsets.
def compute_similarity(
    ratings: np.ndarray,
    kind: str = "user",
    epsilon: float = 1e-9,
) -> np.ndarray:
    """Returns a cosine-similarity matrix for users or items.

    Args:
        ratings: Utility matrix of shape (num_users, num_items), where each
            non-zero entry denotes user-item interaction strength.
        kind: Either `"user"` for a user-user similarity matrix or `"item"` for
            an item-item similarity matrix.
        epsilon: Small constant added to the dot-product matrix to prevent
            division-by-zero when normalizing.

    Returns:
        out:
        A square NumPy array:
            * (num_users, num_users) if `kind == "user"`,
            * (num_items, num_items) if `kind == "item"`,
        where each entry ∈ [0, 1] is the cosine similarity between the
        corresponding user or item vectors.
    """
    # We compute the dot product between ratings
    # epsilon -> small number for handling divide-by-zero errors
    if kind == "user":
        sim = ratings.dot(ratings.T) + epsilon
    elif kind == "item":
        sim = ratings.T.dot(ratings) + epsilon
    else:
        raise ValueError("kind must be 'user' or 'item'.")

    # From the diagonal of the dot product matrix we extract the norms
    # (diagonal contains squared magnitudes: v·v = ||v||²)
    norms = np.array([np.sqrt(np.diagonal(sim))])

    # The double division allows broadcasting
    result: np.ndarray = sim / norms / norms.T
    return result

We are now ready to compute the two matrices:

In [None]:
user_similarity = compute_similarity(train, kind="user")
item_similarity = compute_similarity(train, kind="item")

# Show the first values of the item-item similairty matrix
print(item_similarity[:4, :4])

In [None]:
# This version can be used after having answered to Q5

user_similarity_n = compute_similarity(train_u, kind="user")
item_similarity_n = compute_similarity(train_i, kind="item")

# Show the first values of the item-item similairty matrix
print(item_similarity_n[:4, :4])

## Computing the prediction and the prediction error

Using the similarity matrix, we predict the rating of missing values. In case of user-user similarity, if we want to predict item i for user x, we have:

$$
\hat{r}_{xi} = \frac{\sum_y sim(x,y) r_{yi}}{\sum_y sim(x,y)}
$$


In [None]:
def predict_simple(ratings, similarity, kind="user"):
    if kind == "user":
        return similarity.dot(ratings) / np.array([similarity.sum(axis=1)]).T
    elif kind == "item":
        return ratings.dot(similarity) / np.array([similarity.sum(axis=1)])

To compute the error, we use the mean square error from Sklearn library:

In [None]:
from sklearn.metrics import mean_squared_error


def get_mse(pred, actual):
    # Ignore nonzero terms.
    pred = pred[actual.nonzero()].flatten()
    actual = actual[actual.nonzero()].flatten()
    return mean_squared_error(pred, actual)

In [None]:
user_prediction = predict_simple(train, user_similarity, kind="user")
item_prediction = predict_simple(train, item_similarity, kind="item")

print("User-based CF MSE: ", get_mse(user_prediction, test))
print("Item-based CF MSE: ", get_mse(item_prediction, test))

In [None]:
# this version can be used after having answered to Q5

user_prediction_n = predict_simple(train_u, user_similarity_n, kind="user")
item_prediction_n = predict_simple(train_i, item_similarity_n, kind="item")

print("User-based CF MSE: ", get_mse(user_prediction_n, test_u))
print("Item-based CF MSE: ", get_mse(item_prediction_n, test_i))

## Additional questions

The above procedure computes the similairity and the prediction for all users or all items. It would be interesting to work on single users, and see if we can improve the error for that user.


### Question  Q6
<div class="alert alert-info">
Consider the user similarity matrix and the item similarity matrix. 
For a given user id, consider the ratings in the test set. Predict those ratings using the user-user similarity matrix or with the items-items similairty matrix, and compute the error.
    
- **Note**: We are not interested in the error computed over all users, but only over for a specific user.
</div>

In [None]:
# your answer here

### Question  Q7
<div class="alert alert-info">
Repeat the above computation, but consider the error for only the top-5 recommended items.
</div>

In [None]:
# your answer here

### Question  Q8
<div class="alert alert-info">
Repeat the computations for questions Q5 and Q6, but, as recommendation, consider the top 30 most similar users or items, and check if this has an impact on the error.

In [None]:
# your answer here

## rGLSVD Implementation

In [None]:
# We start by splitting the utility matrix into train and test data
train_u, test_u = train_test_split_v2(ratings, sample_per_user=10)

In [None]:
from src.utils import binarize_ratings

# Simple function to binarize the utility matrix
binarized_train_u = binarize_ratings(train_u)

binarized_train_u

In [None]:
import numpy as np
from typing import Literal
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score


def cluster_users_kmeans_grid(
    utility_matrix: np.ndarray,
    k_values: list[int],
    random_state: int = 42,
    select_by: Literal["silhouette", "none"] = "silhouette",
) -> tuple[np.ndarray, int, dict[int, float]]:
    """Cluster users with KMeans over k and (optionally) pick k by silhouette.

    Use 'silhouette' as a fast unsupervised heuristic. For paper-faithful selection,
    set select_by='none' and choose k later via HR/ARHR while training rGLSVD.

    Args:
        utility_matrix: (num_users, num_items) user–item matrix (binarized or not).
        k_values: Candidate cluster counts.
        random_state: Random seed.
        select_by: 'silhouette' to pick the best k now; 'none' to just compute and return scores.

    Returns:
        best_labels: Cluster assignments for the selected k (or the first k if select_by='none').
        best_k: Selected k (or the first k in k_values).
        scores: Mapping k -> silhouette score (NaN if degenerate).
    """
    scores: dict[int, float] = {}
    label_cache: dict[int, np.ndarray] = {}

    # Simple grid-search to find the best k among the tested values
    for k in k_values:
        labels = KMeans(
            n_clusters=k, random_state=random_state, n_init="auto"
        ).fit_predict(utility_matrix)
        label_cache[k] = labels
        scores[k] = (
            np.nan
            if len(np.unique(labels)) == 1
            else float(silhouette_score(utility_matrix, labels))
        )

    if select_by == "silhouette":
        valid = {k: s for k, s in scores.items() if not np.isnan(s)}
        if not valid:
            raise ValueError(
                "Clustering degenerated for all k; cannot select by silhouette."
            )
        best_k = max(valid, key=valid.get)
    else:
        best_k = k_values[0]

    return label_cache[best_k], best_k, scores

In [None]:
# We follow the k subset values tested in the original paper
k_values = [2, 3, 5, 10, 15, 20, 25, 30, 40, 50, 75, 100]

best_labels, best_k, scores = cluster_users_kmeans_grid(binarized_train_u, k_values)

print(f"Best labels:\n{best_labels}\nBest k:\n{best_k}\nScores: {scores}")

In [None]:
# Count number of users in each cluster
unique, counts = np.unique(best_labels, return_counts=True)
cluster_sizes = dict(zip(unique, counts))
print("Cluster sizes:", cluster_sizes)

# Percentage distribution
total_users = len(best_labels)
for cluster_id, size in cluster_sizes.items():
    print(f"Cluster {cluster_id}: {size} users ({size / total_users:.2%})")


In [None]:
import matplotlib.pyplot as plt

plt.bar(cluster_sizes.keys(), cluster_sizes.values())
plt.xlabel("Cluster ID")
plt.ylabel("Number of Users")
plt.title(f"Cluster size distribution (k={best_k})")
plt.show()
