In [None]:
import numpy as np
import pandas as pd

In [None]:
class SimilarityMetric:
    """Common interface for pairwise similarity measures.

    Concrete metrics override calculateSimilarity; the version defined here
    performs no computation.
    """

    def calculateSimilarity(self, u, v):
        """Placeholder hook: ignore both inputs and return None."""
        return None

In [None]:
class CosineSimilarity(SimilarityMetric):
    """Cosine similarity between two numeric vectors."""

    def calculateSimilarity(self, u, v):
        """Return dot(u, v) divided by the product of the two L2 norms.

        1e-9 is added to the denominator, so a zero-norm input produces 0
        instead of a division by zero.
        """
        inner_product = np.dot(u, v)
        norm_product = np.linalg.norm(u) * np.linalg.norm(v)
        return inner_product / (norm_product + 1e-9)

class PCCSimilarity(SimilarityMetric):
    """Pearson correlation between two labelled rating Series."""

    def calculateSimilarity(self, u, v):
        """Return the Pearson correlation of u and v, or 0 when it is undefined."""
        return self.__pearson_correlation(u, v)

    def __pearson_correlation(self, user1_ratings, user2_ratings):
        """Correlate both Series over the labels that appear in both indexes.

        Returns 0 when fewer than two labels are shared or when pandas
        reports the correlation as NaN.
        """
        shared_labels = user1_ratings.index.intersection(user2_ratings.index)
        # A correlation needs at least two paired observations.
        if len(shared_labels) >= 2:
            left = user1_ratings[shared_labels]
            right = user2_ratings[shared_labels]
            coefficient = left.corr(right)
            if not np.isnan(coefficient):
                return coefficient
        return 0

In [None]:
class CollaborativeFiltering:
    """Shared state and train/test splitting for the collaborative filtering models."""

    def __init__(self, data, metric:SimilarityMetric):
        """
        Store the ratings table and the similarity metric.

        Parameters:
        - data: DataFrame containing user-item ratings
        - metric: SimilarityMetric used to compare rating vectors
        """
        self.data = data
        # The two splits are populated by train_test_split(); the base class
        # never fills in similarity_matrix itself.
        self.train_data = None
        self.test_data = None
        self.similarity_matrix = None
        self.metric = metric

    def calculate_similarity_matrix(self):
        """Do nothing in the base class; returns None."""
        return None

    def predict_ratings(self, user_id):
        """Do nothing in the base class; returns None."""
        return None

    def train_test_split(self, test_size = 0.2):
        """
        Randomly assign each row of the data to the training or the test set.

        Parameters:
        - test_size: Fraction of the data to be used for testing
        """
        # Re-seeding NumPy's global generator makes every call produce the same split.
        np.random.seed(42)
        draws = np.random.rand(len(self.data))
        in_train = draws < 1 - test_size
        self.train_data = self.data[in_train]
        self.test_data = self.data[~in_train]

    def evaluate(self):
        """Do nothing in the base class; returns None."""
        return None

