In [1]:
import math
import numpy as np
import scipy.sparse as sp
import pandas as pd
from tqdm import tqdm
from collections import defaultdict
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from box import Box

import warnings

warnings.filterwarnings(action='ignore')
torch.set_printoptions(sci_mode=True)

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


# 1. 학습 설정

In [154]:
config = {
    'data_path' : "C:/Users/ansck/python/추천시스템/" , # 데이터 경로
    'valid_samples' : 1, # 검증에 사용할 sample 수
    'seed' : 22,
}

device = 'cuda' if torch.cuda.is_available() else 'cpu'

config = Box(config)

# 2. 데이터 전처리

In [164]:
class MakeMatrixDataSet():
    """
    MatrixDataSet 생성
    """
    def __init__(self, config):
        self.config = config
        self.column_names = ['productId','userId','rating','timestamp']
        self.df = pd.read_csv(os.path.join(self.config.data_path, 'AMAZON_FASHION.csv'), names=self.column_names)
        
        self.df = self.preprocess_data(min_len = 5)
        
        self.item_encoder, self.item_decoder = self.generate_encoder_decoder('productId')
        self.user_encoder, self.user_decoder = self.generate_encoder_decoder('userId')
        self.num_item, self.num_user = len(self.item_encoder), len(self.user_encoder)

        self.df['item_idx'] = self.df['productId'].apply(lambda x : self.item_encoder[x])
        self.df['user_idx'] = self.df['userId'].apply(lambda x : self.user_encoder[x])

        self.user_train, self.user_valid = self.generate_sequence_data()
        
    def preprocess_data(self, min_len = 5):
        """
        min_len 이하의 유저 sequence는 삭제
        """
        userId_counts = self.df['userId'].value_counts()
        valid_users = userId_counts[userId_counts >= min_len].index
        df_validusers = self.df[self.df['userId'].isin(valid_users)]
        
        return df_validusers

    def generate_encoder_decoder(self, col : str) -> dict:
        """
        encoder, decoder 생성

        Args:
            col (str): 생성할 columns 명
        Returns:
            dict: 생성된 user encoder, decoder
        """

        encoder = {}
        decoder = {}
        ids = self.df[col].unique()

        for idx, _id in enumerate(ids):
            encoder[_id] = idx
            decoder[idx] = _id

        return encoder, decoder
    
    def generate_sequence_data(self) -> dict:
        """
        sequence_data 생성

        Returns:
            dict: train user sequence / valid user sequence
        """
        users = defaultdict(list)
        user_train = {}
        user_valid = {}
        for user, item, time in zip(self.df['user_idx'], self.df['item_idx'], self.df['timestamp']):
            users[user].append(item)
        
        for user in users:
            np.random.seed(self.config.seed)

            user_total = users[user]
            valid = np.random.choice(user_total, size = self.config.valid_samples, replace = False).tolist()
            train = list(set(user_total) - set(valid))

            user_train[user] = train
            user_valid[user] = valid # valid_samples 개수 만큼 검증에 활용 (현재 Task와 가장 유사하게)

        return user_train, user_valid
    
    def get_train_valid_data(self):
        return self.user_train, self.user_valid

    def make_matrix(self, user_list, train = True):
        """
        user_item_dict를 바탕으로 행렬 생성
        """
        mat = torch.zeros(size = (user_list.size(0), self.num_item))
        for idx, user in enumerate(user_list):
            if train:
                mat[idx, self.user_train[user.item()]] = 1
            else:
                mat[idx, self.user_train[user.item()] + self.user_valid[user.item()]] = 1
        return mat

    def make_sparse_matrix(self):
        X = sp.dok_matrix((self.num_user, self.num_item), dtype=np.float32)
        for user in self.user_train.keys():
            item_list = self.user_train[user]
            X[user, item_list] = 1.0
        
        self.X = X.tocsr()        
        return X.tocsr()

In [165]:
class AEDataSet(Dataset):
    def __init__(self, num_user):
        self.num_user = num_user
        self.users = [i for i in range(num_user)]

    def __len__(self):
        return self.num_user

    def __getitem__(self, idx): 
        user = self.users[idx]
        return torch.LongTensor([user])

# 3. 모델

