In [None]:
import numpy as np
from collections import defaultdict
import pandas as pd
from sklearn.model_selection import KFold
from typing import List, Tuple, Dict
import math

In [None]:
# Load the ratings table and drop the timestamp column, which is not used below.
# The path uses forward slashes so the same cell runs on Windows, Linux and macOS
# (a backslash-separated literal only resolves on Windows).
df_ratings = pd.read_csv("./ml-latest-small/ratings.csv").drop(columns=["timestamp"])
print(df_ratings.shape)
# DataFrame.info() prints its report itself and returns None, so wrapping it in
# print() would only add a stray "None" line to the output.
df_ratings.info()
# Rename the three remaining columns positionally to the names that
# SimpleRecommender.fit reads: user_id, item_id, rating.
df_ratings.columns = ['user_id', 'item_id', 'rating']
df_ratings.head()

In [None]:
class SimpleRecommender:
    """
    Memory-based collaborative-filtering recommender (user-based and item-based).

    Ratings are kept in two nested dictionaries so that both "all items of a
    user" and "all users of an item" can be looked up directly.

    Attributes:
    k_neighbors: maximum number of neighbours used for a prediction
    user_ratings: {user_id: {item_id: rating}}
    item_ratings: {item_id: {user_id: rating}}
    user_means: {user_id: mean rating given by that user}
    item_means: {item_id: mean rating received by that item}
    global_mean: mean of all stored ratings (0.0 until fit() has seen data);
                 used as the last-resort prediction for cold-start cases
    """

    def __init__(self, k_neighbors=5):
        self.k_neighbors = k_neighbors
        self._reset_state()

    def _reset_state(self):
        """Drop everything learned so far and start from an empty model."""
        # Dictionary to store user ratings: {user_id: {item_id: rating}}
        self.user_ratings = defaultdict(dict)
        # Dictionary to store item ratings: {item_id: {user_id: rating}}
        self.item_ratings = defaultdict(dict)
        # Store mean ratings for normalization
        self.user_means = {}
        self.item_means = {}
        # Fallback prediction when neither a user mean nor an item mean exists
        self.global_mean = 0.0

    def fit(self, ratings_df):
        """
        Process ratings data into user-item and item-user mappings

        Any state from a previous call is discarded first. Without this,
        refitting the same instance (as cross_validate does for every fold)
        would keep ratings from earlier folds -- including the current test
        rows -- and leak them into the predictions.

        Parameters:
        ratings_df: DataFrame with columns [user_id, item_id, rating]
        """
        self._reset_state()

        # itertuples is much faster than iterrows and keeps each column's own
        # type (iterrows upcasts integer ids to float when rating is float).
        rows = ratings_df[['user_id', 'item_id', 'rating']].itertuples(
            index=False, name=None
        )
        for user_id, item_id, rating in rows:
            # A repeated (user, item) pair keeps the last rating seen.
            self.user_ratings[user_id][item_id] = rating
            self.item_ratings[item_id][user_id] = rating

        # Calculate mean ratings for each user and item
        self.user_means = {
            user_id: np.mean(list(ratings.values()))
            for user_id, ratings in self.user_ratings.items()
        }
        self.item_means = {
            item_id: np.mean(list(ratings.values()))
            for item_id, ratings in self.item_ratings.items()
        }

        all_ratings = [
            rating
            for ratings in self.user_ratings.values()
            for rating in ratings.values()
        ]
        self.global_mean = float(np.mean(all_ratings)) if all_ratings else 0.0

    def get_user_item_matrix(self):
        """
        Return the user-item matrix as a pandas DataFrame

        Rows are users, columns are items; pairs without a rating are NaN.
        Note that this materialises every (user, item) combination, so it is
        only practical for small data sets.
        """
        data = [
            [user_id, item_id, self.user_ratings[user_id].get(item_id, np.nan)]
            for user_id in self.user_ratings
            for item_id in self.item_ratings
        ]

        return pd.DataFrame(data, columns=['user_id', 'item_id', 'rating']).pivot(
            index='user_id', columns='item_id', values='rating'
        )

    def compute_similarity(self, ratings1, ratings2, mean1, mean2):
        """
        Compute adjusted cosine similarity between two rating vectors

        Only the ids present in both dictionaries are compared, after
        subtracting the supplied mean from each side.

        Parameters:
        ratings1, ratings2: dictionaries of ratings
        mean1, mean2: mean ratings for normalization

        Returns:
        Similarity in [-1, 1]; 0.0 when fewer than 2 ids are shared or when
        either centred vector has zero length.
        """
        common_ids = set(ratings1.keys()) & set(ratings2.keys())

        if len(common_ids) < 2:  # Require at least 2 common ratings
            return 0.0

        # Calculate normalized ratings (both arrays iterate common_ids in the
        # same order, so positions line up)
        norm1 = np.array([ratings1[i] - mean1 for i in common_ids])
        norm2 = np.array([ratings2[i] - mean2 for i in common_ids])

        # Compute similarity using adjusted cosine similarity
        num = np.dot(norm1, norm2)
        den = np.sqrt(np.dot(norm1, norm1)) * np.sqrt(np.dot(norm2, norm2))

        return num / den if den != 0 else 0.0

    def get_top_n_items(self, user_id, n=10, method='item_based'):
        """
        Get top N recommended items for a user

        Parameters:
        user_id: ID of the user
        n: Number of recommendations to return
        method: 'item_based' or 'user_based' (any other value is treated as
                'user_based')

        Returns:
        List of tuples (item_id, predicted_rating)
        """
        # Get items the user hasn't rated yet. .get() is used on purpose:
        # indexing the defaultdict would register an unknown user with an
        # empty rating dict and break later user-based predictions.
        rated_items = set(self.user_ratings.get(user_id, {}))
        items_to_predict = set(self.item_ratings.keys()) - rated_items

        predict = (self.predict_item_based if method == 'item_based'
                   else self.predict_user_based)

        # Predict ratings for all unrated items
        predictions = [(item_id, predict(user_id, item_id))
                       for item_id in items_to_predict]

        # Return top N items
        return sorted(predictions, key=lambda x: x[1], reverse=True)[:n]

    def get_similar_items(self, item_id, n=10):
        """
        Get N most similar items to a given item

        Parameters:
        item_id: ID of the item
        n: Number of similar items to return

        Returns:
        List of tuples (item_id, similarity_score); empty for an unknown item
        """
        if item_id not in self.item_ratings:
            return []

        similarities = []
        for other_item_id in self.item_ratings:
            if other_item_id != item_id:
                sim = self.compute_similarity(
                    self.item_ratings[item_id],
                    self.item_ratings[other_item_id],
                    self.item_means[item_id],
                    self.item_means[other_item_id]
                )
                similarities.append((other_item_id, sim))

        return sorted(similarities, key=lambda x: x[1], reverse=True)[:n]

    def get_similar_users(self, user_id, n=10):
        """
        Get N most similar users to a given user

        Parameters:
        user_id: ID of the user
        n: Number of similar users to return

        Returns:
        List of tuples (user_id, similarity_score); empty for an unknown user
        """
        if user_id not in self.user_ratings:
            return []

        similarities = []
        for other_user_id in self.user_ratings:
            if other_user_id != user_id:
                sim = self.compute_similarity(
                    self.user_ratings[user_id],
                    self.user_ratings[other_user_id],
                    self.user_means[user_id],
                    self.user_means[other_user_id]
                )
                similarities.append((other_user_id, sim))

        return sorted(similarities, key=lambda x: x[1], reverse=True)[:n]

    def predict_item_based(self, user_id, item_id):
        """
        Predict rating using item-based collaborative filtering

        The prediction is the target item's mean plus the similarity-weighted
        average of how far the user's ratings of the neighbouring items lie
        from those items' means. Working with deviations (instead of raw
        ratings) keeps negatively-correlated neighbours from producing
        negative or out-of-range predictions.

        Parameters:
        user_id: ID of the user
        item_id: ID of the item

        Returns:
        Predicted rating. Falls back to the user's mean for an unknown item,
        and to the item's mean when the user has no usable ratings; the
        global mean is used when that mean is unavailable.
        """
        if item_id not in self.item_ratings:
            return self.user_means.get(user_id, self.global_mean)

        target_mean = self.item_means.get(item_id, self.global_mean)

        # Find items rated by the user
        user_items = self.user_ratings.get(user_id, {})
        if not user_items:
            return target_mean

        # Compute similarities with items the user has rated
        target_ratings = self.item_ratings[item_id]
        similarities = []
        for rated_item_id in user_items:
            if rated_item_id != item_id:
                sim = self.compute_similarity(
                    target_ratings,
                    self.item_ratings[rated_item_id],
                    self.item_means[item_id],
                    self.item_means[rated_item_id]
                )
                similarities.append((sim, rated_item_id))

        # Get top k neighbors
        neighbors = sorted(similarities, reverse=True)[:self.k_neighbors]

        if not neighbors:
            return target_mean

        # Weighted average of the user's mean-centred ratings
        numerator = sum(
            sim * (user_items[neighbor_item] - self.item_means[neighbor_item])
            for sim, neighbor_item in neighbors
        )
        denominator = sum(abs(sim) for sim, _ in neighbors)

        if denominator == 0:
            return target_mean

        return target_mean + (numerator / denominator)

    def predict_user_based(self, user_id, item_id):
        """
        Predict rating using user-based collaborative filtering

        Parameters:
        user_id: ID of the user
        item_id: ID of the item

        Returns:
        Predicted rating: the user's mean plus the similarity-weighted average
        deviation of the neighbours' ratings from their own means. Falls back
        to the item's mean for an unknown user, and to the user's mean when no
        neighbour carries weight; the global mean is used when that mean is
        unavailable.
        """
        if user_id not in self.user_ratings:
            return self.item_means.get(item_id, self.global_mean)

        # Find users who rated the item
        item_users = self.item_ratings.get(item_id, {})
        if not item_users:
            return self.user_means.get(user_id, self.global_mean)

        # Compute similarities with other users who rated this item
        similarities = []
        for other_user_id in item_users:
            if other_user_id != user_id:
                sim = self.compute_similarity(
                    self.user_ratings[user_id],
                    self.user_ratings[other_user_id],
                    self.user_means[user_id],
                    self.user_means[other_user_id]
                )
                similarities.append((sim, other_user_id))

        # Get top k neighbors
        neighbors = sorted(similarities, reverse=True)[:self.k_neighbors]

        if not neighbors:
            return self.user_means.get(user_id, self.global_mean)

        # Compute weighted average of neighbors' mean-centred ratings. Every
        # neighbour was drawn from item_users, so its rating is always there.
        numerator = 0
        denominator = 0
        for sim, neighbor_id in neighbors:
            deviation = item_users[neighbor_id] - self.user_means[neighbor_id]
            numerator += sim * deviation
            denominator += abs(sim)

        if denominator == 0:
            return self.user_means.get(user_id, self.global_mean)

        # Return prediction
        return self.user_means[user_id] + (numerator / denominator)

    def calculate_metrics(self, true_ratings: List[Tuple[int, int, float]],
                         method: str = 'item_based') -> Dict[str, float]:
        """
        Calculate RMSE and MAE for a set of predictions

        Parameters:
        true_ratings: List of tuples (user_id, item_id, rating)
        method: 'item_based' or 'user_based' (any other value is treated as
                'user_based')

        Returns:
        Dictionary containing RMSE and MAE values (both 0.0 for empty input)
        """
        if not true_ratings:
            return {'rmse': 0.0, 'mae': 0.0}

        predict = (self.predict_item_based if method == 'item_based'
                   else self.predict_user_based)

        errors = np.array(
            [true_rating - predict(user_id, item_id)
             for user_id, item_id, true_rating in true_ratings],
            dtype=float
        )

        rmse = math.sqrt(np.mean(errors ** 2))
        mae = np.mean(np.abs(errors))

        return {
            'rmse': rmse,
            'mae': mae
        }

    def cross_validate(self, ratings_df: pd.DataFrame, n_splits: int = 5,
                      method: str = 'item_based') -> Dict[str, List[float]]:
        """
        Perform k-fold cross-validation

        The instance itself is refitted on every fold, so after this call the
        model holds the training data of the last fold only.

        Parameters:
        ratings_df: DataFrame with columns [user_id, item_id, rating]
        n_splits: Number of folds for cross-validation
        method: 'item_based' or 'user_based'

        Returns:
        Dictionary containing lists of RMSE and MAE values for each fold,
        plus their mean and standard deviation
        """
        kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)

        rmse_scores = []
        mae_scores = []

        # Convert DataFrame to array for easier splitting
        ratings_array = ratings_df.values

        for fold_idx, (train_idx, test_idx) in enumerate(kf.split(ratings_array)):
            print(f"Processing fold {fold_idx + 1}/{n_splits}")

            # Split data into train and test sets
            train_data = ratings_array[train_idx]
            test_data = ratings_array[test_idx]

            # Convert train data back to DataFrame and fit the model
            # (fit() starts from a clean state, so folds do not contaminate
            # each other)
            train_df = pd.DataFrame(train_data, columns=ratings_df.columns)
            self.fit(train_df)

            # Calculate metrics for this fold
            test_ratings = [(user_id, item_id, rating)
                          for user_id, item_id, rating in test_data]
            metrics = self.calculate_metrics(test_ratings, method)

            rmse_scores.append(metrics['rmse'])
            mae_scores.append(metrics['mae'])

        return {
            'rmse_scores': rmse_scores,
            'mae_scores': mae_scores,
            'mean_rmse': np.mean(rmse_scores),
            'std_rmse': np.std(rmse_scores),
            'mean_mae': np.mean(mae_scores),
            'std_mae': np.std(mae_scores)
        }