In [None]:
class CollaborativeFilteringItemItem(CollaborativeFiltering):
    """Item-item collaborative filtering.

    Works on a ratings table with 'UserID', 'MovieID' and 'Rating' columns.
    Movies are compared through their columns of the user x movie matrix, and
    a user's rating for a movie is predicted as the similarity-weighted
    average of that user's training ratings.
    """

    def __init__(self, data, metric:SimilarityMetric):
        """
        Parameters:
        - data: DataFrame containing user-item ratings
        - metric: SimilarityMetric used to compare two movie columns
        """
        # `super` has to be called: bare `super.__init__` is the initializer of
        # the built-in `super` type and raises TypeError when given (data, metric).
        super().__init__(data, metric)

    def calculate_similarity_matrix(self):
        """
        Build the movie x movie similarity matrix from the training split.

        Missing ratings are filled with 0 before comparison. The result is
        stored in self.similarity_matrix as a DataFrame labelled by MovieID on
        both axes. Requires train_test_split() to have been called.
        """
        user_item_matrix = self.train_data.pivot(index='UserID', columns='MovieID', values='Rating').fillna(0)
        n_movies = user_item_matrix.shape[1]
        # Extract each movie column once instead of once per pair.
        movie_columns = [user_item_matrix.iloc[:, k] for k in range(n_movies)]
        similarity_matrix = np.zeros((n_movies, n_movies))
        for i, column_i in enumerate(movie_columns):
            for j, column_j in enumerate(movie_columns):
                similarity_matrix[i, j] = self.metric.calculateSimilarity(column_i, column_j)
        self.similarity_matrix = pd.DataFrame(similarity_matrix, index=user_item_matrix.columns, columns=user_item_matrix.columns)

    def predict_ratings(self, user_id):
        """
        Predict ratings for items for a given user.

        Parameters:
        - user_id: ID of the user for whom to predict ratings

        Returns:
        - DataFrame indexed by MovieID with one 'PredictedRating' column. A
          user without training ratings gets 0 for every movie.
        """
        user_ratings = self.train_data[self.train_data['UserID'] == user_id]
        # The user's (movie, rating) pairs do not depend on the target item, so
        # read them once rather than re-iterating the rows for every item.
        rated = list(zip(user_ratings['MovieID'], user_ratings['Rating']))
        predicted_ratings = pd.DataFrame(index=self.similarity_matrix.index, columns=['PredictedRating'])
        for item_id in predicted_ratings.index:
            numerator = 0
            denominator = 0
            for movie_id, rating in rated:
                similarity = self.similarity_matrix.loc[item_id, movie_id]
                numerator += similarity * rating
                denominator += abs(similarity)
            # Add a small value to avoid division by zero
            predicted_ratings.loc[item_id, 'PredictedRating'] = numerator / (denominator + 1e-9)
        return predicted_ratings

    def evaluate(self):
        """
        Evaluate the Collaborative Filtering model on the test set.

        Recomputes the similarity matrix, then scores every test rating whose
        movie also occurs in the training split.

        Returns:
        - Mean squared error (MSE) of the predictions, or NaN when no test
          rating could be scored
        """
        self.calculate_similarity_matrix()
        mse_sum = 0
        total_predictions = 0
        for user_id in self.test_data['UserID'].unique():
            user_test_ratings = self.test_data[self.test_data['UserID'] == user_id]
            user_predicted_ratings = self.predict_ratings(user_id)
            for movie_id, rating in zip(user_test_ratings['MovieID'], user_test_ratings['Rating']):
                # Movies that never appear in training have no prediction.
                if movie_id in user_predicted_ratings.index:
                    total_predictions += 1
                    mse_sum += (rating - user_predicted_ratings.loc[movie_id, 'PredictedRating']) ** 2
        if total_predictions == 0:
            # Nothing to average; avoid ZeroDivisionError.
            return np.nan
        return mse_sum / total_predictions
        

In [None]:
class CollaborativeFilteringUserUser(CollaborativeFiltering):
    """User-user collaborative filtering.

    Works on a ratings table with 'UserID', 'MovieID' and 'Rating' columns.
    Users are compared through their rows of the user x movie matrix.
    """

    def __init__(self, data, metric:SimilarityMetric):
        """
        Parameters:
        - data: DataFrame containing user-item ratings
        - metric: SimilarityMetric used to compare two user rows
        """
        # `super` has to be called: bare `super.__init__` is the initializer of
        # the built-in `super` type and raises TypeError when given (data, metric).
        super().__init__(data, metric)

    def calculate_similarity_matrix(self):
        """
        Build the user x user similarity matrix from the training split.

        Missing ratings are filled with 0 before comparison. The result is
        stored in self.similarity_matrix as a DataFrame labelled by UserID on
        both axes. Requires train_test_split() to have been called.
        """
        user_item_matrix = self.train_data.pivot(index='UserID', columns='MovieID', values='Rating').fillna(0)
        n_users = user_item_matrix.shape[0]
        # Extract each user row once instead of once per pair.
        user_rows = [user_item_matrix.iloc[k, :] for k in range(n_users)]
        similarity_matrix = np.zeros((n_users, n_users))
        for i, row_i in enumerate(user_rows):
            for j, row_j in enumerate(user_rows):
                similarity_matrix[i, j] = self.metric.calculateSimilarity(row_i, row_j)

        self.similarity_matrix = pd.DataFrame(similarity_matrix, index=user_item_matrix.index, columns=user_item_matrix.index)

    def predict_ratings(self, user_id):
        """
        Compute one similarity-weighted value per training user.

        For every other user, the ratings that user gave to the movies rated
        by user_id are averaged, weighted by the similarity between the two.

        Parameters:
        - user_id: ID of the user for whom to predict ratings

        Returns:
        - DataFrame indexed by UserID with one 'PredictedRating' column; NaN
          where no movie is shared or the similarity is 0

        NOTE(review): the result holds one value per *user*, not per movie,
        which looks unintended for user-user filtering - confirm before
        relying on it. The shape is kept as-is for existing callers.
        """
        predicted_ratings = pd.DataFrame(index=self.similarity_matrix.index, columns=['PredictedRating'])
        # One {MovieID: Rating} lookup per user, built once, replaces a
        # full-table boolean filter for every (user, movie) pair.
        ratings_by_user = {
            uid: dict(zip(rows['MovieID'], rows['Rating']))
            for uid, rows in self.train_data.groupby('UserID')
        }
        user_movie_ids = list(ratings_by_user.get(user_id, {}))
        for other_user_id in predicted_ratings.index:
            other_ratings = ratings_by_user.get(other_user_id, {})
            shared_ratings = [other_ratings[movie_id] for movie_id in user_movie_ids if movie_id in other_ratings]
            numerator = 0
            denominator = 0
            if shared_ratings:
                similarity = self.similarity_matrix.loc[user_id, other_user_id]
                for rating in shared_ratings:
                    # Scalars are converted directly; calling float() on a
                    # one-element Series is deprecated in pandas.
                    numerator += similarity * float(rating)
                    denominator += abs(similarity)
            if denominator != 0:
                predicted_ratings.loc[other_user_id, 'PredictedRating'] = numerator / denominator
            else:
                predicted_ratings.loc[other_user_id, 'PredictedRating'] = np.nan
        return predicted_ratings

    def evaluate(self):
        """
        Evaluate the model on the test set.

        Recomputes the similarity matrix, then compares every test rating of a
        user with the value predict_ratings() reports for that same user.

        Returns:
        - Mean squared error (MSE) of the predictions, or NaN when no test
          rating could be scored
        """
        self.calculate_similarity_matrix()
        mse_sum = 0
        total_predictions = 0
        for user_id in self.test_data['UserID'].unique():
            # A user with no training rows has no entry in the prediction
            # frame; looking it up would raise KeyError.
            if user_id not in self.similarity_matrix.index:
                continue
            user_test_ratings = self.test_data[self.test_data['UserID'] == user_id]
            user_predicted_ratings = self.predict_ratings(user_id)
            # The same entry is used for every test row of this user.
            own_prediction = user_predicted_ratings.loc[user_id, 'PredictedRating']
            if pd.isna(own_prediction):
                continue
            for rating in user_test_ratings['Rating']:
                total_predictions += 1
                mse_sum += (rating - own_prediction) ** 2
        if total_predictions == 0:
            # Nothing to average; avoid ZeroDivisionError.
            return np.nan
        return mse_sum / total_predictions


