## User-Based Collaborative Filtering на основе коэффициента корреляции Пирсона на датасете MovieLens

In [6]:
import numpy as np
import pandas as pd 

In [8]:
ratings = pd.read_csv("../data/u.data", sep='\t', header=None, names=["user_id", "movie_id", "rate", "timestamp"])

ratings.head()

Unnamed: 0,user_id,movie_id,rate,timestamp
0,196,242,3,881250949
1,186,302,3,891717742
2,22,377,1,878887116
3,244,51,2,880606923
4,166,346,1,886397596


In [10]:
movies = pd.read_csv("../data/u.item",
                    names=('movie','title'),
                    header=None, 
                    encoding='latin-1', 
                    sep='|', 
                    usecols=(0,1), index_col=0)
movies.head()

Unnamed: 0_level_0,title
movie,Unnamed: 1_level_1
1,Toy Story (1995)
2,GoldenEye (1995)
3,Four Rooms (1995)
4,Get Shorty (1995)
5,Copycat (1995)


In [11]:
data = ratings.pivot_table(
    index='user_id',      
    columns='movie_id',   
    values='rate',        
    fill_value=None
)

data.head()

movie_id,1,2,3,4,5,6,7,8,9,10,...,1673,1674,1675,1676,1677,1678,1679,1680,1681,1682
user_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1,5.0,3.0,4.0,3.0,3.0,5.0,4.0,1.0,5.0,3.0,...,,,,,,,,,,
2,4.0,,,,,,,,,2.0,...,,,,,,,,,,
3,,,,,,,,,,,...,,,,,,,,,,
4,,,,,,,,,,,...,,,,,,,,,,
5,4.0,3.0,,,,,,,,,...,,,,,,,,,,


In [20]:
data.loc[1][data.loc[1].isna()].index

Index([ 273,  274,  275,  276,  277,  278,  279,  280,  281,  282,
       ...
       1673, 1674, 1675, 1676, 1677, 1678, 1679, 1680, 1681, 1682],
      dtype='int64', name='movie_id', length=1410)

In [21]:
data.loc[1][data.loc[1].notna()].index

Index([  1,   2,   3,   4,   5,   6,   7,   8,   9,  10,
       ...
       263, 264, 265, 266, 267, 268, 269, 270, 271, 272],
      dtype='int64', name='movie_id', length=272)

### Коэффициент Корреляции Пирсона

https://habr.com/ru/companies/surfingbird/articles/139518/

"Прежде всего оставим в этих векторах только те элементы, для которых нам известны значения в обоих векторах, т.е. оставим только те продукты, которые оценили оба пользователя, или только тех пользователей, которые оба оценили данный продукт. В результате нам просто нужно определить, насколько похожи два вектора вещественных чисел."

In [None]:
import math


def calculate_similarity(user_1: np.ndarray, user_2: np.ndarray):
    common_movies = (~np.isnan(user_1)) & (~np.isnan(user_2))

    if not np.any(common_movies):
        return 0.0

    user_1_common = user_1[common_movies]
    user_2_common = user_2[common_movies]

    mean_u1 = np.mean(user_1_common)
    mean_u2 = np.mean(user_2_common)

    # Вычисляем числитель (ковариация)
    numerator = 0.0
    for i in range(len(user_1_common)):
        numerator += (u1_common.iloc[i] - mean_u1) * (user_2_common.iloc[i] - mean_u2)
    
    # Вычисляем знаменатель (произведение стандартных отклонений)
    sum_sq_u1 = 0.0
    sum_sq_u2 = 0.0
    
    for i in range(len(user_1_common)):
        sum_sq_u1 += (user_1_common.iloc[i] - mean_u1) ** 2
        sum_sq_u2 += (user_2_common.iloc[i] - mean_u2) ** 2
    
    denominator = math.sqrt(sum_sq_u1) * math.sqrt(sum_sq_u2)
    
    # Защита от деления на ноль
    if denominator == 0:
        return 0.0
    
    return numerator / denominator

In [25]:
from abc import ABCMeta, abstractmethod
from typing import List


class DatabaseError(ValueError):
    pass 

class DatabaseNotExists(DatabaseError):
    pass 

class UserTableNotExists(DatabaseError):
    pass 

class MovieTableNotExists(DatabaseError):
    pass 



