In [60]:
import pandas as pd
import numpy as np
from typing import Callable
from sklearn.metrics.pairwise import cosine_similarity
from scipy.sparse import csr_matrix
from sklearn.neighbors import NearestNeighbors

In [61]:
def load_data(file, sep='\t'):
    """Read one delimited file from the ``./lfm-challenge-data/`` folder.

    file - str - file name inside the data folder.
    sep - str - field separator handed to pandas (tab by default).

    returns - pd.DataFrame with the parsed file contents.
    """
    path = f'./lfm-challenge-data/{file}'
    return pd.read_csv(path, sep=sep)

In [62]:
# Challenge tables, read through load_data with its default tab separator.
users = load_data('lfm-challenge.user')
items = load_data('lfm-challenge.item')
inter_train = load_data('lfm-challenge.inter_train')
inter_test = load_data('lfm-challenge.inter_test')

# Values of the 'users' column of the test index file (pandas' default separator).
test_users = pd.read_csv('./lfm-challenge-data/test_indices.txt')['users'].values

# Matrix dimensions: number of user_id entries and number of rows in the item table.
n_users = users['user_id'].size
n_items = items.index.size

In [63]:
def create_interaction_matrix(users, items, inter, threshold=1, binary=False):
    """Build a dense user x item matrix from an interaction table.

    users - pd.DataFrame - user table; its row count is the number of matrix rows.
    items - pd.DataFrame - item table; its row count is the number of matrix columns.
    inter - pd.DataFrame - interactions with the columns 'user_id', 'item_id'
        and 'listening_events'. The ids are used directly as row / column
        positions of the matrix.
    threshold - number - only used when binary is True: counts below it map to 0,
        everything else to 1.
    binary - bool - store 0/1 flags instead of the raw listening counts.

    returns - np.ndarray of shape (len(users), len(items)). The dtype is int8
        whenever the stored values fit (always the case for binary=True) and is
        widened to the next signed integer type otherwise, so large listening
        counts are not wrapped around.
    """
    n_rows, n_cols = len(users), len(items)

    # Rows whose user_id lies outside the matrix are ignored.
    user_ids = inter['user_id'].to_numpy()
    in_range = (user_ids >= 0) & (user_ids < n_rows)
    events = inter.loc[in_range, ['user_id', 'item_id', 'listening_events']]
    # If a (user, item) pair occurs several times the last row wins; dropping
    # the earlier rows makes the vectorised assignment below deterministic.
    events = events.drop_duplicates(subset=['user_id', 'item_id'], keep='last')

    values = events['listening_events'].to_numpy()
    dtype = np.int8
    if binary:
        # Same rule as "0 if rating < threshold else 1".
        values = ~(values < threshold)
    elif values.size:
        lowest, highest = values.min(), values.max()
        # Smallest signed integer type able to hold every count.
        dtype = next(
            (candidate for candidate in (np.int8, np.int16, np.int32)
             if np.iinfo(candidate).min <= lowest and highest <= np.iinfo(candidate).max),
            np.int64,
        )

    interaction_matrix = np.zeros((n_rows, n_cols), dtype=dtype)
    if len(events):
        rows = events['user_id'].to_numpy()
        cols = events['item_id'].to_numpy()
        interaction_matrix[rows, cols] = values

    return interaction_matrix

In [64]:
# Binary train / test matrices, built with identical settings from the two splits.
interaction_matrix, test_interaction_matrix = (
    create_interaction_matrix(users, items, split, binary=True)
    for split in (inter_train, inter_test)
)

In [65]:
def create_item_knn(interaction_matrix, n_neighbors=5):
    """Fit a brute-force cosine nearest-neighbour model over the matrix columns.

    interaction_matrix - array-like - interaction matrix; it is converted to a
        sparse matrix and transposed so that every column becomes one sample.
    n_neighbors - int - default neighbour count stored in the model.

    returns - the fitted NearestNeighbors instance.
    """
    # Transposing puts the columns (items) on the sample axis.
    item_rows = csr_matrix(interaction_matrix).T

    return NearestNeighbors(
        n_neighbors=n_neighbors,
        metric='cosine',
        algorithm='brute',
        n_jobs=-1,
    ).fit(item_rows)

In [66]:
def make_recommendations(user_id, model_knn, interaction_matrix, n_recommendations=10):
    """Recommend unseen items that are closest to the items a user interacted with.

    user_id - int - row of interaction_matrix to recommend for.
    model_knn - fitted neighbour model whose kneighbors(X, n_neighbors=k) returns
        (distances, indices) for the item rows of interaction_matrix.T.
    interaction_matrix - np.ndarray - user x item matrix; entries > 0 mark seen items.
    n_recommendations - int - maximum number of items to return.

    returns - np.ndarray of distinct item indices ordered by their smallest
        distance to any seen item. Items the user already interacted with are
        never returned. The array is empty when the user has no interactions and
        may be shorter than n_recommendations if too few unseen items exist.
    """
    seen_items = np.flatnonzero(interaction_matrix[user_id] > 0)
    if seen_items.size == 0:
        # Nothing to query with; kneighbors would reject an empty sample set.
        return np.empty(0, dtype=np.intp)

    # Every seen item is its own nearest neighbour and other seen items may rank
    # high too, so ask for enough neighbours that n_recommendations unseen ones
    # remain after filtering (capped at the number of items in the matrix).
    n_items = interaction_matrix.shape[1]
    k = min(n_items, n_recommendations + seen_items.size)
    distances, indices = model_knn.kneighbors(interaction_matrix.T[seen_items], n_neighbors=k)

    distances = distances.ravel()
    indices = indices.ravel()

    # Drop everything the user has already interacted with.
    unseen = ~np.isin(indices, seen_items)
    distances, indices = distances[unseen], indices[unseen]

    # Rank by distance, then keep only the first (closest) occurrence of each item.
    ranked = indices[np.argsort(distances, kind='stable')]
    _, first_positions = np.unique(ranked, return_index=True)
    recommendations = ranked[np.sort(first_positions)][:n_recommendations]

    return recommendations