In [None]:
# Experiment 1: item-item collaborative filtering with cosine similarity.
# Load the ratings table; `data` is reused by the later experiment cells.
# The model classes read its 'UserID', 'MovieID' and 'Rating' columns.
data=pd.read_csv("EncodedCombined.csv")

metric = CosineSimilarity()
# Create the item-item model around the full table and the chosen metric
cf = CollaborativeFilteringItemItem(data, metric)


# Split data into train and test sets (test_size=0.2; the split is seeded, so it is repeatable)
cf.train_test_split(test_size=0.2)

# Evaluate the model: evaluate() builds the similarity matrix itself and returns the MSE
mse = cf.evaluate()
print("Mean Squared Error:", mse)

In [None]:
# Experiment 2: item-item collaborative filtering with Pearson correlation.
# Relies on `data` loaded by an earlier cell.
metric = PCCSimilarity()
cf = CollaborativeFilteringItemItem(data, metric)

# Split data into train and test sets (test_size=0.2; the split is seeded, so it is repeatable)
cf.train_test_split(test_size=0.2)

# Evaluate the model: evaluate() builds the similarity matrix itself and returns the MSE
mse = cf.evaluate()
print("Mean Squared Error:", mse)

In [None]:
# Experiment 3: user-user collaborative filtering with cosine similarity.
# Relies on `data` loaded by an earlier cell.
metric = CosineSimilarity()
# Create the user-user model around the full table and the chosen metric
cf = CollaborativeFilteringUserUser(data, metric)


# Split data into train and test sets (test_size=0.2; the split is seeded, so it is repeatable)
cf.train_test_split(test_size=0.2)

# Evaluate the model: evaluate() builds the similarity matrix itself and returns the MSE
mse = cf.evaluate()
print("Mean Squared Error:", mse)

In [None]:
# Experiment 4: user-user collaborative filtering with Pearson correlation.
# Relies on `data` loaded by an earlier cell.
metric = PCCSimilarity()
cf = CollaborativeFilteringUserUser(data, metric)

# Split data into train and test sets (test_size=0.2; the split is seeded, so it is repeatable)
cf.train_test_split(test_size=0.2)

# Evaluate the model: evaluate() builds the similarity matrix itself and returns the MSE
mse = cf.evaluate()
print("Mean Squared Error:", mse)