# In [1]:
import numpy as np
#model: BMF
class BayesianMatrixFactorization():
    """
    Bayesian Matrix Factorization model.

    Generative model (alpha_* and beta are precisions, i.e. inverse variances):
        R = P @ Q.T
        p ~ N(p | 0, alpha_p^(-1) I)
        q ~ N(q | 0, alpha_q^(-1) I)
        r = p @ q
        t ~ N(t | p @ q, beta^(-1))

    Attributes set by ``fit`` (all ``None`` until then)
    ----------
    pos_mean_p : (u, k) np.ndarray
    pos_precision_p : (i, i) np.ndarray
    pos_mean_q : (i, k) np.ndarray
    pos_precision_q : (u, u) np.ndarray
    """

    def __init__(self, alpha_p:float=1., alpha_q:float=1., beta:float=1.):
        """
        Parameters
        ----------
        alpha_p : float
            prior precision of the user factors P.
        alpha_q : float
            prior precision of the item factors Q.
        beta : float
            precision of the observation noise.
        """
        self.alpha_p = alpha_p
        self.alpha_q = alpha_q
        self.beta = beta
        # posterior of p, q (populated by fit)
        self.pos_mean_p = None
        self.pos_precision_p = None
        self.pos_mean_q = None
        self.pos_precision_q = None

    def fit(self, R:np.ndarray, k:int=5):
        """
        One-shot bayesian update of the factors given the training matrix.

        Random factors P and Q are drawn from their priors (global numpy RNG,
        P first, then Q); the Gaussian posterior of P is then computed
        conditioned on the sampled Q, and the posterior of Q conditioned on the
        sampled P. R is treated as fully observed; missing entries are not
        handled specially.

        Parameters
        ----------
        R : (u,i) np.ndarray
            training data, u is the number of users, i is the number of items.
        k : int, the number of latent factors.

        Raises
        ------
        ValueError
            if R is not 2-dimensional or a prior precision is not positive.
        """
        R = np.asarray(R, dtype=float)
        if R.ndim != 2:
            raise ValueError("R must be a 2-dimensional (users x items) array")
        if self.alpha_p <= 0 or self.alpha_q <= 0:
            raise ValueError("alpha_p and alpha_q must be positive precisions")
        n_users, n_items = R.shape

        #1. generate matrices P, Q from the priors.
        # np.random.normal takes a standard deviation, so the precision alpha
        # has to be converted: std = alpha^(-1/2).
        P = np.random.normal(0, self.alpha_p ** -0.5, (n_users, k))#uxk
        Q = np.random.normal(0, self.alpha_q ** -0.5, (n_items, k))#ixk

        #2. calculate the posterior with analytical solution.
        # The prior term is alpha * I, not the scalar alpha broadcast over every
        # entry: alpha + beta*Q@Q.T has rank <= k+1 and is singular as soon as
        # the matrix is larger than that.
        self.pos_precision_p = self.alpha_p * np.eye(n_items) + self.beta * Q @ Q.T # ixi
        # beta * R @ inv(A) @ Q, computed with solve() rather than an explicit
        # inverse. By the push-through identity this equals
        # beta * R @ Q @ inv(alpha*I_k + beta*Q.T@Q), the usual ridge solution.
        self.pos_mean_p = self.beta * R @ np.linalg.solve(self.pos_precision_p, Q) # uxi,ixk -> uxk
        self.pos_precision_q = self.alpha_q * np.eye(n_users) + self.beta * P @ P.T # uxu
        self.pos_mean_q = self.beta * R.T @ np.linalg.solve(self.pos_precision_q, P) # ixu,uxk -> ixk

    def predict(self, sample_size:int=None):
        """
        return the mean of the predictive distribution, or samples from it

        Parameters
        ----------
        sample_size : int, optional
            number of samples to draw from the predictive distribution
            (the default is None, no sampling from the distribution)

        Returns
        -------
        R_pred : (u,i) np.ndarray
            ``pos_mean_p @ pos_mean_q.T``; returned when sample_size is None.
        R_sample : list of (u,i) np.ndarray
            ``sample_size`` sampled reconstructions; returned when sample_size
            is given (an empty list if it is 0).

        Raises
        ------
        RuntimeError
            if called before ``fit``.
        """
        if self.pos_mean_p is None or self.pos_mean_q is None:
            raise RuntimeError("predict() called before fit()")

        if sample_size is None:
            return self.pos_mean_p @ self.pos_mean_q.T #R = PxQ, uxi

        # The covariances do not depend on the sample or the latent factor, so
        # invert once instead of inside the double loop. Each column of
        # pos_mean_p has length u and is paired with the uxu matrix
        # (pos_precision_q); columns of pos_mean_q (length i) with the ixi one.
        cov_p = np.linalg.inv(self.pos_precision_q) # uxu
        cov_q = np.linalg.inv(self.pos_precision_p) # ixi
        n_factors = self.pos_mean_p.shape[1]

        R_sample = []
        for _ in range(sample_size):
            p_sample, q_sample = [], []
            for f in range(n_factors):#latent factors
                p_sample_f = np.random.multivariate_normal(self.pos_mean_p[:,f], cov_p, size=1)
                q_sample_f = np.random.multivariate_normal(self.pos_mean_q[:,f], cov_q, size=1)
                p_sample.append(p_sample_f.flatten())
                q_sample.append(q_sample_f.flatten())
            # (k,u).T @ (k,i) -> uxi
            R_sample.append(np.dot(np.array(p_sample).T, np.array(q_sample)))
        return R_sample