In [1]:
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error

# SVD Layer2
## *sparse_matrix = item_n x user_n*

In [2]:
class SVD:
    
    def __init__(self, sparse_matrix, K):
        self.sparse_matrix = sparse_matrix
        self.K = K
        self.init_sparse_matrix()
    
    def init_sparse_matrix(self):
        #SVD에 사용될 Matrix: Nan 값들을 평균으로 채워줌
        self.train_matrix = self.sparse_matrix.apply(lambda x: x.fillna(x.mean()), axis=1)
        # 예측된 행렬과의 비교를 위한 원본 행렬
        self.sparse_matrix = self.sparse_matrix.fillna(0)
        
    def train(self):
        print("Factorizing...")
        
        # SVD 알고리즘을 통해 Item/User Matrix 분해
        item_factors, user_factors = self.get_svd(self.train_matrix, self.K)
        
        # 두 행렬을 병합하여 pred_matrix 생성
        result_df = pd.DataFrame(
            np.matmul(item_factors, # item_n * K
                      user_factors),# K * user_n
            columns=self.sparse_matrix.columns.values,
            index=self.sparse_matrix.index.values
        )
        self.item_factors = item_factors
        self.user_factors = user_factors
        self.pred_matrix = result_df
        
    
    @staticmethod
    def get_svd(sparse_matrix, K): # sparse_matrix = Rating Matrix
        U, s, VT = np.linalg.svd(sparse_matrix.transpose())
        # U = (user_n, user_n), s = (user_n,), VT = (item_n, item_n)
        
        # 각 행렬을 K만큼 절단
        U = U[:, :K] # (user_n, K)
        s = s[:K] * np.identity(K, np.float) # (K, K)
        VT = VT[:K, :] # (K, item_n)
        
        # Item Matrix에 S행렬을 곱해준 후,
        #언제든지 행렬곱으로 rating을 예측할 수 있다록 미리 transpose
        item_factors = np.transpose(np.matmul(s, VT)) # (item_n, K)
        user_factors = np.transpose(U) # (K, user_n)

        return item_factors, user_factors
    
    def evaluate(self):
        idx, jdx = self.sparse_matrix.to_numpy().nonzero() # 0이 아니 부분의 위치들
        ys, preds = [], [] # 실제 값과 예측 값을 뱉어낼 리스트
        for i, j in zip(idx, jdx): # Item i에 대한 User j의 평가
            ys.append(self.sparse_matrix.iloc[i, j]) # 원본행렬
            preds.append(self.pred_matrix.iloc[i, j]) # 예측행렬

        error = mean_squared_error(ys, preds)
        return np.sqrt(error)


In [10]:
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error

# 가상의 희소 행렬을 생성하여 SVD 클래스의 인스턴스를 생성
sparse_matrix = pd.DataFrame({
    'user1': [4, np.nan, 1, 5,3],
    'user2': [2, 1, 5, np.nan,2],
    'user3': [3, np.nan, 3, 4,2],
    'user4': [np.nan, 2, 4, 3,5],
}, index = ['item_1','item_2','item_3','item_4','item_5'])
K = 2

svd_model = SVD(sparse_matrix, K)

# 행렬 분해를 수행하고, 예측 행렬을 생성
svd_model.train()

# 모델의 평가를 수행
rmse = svd_model.evaluate()

print("Root Mean Squared Error (RMSE):", rmse)

Factorizing...
Root Mean Squared Error (RMSE): 0.6137784748624842


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  s = s[:K] * np.identity(K, np.float) # (K, K)


In [11]:
sparse_matrix

Unnamed: 0,user1,user2,user3,user4
item_1,4.0,2.0,3.0,
item_2,,1.0,,2.0
item_3,1.0,5.0,3.0,4.0
item_4,5.0,,4.0,3.0
item_5,3.0,2.0,2.0,5.0