In [166]:
class EASE():
    def __init__(self, X, reg):
        self.X = self._convert_sp_mat_to_sp_tensor(X)
        self.reg = reg
    
    def _convert_sp_mat_to_sp_tensor(self, X):
        """
        Convert scipy sparse matrix to PyTorch sparse matrix

        Arguments:
        ----------
        X = Adjacency matrix, scipy sparse matrix
        """
        coo = X.tocoo().astype(np.float32)
        i = torch.LongTensor(np.mat([coo.row, coo.col]))
        v = torch.FloatTensor(coo.data)
        res = torch.sparse.FloatTensor(i, v, coo.shape).to(device)
        return res
    
    def fit(self):
        '''

        진짜 정말 간단한 식으로 모델을 만듬
        '''
        G = self.X.to_dense().t() @ self.X.to_dense()
        diagIndices = torch.eye(G.shape[0]) == 1
        G[diagIndices] += self.reg

        P = G.inverse()
        B = P / (-1 * P.diag())
        B[diagIndices] = 0
        
        self.B = B

        self.pred = self.X.to_dense() @ B
        

# 4. 학습 함수

In [167]:
def get_ndcg(pred_list, true_list):
    idcg = sum((1 / np.log2(rank + 2) for rank in range(1, len(pred_list))))
    dcg = 0
    for rank, pred in enumerate(pred_list):
        if pred in true_list:
            dcg += 1 / np.log2(rank + 2)
    ndcg = dcg / idcg
    return ndcg

# hit == recall == precision
def get_hit(pred_list, true_list):
    hit_list = set(true_list) & set(pred_list)
    hit = len(hit_list) / len(true_list)
    return hit

def evaluate(model, X, user_train, user_valid):

    mat = torch.from_numpy(X)

    NDCG = 0.0 # NDCG@10
    HIT = 0.0 # HIT@10

    recon_mat1 = model.pred.cpu()
    print(recon_mat1)
    recon_mat1[mat == 1] = -np.inf
    rec_list1 = recon_mat1.argsort(dim = 1)
    print(rec_list1.shape)

    for user, rec1 in tqdm(enumerate(rec_list1)):
        uv = user_valid[user]

        # ranking
        up = rec1[-10:].cpu().numpy().tolist()[::-1]
        print(user,up)

        NDCG += get_ndcg(pred_list = up, true_list = uv)
        HIT += get_hit(pred_list = up, true_list = uv)

    NDCG /= len(user_train)
    HIT /= len(user_train)
    
    return NDCG, HIT
    


# 5. 추천 함수

In [None]:
def recommend_for_new_user(item_list, ease_model, make_matrix_data_set, num_recommendations=10):
    """
    새로운 사용자에 대한 항목 추천을 생성합니다.

    :param item_list: 새 사용자가 상호 작용한 항목 목록입니다.
    :param ease_model: 훈련된 EASE 모델입니다.
    :param make_matrix_data_set: 데이터 처리 클래스의 인스턴스입니다.
    :param num_recommendations: 반환할 추천 항목의 수입니다.
    :return: 추천된 항목 ID 목록입니다.
    """
    # 새 사용자 항목 인덱스 생성
    user_item_indices = [make_matrix_data_set.item_encoder[item] for item in item_list if item in make_matrix_data_set.item_encoder]

    # 새 사용자 벡터 초기화
    new_user_vector = torch.zeros((1, make_matrix_data_set.num_item))

    # 사용자 벡터에 항목 인덱스 적용
    new_user_vector[0, user_item_indices] = 1

    # EASE 모델을 사용한 추천 생성
    new_user_vector = new_user_vector.to(device)
    user_pred = new_user_vector @ ease_model.B

    # 이미 상호 작용한 항목은 제외
    user_pred[0, user_item_indices] = -np.inf

    # 상위 N개 추천
    recommended_indices = torch.topk(user_pred, k=num_recommendations, dim=1).indices.cpu().numpy().flatten()
    recommended_items = [make_matrix_data_set.item_decoder[idx] for idx in recommended_indices]

    return recommended_items