In [67]:
# Item neighbour model fitted on the binary training matrix (default neighbour count).
model_knn = create_item_knn(interaction_matrix, n_neighbors=5)

In [68]:
# One recommendation array per test user, collected in the order of test_users.
recommendations = []
for i, user_id in enumerate(test_users):
    # The total comes from test_users instead of a hard-coded 100; the carriage
    # return keeps the counter on a single output line.
    print(f"{i}/{len(test_users)}", end="\r")
    recommendations.append(make_recommendations(user_id, model_knn, interaction_matrix))

99/100

In [69]:
def get_ndcg_score(predictions: np.ndarray, test_interaction_matrix: np.ndarray, topK=10) -> float:
    """
    Average binary-relevance NDCG@topK over all users.

    NOTE: a later cell of this notebook defines get_ndcg_score again; that
    definition replaces this one when the notebook is run top to bottom.

    predictions - np.ndarray - ranked item indices per user, best first. Row i is
        scored against row i of test_interaction_matrix, so both must be aligned.
    test_interaction_matrix - np.ndarray - test interaction matrix for each user;
        every non-zero entry marks a relevant item.
    topK - int - topK recommendations should be evaluated.

    returns - average ndcg score over all users. Users without relevant items
        contribute 0; the result is 0.0 when predictions has no rows.
    """
    predictions = np.asarray(predictions)
    n_users = predictions.shape[0]
    if n_users == 0:
        return 0.0

    # Discount of rank r (1-based) is log2(r + 1).
    discounts = np.log2(np.arange(2, topK + 2))
    ndcg_scores = np.zeros(n_users)

    for user in range(n_users):
        top_items = predictions[user][:topK]
        relevant_items = test_interaction_matrix[user].nonzero()[0]

        # The ideal ranking places as many relevant items as fit at the top.
        n_ideal = min(relevant_items.size, topK)
        if n_ideal == 0:
            continue  # nothing to find for this user, score stays 0

        hits = np.isin(top_items, relevant_items)
        dcg = np.sum(hits / discounts[:hits.size])
        ideal_dcg = np.sum(1.0 / discounts[:n_ideal])
        ndcg_scores[user] = dcg / ideal_dcg

    return float(np.mean(ndcg_scores))

In [75]:
# Turn the collected per-user results into a single array (copying is numpy's default).
recommendations = np.array(recommendations, copy=True)

In [77]:
def get_ndcg_score(predictions: np.ndarray, test_interaction_matrix: np.ndarray, topK=10) -> float:
    """
    Average binary-relevance NDCG@topK over all users.

    NOTE: this redefines the get_ndcg_score from an earlier cell of this notebook.

    predictions - np.ndarray - ranked item indices per user, best first. Row i is
        scored against row i of test_interaction_matrix, so both must be aligned.
    test_interaction_matrix - np.ndarray - test interaction matrix for each user;
        every non-zero entry marks a relevant item.
    topK - int - topK recommendations should be evaluated.

    returns - average ndcg score over all users. Users without relevant items
        contribute 0; the result is 0.0 when predictions has no rows.
    """
    predictions = np.asarray(predictions)
    n_users = predictions.shape[0]
    if n_users == 0:
        return 0.0

    # Discount of rank r (1-based) is log2(r + 1).
    discounts = np.log2(np.arange(2, topK + 2))
    ndcg_scores = np.zeros(n_users)

    for user in range(n_users):
        top_items = predictions[user][:topK]
        relevant_items = test_interaction_matrix[user].nonzero()[0]

        # The ideal ranking places as many relevant items as fit at the top.
        n_ideal = min(relevant_items.size, topK)
        if n_ideal == 0:
            continue  # nothing to find for this user, score stays 0

        hits = np.isin(top_items, relevant_items)
        dcg = np.sum(hits / discounts[:hits.size])
        ideal_dcg = np.sum(1.0 / discounts[:n_ideal])
        ndcg_scores[user] = dcg / ideal_dcg

    return float(np.mean(ndcg_scores))

In [79]:
# Toy check: each user's single relevant item is ranked first.
# NOTE: this rebinds test_interaction_matrix, replacing the matrix built from
# inter_test in an earlier cell for everything that runs afterwards.
predictions = np.array([
    [0, 1, 2, 3],
    [3, 2, 1, 0],
])
test_interaction_matrix = np.array([
    [1, 0, 0, 0],
    [0, 0, 0, 1],
])

ndcg_score = get_ndcg_score(
    predictions=predictions,
    test_interaction_matrix=test_interaction_matrix,
    topK=4,
)
ndcg_score

1.0