In [None]:
# One 5-fold cross-validation pass of the item-based predictor with 5 neighbours.
recommender = SimpleRecommender(k_neighbors=5)
cv_results = recommender.cross_validate(
    ratings_df=df_ratings,
    n_splits=5,
    method='item_based',
)
# Last expression of the cell, so the notebook displays the result dictionary.
cv_results

In [None]:
def evaluate_recommender_system(ratings_df: pd.DataFrame,
                                methods: List[str] = ['item_based', 'user_based'],
                                k_neighbors_list: List[int] = [5, 10, 15],
                                n_splits: int = 5) -> Dict:
    """
    Cross-validate a SimpleRecommender for every (method, k) combination.

    A fresh recommender is created for each combination and evaluated with
    its cross_validate method; only the aggregated scores are kept.

    Parameters:
    ratings_df: DataFrame with columns [user_id, item_id, rating]
    methods: List of methods to evaluate
    k_neighbors_list: List of k_neighbors values to try
    n_splits: Number of folds for cross-validation

    Returns:
    Nested dictionary {method: {k: {'mean_rmse', 'std_rmse', 'mean_mae',
    'std_mae'}}}
    """
    # Aggregates copied from each cross-validation result, in output order.
    summary_keys = ('mean_rmse', 'std_rmse', 'mean_mae', 'std_mae')

    results = {}
    for method in methods:
        scores_by_k = {}
        for k in k_neighbors_list:
            print(f"\nEvaluating {method} method with k={k}")
            fold_summary = SimpleRecommender(k_neighbors=k).cross_validate(
                ratings_df=ratings_df, n_splits=n_splits, method=method
            )
            scores_by_k[k] = {key: fold_summary[key] for key in summary_keys}
        results[method] = scores_by_k

    return results

In [None]:
# Grid evaluation: both default methods, three neighbourhood sizes, 5 folds each.
results = evaluate_recommender_system(ratings_df=df_ratings, k_neighbors_list=[5, 10, 15], n_splits=5)
# Last expression of the cell, so the notebook displays the nested result dictionary.
results