In [None]:
def get_top_n_recommendations_for_user_id(user_id, ease_model, make_matrix_data_set, n=10):
    """
    주어진 사용자 ID에 대해 상위 N개의 추천 항목을 반환합니다.

    :param user_id: 추천을 생성할 사용자의 ID
    :param ease_model: 학습된 EASE 모델
    :param make_matrix_data_set: MakeMatrixDataSet 인스턴스
    :param n: 반환할 추천 항목의 수
    :return: 상위 N개의 추천 항목 리스트
    """
    # 사용자 ID를 사용자 인덱스로 변환
    if user_id in make_matrix_data_set.user_encoder:
        user_index = make_matrix_data_set.user_encoder[user_id]
    else:
        print("User ID not found in the dataset.")
        return []

    # 사용자 인덱스에 해당하는 행을 추출
    user_vector = make_matrix_data_set.X[user_index, :].toarray()
    user_vector = torch.from_numpy(user_vector).float().to(device)

    # 모델을 사용해 예측 점수를 계산
    user_pred_scores = user_vector @ ease_model.B

    # 이미 상호작용한 항목은 추천에서 제외
    user_pred_scores[0, user_vector.nonzero()] = -np.inf

    # 상위 N개의 추천 항목 인덱스를 반환
    top_n_indices = torch.topk(user_pred_scores, k=n, dim=1).indices.cpu().numpy().flatten()

    # 인덱스를 항목 ID로 변환
    top_n_items = [make_matrix_data_set.item_decoder[idx] for idx in top_n_indices]

    return top_n_items

# 5. 학습

In [168]:
make_matrix_data_set = MakeMatrixDataSet(config = config)
user_train, user_valid = make_matrix_data_set.get_train_valid_data()
X = make_matrix_data_set.make_sparse_matrix()

In [169]:
model = EASE(X = X, reg = 1000)
model.fit()

In [177]:
data = pd.read_csv('AMAZON_FASHION.csv', names= ['productId','userId','rating','timestamp'])
userId_counts = data['userId'].value_counts()
valid_users = userId_counts[userId_counts >= 5].index
data = data[data['userId'].isin(valid_users)]
data[data['userId']=='AAQO19HKS86MQ']['productId'].tolist()

['B00008JOQI', 'B00IQZZ9JI', 'B00KA3UV0Q', 'B00KW4L9XQ', 'B00RBJZ7EC']

In [176]:
#new_user=['B0007MV6PO', 'B0008F6WMM', 'B0009A1EA6', 'B01HJ0SIF2', 'B01HJG5NLI']

#'AAQO19HKS86MQ' 유저의 ITEM
new_user=['B00008JOQI', 'B00IQZZ9JI', 'B00KA3UV0Q', 'B00KW4L9XQ', 'B00RBJZ7EC']

In [178]:
recommendations = recommend_for_new_user(new_user, model, make_matrix_data_set)
print("추천된 항목:", recommendations)

추천된 항목: ['B00MIMOVB2', 'B00PJIPZGW', 'B00LMUA6C4', 'B00LMU9NE6', 'B00KA3VEG6', 'B00FG35RU4', 'B00MXG42US', 'B00KA3V9JS', 'B00KW4LCCE', 'B00LMU7GHM']


In [171]:
user_id = 'AAQO19HKS86MQ'
top_n_recommendations = get_top_n_recommendations_for_user_id(user_id, model, make_matrix_data_set, n=10)
print(f"Top {len(top_n_recommendations)} recommendations for user {user_id}: {top_n_recommendations}")

Top 10 recommendations for user AAQO19HKS86MQ: ['B00MIMOVB2', 'B00PJIPZGW', 'B00LMUA6C4', 'B00LMU9NE6', 'B00KA3VEG6', 'B00FG35RU4', 'B00MXG42US', 'B00KA3V9JS', 'B00KW4LCCE', 'B00LMU7GHM']


In [90]:
for reg in [1000, 100, 10, 1, 0.1, 0.01]:
    model = EASE(X = X, reg = reg)
    model.fit()
    ndcg, hit = evaluate(model = model, X = X.todense(), user_train = user_train, user_valid = user_valid)
    print(f'reg: {reg}| NDCG@10: {ndcg:.5f}| HIT@10: {hit:.5f}')

torch.Size([3718, 13197])


3718it [00:00, 18405.86it/s]


reg: 1000| NDCG@10: 0.04791| HIT@10: 0.19096
torch.Size([3718, 13197])


3718it [00:00, 23019.26it/s]


reg: 100| NDCG@10: 0.04772| HIT@10: 0.19016
torch.Size([3718, 13197])


3718it [00:00, 25819.73it/s]

reg: 10| NDCG@10: 0.04528| HIT@10: 0.18209





torch.Size([3718, 13197])


3718it [00:00, 16598.16it/s]


reg: 1| NDCG@10: 0.04185| HIT@10: 0.16918
torch.Size([3718, 13197])


3718it [00:00, 9750.19it/s] 


reg: 0.1| NDCG@10: 0.03655| HIT@10: 0.14900
torch.Size([3718, 13197])


3718it [00:00, 15427.56it/s]


reg: 0.01| NDCG@10: 0.03796| HIT@10: 0.16756
