In [38]:
import os
import pandas as pd
import numpy as np

from sklearn.preprocessing import LabelEncoder

from typing import Tuple

In [23]:
class DirFilePath:
    """Namespace of filesystem locations used to load the ratings data.

    All attributes are plain strings built with ``os.path.join``:

    * ``dir_base``    -- root directory of the project.
    * ``dir_data``    -- the ``ml-latest-small`` directory under ``Data``.
    * ``path_rating`` -- the ``ratings.csv`` file inside ``dir_data``.
    """

    # Project root; every other location is derived from it.
    dir_base = os.path.join('/opt', 'ml', 'paper', 'RecSys')
    # Directory that holds the dataset files.
    dir_data = os.path.join(dir_base, 'Data', 'ml-latest-small')
    # CSV file read by pandas when the data is loaded.
    path_rating = os.path.join(dir_data, 'ratings.csv')

In [105]:
class DataInfo():
    """Load a ratings CSV and derive the structures needed for training.

    The CSV must provide the columns ``userId``, ``movieId`` and
    ``timestamp``. On construction the ids are label-encoded in place and
    the following attributes are populated:

    * ``df``                    -- the loaded frame with encoded ids.
    * ``user_encoder`` / ``movie_encoder`` -- fitted ``LabelEncoder`` objects
      (use ``inverse_transform`` to recover the original ids).
    * ``set_users`` / ``num_users``   -- distinct encoded user ids.
    * ``set_movies`` / ``num_movies`` -- distinct encoded movie ids.
    * ``min_feedback``          -- minimum number of rating rows a movie
      needs to be used as a positive training item.
    * ``set_train_movies`` / ``num_train_movies`` -- movies meeting
      ``min_feedback``.
    * ``df_pos_user_sequence``  -- per-user, timestamp-ordered positive
      interactions (columns ``user_id``, ``movie_id``, ``label``).
    * ``user_negative_samples`` -- ``{user: 1-D array of movie ids the user
      has no feedback for}``.
    * ``df_pos_train`` / ``df_pos_test`` -- chronological per-user split of
      ``df_pos_user_sequence``.
    """

    def __init__(self, file_path: str, min_feedback: int = 5, train_ratio: float = 0.8) -> None:
        """Read ``file_path`` and build every derived attribute.

        Args:
            file_path: Path of the ratings CSV passed to ``pd.read_csv``.
            min_feedback: Minimum number of rating rows a movie must have to
                be kept as a positive item (mitigates item cold-start).
            train_ratio: Fraction of each user's sequence put in the
                training split; must lie in ``[0, 1]``.
        """
        self.df = pd.read_csv(file_path)
        self.user_encoder, self.movie_encoder = self._encode()

        self.set_users = set(self.df['userId'].unique())
        self.num_users = len(self.set_users)
        self.min_feedback = min_feedback  # for resolving cold-start problem
        self.set_movies = set(self.df['movieId'].unique())
        self.num_movies = len(self.set_movies)
        self.set_train_movies = self._moreThanFeedback()
        self.num_train_movies = len(self.set_train_movies)

        self.df_pos_user_sequence, self.user_negative_samples = self._makeSequenceAndNeg()

        self.df_pos_train, self.df_pos_test = self._trainTestSplit(train_ratio)

    def _trainTestSplit(self, train_ratio: float) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Split every user's positive sequence chronologically.

        For each user the first ``int(n * train_ratio)`` interactions of
        ``df_pos_user_sequence`` go to the training frame and the rest to the
        test frame. Implemented by hand because scikit-learn's
        GroupTimeSeriesSplit was still an open pull request:
        https://github.com/scikit-learn/scikit-learn/pull/16236

        Args:
            train_ratio: Fraction of each user's interactions used for
                training, in ``[0, 1]``.

        Returns:
            ``(df_train, df_test)``, each with the columns ``user_id``,
            ``movie_id`` and a constant ``label`` of 1.

        Raises:
            ValueError: If ``train_ratio`` is outside ``[0, 1]``.
        """
        if not 0.0 <= train_ratio <= 1.0:
            raise ValueError(f'train_ratio must be within [0, 1], got {train_ratio!r}')

        train = {'user_id': [], 'movie_id': []}
        test = {'user_id': [], 'movie_id': []}

        # One pass over the groups instead of re-filtering the frame per user.
        # sort=False keeps users in the order they appear in the sequence.
        grouped = self.df_pos_user_sequence.groupby('user_id', sort=False)['movie_id']
        for user, movies in grouped:
            user_movies = movies.tolist()
            num_train_user = int(len(user_movies) * train_ratio)
            num_test_user = len(user_movies) - num_train_user

            train['user_id'].extend([user] * num_train_user)
            train['movie_id'].extend(user_movies[:num_train_user])

            test['user_id'].extend([user] * num_test_user)
            test['movie_id'].extend(user_movies[num_train_user:])

        df_train = pd.DataFrame(train)
        df_train['label'] = 1

        df_test = pd.DataFrame(test)
        df_test['label'] = 1

        return df_train, df_test

    def _moreThanFeedback(self) -> set:
        """Return the set of movie ids with at least ``min_feedback`` rating rows."""
        feedback_counts = self.df['movieId'].value_counts()
        eligible = feedback_counts[feedback_counts >= self.min_feedback].index
        return set(eligible.to_numpy())

    def _makeSequenceAndNeg(self) -> Tuple[pd.DataFrame, dict]:
        """Build per-user positive sequences and negative candidate pools.

        Returns:
            ``(df_pos_user_sequence, user_negative_samples)`` where

            * ``df_pos_user_sequence`` lists, user by user and ordered by
              ``timestamp``, the movies the user interacted with that also
              belong to ``set_train_movies`` (columns ``user_id``,
              ``movie_id``, ``label`` = 1);
            * ``user_negative_samples`` maps each user to a sorted 1-D array
              of every movie in ``set_movies`` the user has no feedback for.
        """
        pos_user_sequence = {'user_id': [], 'movie_id': []}
        user_negative_samples = dict()

        # Sort once (stable, so timestamp ties keep file order) and group,
        # instead of filtering and sorting the whole frame for every user.
        ordered = self.df.sort_values(by='timestamp', kind='mergesort')
        movies_by_user = {
            user: movies.tolist()
            for user, movies in ordered.groupby('userId', sort=False)['movieId']
        }
        movie_dtype = self.df['movieId'].dtype

        for user in self.set_users:
            user_sequence_movies = movies_by_user.get(user, [])

            # Negatives exclude everything the user gave feedback on, but do
            # include movies that fail the minimum-feedback condition.
            user_negative_movies = self.set_movies - set(user_sequence_movies)
            # A set must be turned into a sequence first: np.array(<set>)
            # would produce a 0-d object array wrapping the set itself.
            user_negative_samples[user] = np.array(sorted(user_negative_movies), dtype=movie_dtype)

            # Movies below the minimum-feedback threshold are not positives.
            kept = [movie for movie in user_sequence_movies if movie in self.set_train_movies]
            pos_user_sequence['user_id'].extend([user] * len(kept))
            pos_user_sequence['movie_id'].extend(kept)

        df_pos_user_sequence = pd.DataFrame(pos_user_sequence)
        df_pos_user_sequence['label'] = 1

        return df_pos_user_sequence, user_negative_samples

    def _encode(self) -> "Tuple[LabelEncoder, LabelEncoder]":
        """Label-encode ``userId`` and ``movieId`` of ``self.df`` in place.

        Returns:
            ``(user_encoder, movie_encoder)``; call ``inverse_transform`` on
            them to decode ids back to the original values.
        """
        userId_label_encoder = LabelEncoder()
        movieId_label_encoder = LabelEncoder()

        self.df['userId'] = userId_label_encoder.fit_transform(self.df['userId'].values)
        self.df['movieId'] = movieId_label_encoder.fit_transform(self.df['movieId'].values)

        return userId_label_encoder, movieId_label_encoder

In [106]:
# Read the ratings CSV and build the DataInfo object (encoders, id sets,
# positive sequences, negative samples and the train/test split).
data = DataInfo(DirFilePath.path_rating)