class MovieDatabaseManager(metaclass=ABCMeta):
    @abstractmethod
    def movie_title_to_id(self, movie_title: str) -> int:
        """Получить id фильма по его названию"""
        pass


    @abstractmethod
    def movie_id_to_title(self, movie_id: int) -> str:
        """Получить название фильма по его id"""
        pass


    @abstractmethod
    def get_user_new_movies(self, user_id: int) -> List[str]:
        """Получить для пользователя список еще не оценённых фильмов"""
        pass 
    

    @abstractmethod
    def set_user_movie_rate(self, user_id: int, movie_title: str, rate: int) -> bool:
        """
        Обновить пользовательскую оценку фильма в БД.
        Возвращает bool:
            - True, если операция прошла успешно
            - False, если возникла ошибка записи в БД.
        """
        pass 


    @abstractmethod
    def get_user_movie_data(self):
        """
        Получение таблицы в формате user_x_movies (строка - это пользователи, столбцы - id фильмов, значения - оценки от 1 до 5)
        """
        pass 

In [54]:
import os 
from dataclasses import dataclass, field
import pandas as pd
import csv


@dataclass
class CsvDatabaseConfig:
    db_path: str = field(repr=True)
    user_table: str = field(repr=True)
    movie_table: str = field(repr=True)


class CsvMovieDatabaseManager(MovieDatabaseManager):
    def __init__(self, config: CsvDatabaseConfig):
        self.config = config

        if not os.path.exists(self.config.db_path):
            raise DatabaseNotExists
        
        if not os.path.exists(self.config.db_path + self.config.user_table):
            raise UserTableNotExists

        if not os.path.exists(self.config.db_path + self.config.movie_table):
            raise MovieTableNotExists
        
        self.id2title = dict()
        self.title2id = dict()

        self.id2title: dict[int, str] = pd.read_csv(self.config.db_path+self.config.movie_table,
                    names=('movie','title'),
                    header=None, 
                    encoding='latin-1', 
                    sep='|', 
                    usecols=(0,1), index_col=0).to_dict()['title']
        
        self.title2id: dict[str, int] = {value: key for key, value in self.id2title.items()}
    

    def movie_id_to_title(self, movie_id):
        return self.id2title.get(movie_id, None)
    
    
    def movie_title_to_id(self, movie_title):
        return self.title2id.get(movie_title, None)
    

    def get_user_movie_data(self) -> pd.DataFrame:
        ratings = pd.read_csv(
            self.config.db_path + self.config.user_table, 
            sep='\t', header=None, names=["user_id", "movie_id", "rate", "timestamp"])
        
        return ratings.pivot_table(
            index='user_id',      
            columns='movie_id',   
            values='rate',        
            fill_value=None)
    

    def get_user_new_movies(self, user_id: int):
        ratings_df: pd.DataFrame = self.get_user_movie_data()
        user_ratings: pd.Series = ratings_df.loc[user_id]

        user_new_movies_id = user_ratings[user_ratings.isna()].index
        user_new_movies: set = set()

        for movie_id in user_new_movies_id:
            movie_title: str = self.id2title.get(movie_id, None)
            if movie_title:
                user_new_movies.add(movie_title)

        return user_new_movies


    def set_user_movie_rate(self, user_id: int, movie_title: str, rate: int) -> bool:
        try:
            movie_id = self.title2id.get(movie_title, None)

            if movie_id:
                with open(self.config.db_path + self.config.user_table, mode='a', newline='') as file:
                    writer = csv.writer(file, delimiter='\t')
                    writer.writerow([user_id, movie_id, rate, None])
                return True
            
            else:
                return False
            
        except Exception as err:
            print(err)
            return False

In [55]:
config = CsvDatabaseConfig(
    db_path='../data/',
    user_table='u.data',
    movie_table='u.item'
)

manager = CsvMovieDatabaseManager(config)

In [56]:
class UserBasedCollaborativeFiltering(metaclass=ABCMeta):
    @abstractmethod
    def provide_recommendation(self, user_id: int, n_movies: int = 5, n_neighbors: int = 5) -> List[str]:
        """
        Предлагает список рекомендуемых фильмов для пользователя.
        Параметры:
            * n_movies: int - количество рекомендуемых фильмов
            * n_neighbors: int - количество рассматриваемых наиболее близких пользователей, из которых будут 
                формироваться рекомендации
        """
        pass 

In [113]:
import math

class PirsonUCF(UserBasedCollaborativeFiltering):
    def __init__(self, db_manager: MovieDatabaseManager):
        self.db_manager = db_manager


    def pirson_similarity(self, user_1: np.ndarray, user_2: np.ndarray):
        """Расчёт похожести двух пользователей по коэффициенту Пирсона"""

        common_movies = (~np.isnan(user_1)) & (~np.isnan(user_2))

        if not np.any(common_movies):
            return 0.0
        
        user_1_common = user_1[common_movies]
        user_2_common = user_2[common_movies]

        if len(user_1_common) < 2:
            return 0.0
        
        mean_user_1 = np.mean(user_1_common)
        mean_user_2 = np.mean(user_2_common)

        # вычисление числителя
        numerator = 0.0
        for i in range(len(user_1_common)):
            numerator += (user_1_common.iloc[i] - mean_user_1) * (user_2_common.iloc[i] - mean_user_2)

        # вычисление знаменателя
        sum_sq_user_1 = 0.0
        sum_sq_user_2 = 0.0
        
        for i in range(len(user_1_common)):
            sum_sq_user_1 += (user_1_common.iloc[i] - mean_user_1) ** 2
            sum_sq_user_2 += (user_2_common.iloc[i] - mean_user_2) ** 2
        
        denominator = math.sqrt(sum_sq_user_1) * math.sqrt(sum_sq_user_2 )

        if denominator == 0:
            return 0.0
        
        return numerator / denominator


    def provide_recommendation(self, user_id, n_movies = 5, n_neighbors: int = 5):
        ratings: pd.DataFrame = self.db_manager.get_user_movie_data()

        # Если пользователя нет, возвращаем случайные фильмы
        if user_id not in ratings.index:
            all_movie_ids = ratings.columns.tolist()
            random_movies = np.random.choice(all_movie_ids, size=min(n_movies, len(all_movie_ids)), replace=False)
            return [self.db_manager.movie_id_to_title(movie_id) for movie_id in random_movies]

        target_user_ratings = ratings.loc[user_id]

        # Вычисляем схожесть со всеми другими пользователями
        similarities = []
        for other_user_id in ratings.index:
            if other_user_id == user_id:
                continue
                
            other_user_ratings = ratings.loc[other_user_id]
            similarity = self.pirson_similarity(target_user_ratings, other_user_ratings)
            similarities.append((other_user_id, similarity))

        # Сортируем по убыванию схожести и берем n_neighbors ближайших соседей
        similarities.sort(key=lambda x: x[1], reverse=True)
        top_neighbors = similarities[:n_neighbors]

        target_user_new_movies = self.db_manager.get_user_new_movies(user_id=user_id)

        recommendation = set()
        
        for neighbor_id, similarity in top_neighbors:
            neighbor_ratings = ratings.loc[neighbor_id]
            
            neighbor_good_movies = neighbor_ratings[neighbor_ratings >= 4].index
            
            if len(recommendation) >= n_movies:
                break 

            for movie_id in neighbor_good_movies:
                movie_title = self.db_manager.movie_id_to_title(movie_id=movie_id)
                if movie_title in target_user_new_movies:
                    recommendation.add(movie_title)
                    if len(recommendation) >= n_movies:
                        break
        
        # если вдруг не получилось извлечь достаточно рекомендаций добавляем случайных
        if len(recommendation) < n_movies:
            additional_movies_cnt = n_movies - len(recommendation)
            additional_movies = np.random.choice(target_user_new_movies, size=additional_movies_cnt, replace=False)
            recommendation.update(additional_movies)
            
        return recommendation

In [114]:
data = manager.get_user_movie_data()

In [115]:
recsys = PirsonUCF(db_manager=manager)

In [116]:
recsys.provide_recommendation(123456)

{'Dead Man Walking (1995)',
 'Four Rooms (1995)',
 'Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)',
 'Toy Story (1995)',
 'Twelve Monkeys (1995)'}

In [61]:
manager.set_user_movie_rate(123456, "Little Big League (1994)", 5)

True

In [98]:
np.random.choice([1, 2, 3, 4, 5, 6], size=5, replace=False)

array([6, 1, 2, 3, 5])

In [117]:
manager.get_user_movie_data()

movie_id,1,2,3,4,5,6,7,8,9,10,...,1673,1674,1675,1676,1677,1678,1679,1680,1681,1682
user_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1,5.0,3.0,4.0,3.0,3.0,5.0,4.0,1.0,5.0,3.0,...,,,,,,,,,,
2,4.0,,,,,,,,,2.0,...,,,,,,,,,,
3,,,,,,,,,,,...,,,,,,,,,,
4,,,,,,,,,,,...,,,,,,,,,,
5,4.0,3.0,,,,,,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
940,,,,2.0,,,4.0,5.0,3.0,,...,,,,,,,,,,
941,5.0,,,,,,4.0,,,,...,,,,,,,,,,
942,,,,,,,,,,,...,,,,,,,,,,
943,,5.0,,,,,,,3.0,,...,,,,,,,,,,
