<h2 align='center'> Collaborative filtering Using Gaussian Mixture Modeling <h2>

In [17]:
from typing import NamedTuple, Tuple
import numpy as np
from matplotlib import pyplot as plt
from matplotlib.patches import Circle, Arc
from scipy.special import logsumexp

In [18]:
class GaussianMixture(NamedTuple):
    mu: np.ndarray  # (K, d) array - each row corresponds to a gaussian component mean
    var: np.ndarray  # (K, ) array - each row corresponds to the variance of a component
    p: np.ndarray  # (K, ) array = each row corresponds to the weight of a component

* The formula of gaussian is 
$ p(j\mid u) =\frac{p(u|j)\cdot p(j)}{p(u)} =\frac{p(u|j)\cdot p(j)}{\sum _{j=1}^{K}p(u|j)\cdot p(j)} =\frac{ \pi _{j}N(x_{C_{u}}^{(u)};\mu _{C_{u}}^{(j)},\sigma _{j}^{2}I_{C_{u}\times C_{u}}) }{ \sum _{j=1}^{K}\pi _{j}N(x_{C_{u}}^{(u)};\mu _{C_{u}}^{(j)},\sigma _{j}^{2}I_{C_{u}\times C_{u}}) } $

In [19]:
def init(X: np.ndarray, K: int,
         seed: int = 0) -> Tuple[GaussianMixture, np.ndarray]:
    """
    Returns:
        mixture: the initialized gaussian mixture
        post: (n, K) array holding the soft counts
            for all components for all examples

    """
    np.random.seed(seed)
    n, _ = X.shape
    p = np.ones(K) / K

    # select K random points as initial means
    mu = X[np.random.choice(n, K, replace=False)]
    var = np.zeros(K)
    # Compute variance
    for j in range(K):
        var[j] = ((X - mu[j])**2).mean()

    mixture = GaussianMixture(mu, var, p)
    post = np.ones((n, K)) / K

    return mixture, post


In [20]:
def plot(X: np.ndarray, mixture: GaussianMixture, post: np.ndarray,
         title: str):
    """Plots the mixture model for 2D data"""
    _, K = post.shape

    percent = post / post.sum(axis=1).reshape(-1, 1)
    fig, ax = plt.subplots()
    ax.title.set_text(title)
    ax.set_xlim((-20, 20))
    ax.set_ylim((-20, 20))
    r = 0.25
    color = ["r", "b", "k", "y", "m", "c"]
    for i, point in enumerate(X):
        theta = 0
        for j in range(K):
            offset = percent[i, j] * 360
            arc = Arc(point,
                      r,
                      r,
                      0,
                      theta,
                      theta + offset,
                      edgecolor=color[j])
            ax.add_patch(arc)
            theta += offset
    for j in range(K):
        mu = mixture.mu[j]
        sigma = np.sqrt(mixture.var[j])
        circle = Circle(mu, sigma, color=color[j], fill=False)
        ax.add_patch(circle)
        legend = "mu = ({:0.2f}, {:0.2f})\n stdv = {:0.2f}".format(
            mu[0], mu[1], sigma)
        ax.text(mu[0], mu[1], legend)
    plt.axis('equal')


In [21]:
def rmse(X, Y):
    return np.sqrt(np.mean((X - Y)**2))

In [22]:
def estep(X: np.ndarray, mixture: GaussianMixture) -> Tuple[np.ndarray, float]:
    """
    Returns:
        np.ndarray: (n, K) array holding the soft counts
            for all components for all examples
        float: log-likelihood of the assignment

    """
    non_null = (X != 0).astype(int)
    rated = np.sum(non_null, axis=1)
    # mask_mu = non_null[:, None, :] * mixture.mu  # nxkxd
    # f = (np.sum(X[:, None, :] - mask_mu[:, None, :], axis=2) ** 2) / (mixture.var[None, :] * 2.0)

    norm = np.sum((X[:, None, :] - non_null[:, None, :] * mixture.mu) ** 2, axis=2)
    norm2 = -norm / (mixture.var[None, :] * 2)
    f = norm2 - (rated[:, None] / 2.0) * (np.log(2 * np.pi) + np.log(mixture.var[None, :]))
    f = f + np.log(mixture.p[None, :] + 1e-16)
    f_max = np.max(f, axis=1, keepdims=True)
    log_post = f - logsumexp(f, axis=1, keepdims=True)
    post = np.exp(log_post)
    log_likelihood = np.sum(post * (f-log_post))
    return post, log_likelihood


In [23]:
def mstep(X: np.ndarray, post: np.ndarray, mixture: GaussianMixture,
          min_variance: float = .25) -> GaussianMixture:
    """
    Returns:
        GaussianMixture: the new gaussian mixture
    """
    K = post.shape[1]
    X2 = np.copy(X)
    X2[X2 != 0] = 1

    mu_h = np.zeros((mixture.mu.shape))

    for k in range(K):
        for i in range(X.shape[1]):
            if np.multiply(X2[:, i].T, np.matrix(post[:, k])).T.sum(axis=0) >= 1:
                mu_h[k, i] = np.multiply(X[:, i].T, np.matrix(post[:, k])).T.sum(axis=0) / np.multiply(X2[:, i].T,
                                                                                                       np.matrix(post[:,
                                                                                                                 k])).T.sum(
                    axis=0)
            else:
                mu_h[k, i] = mixture.mu[k, i]
    p = np.zeros((mixture.p.shape))
    p = post.sum(axis=0) / post.shape[0]

    var_h = np.zeros((mixture.var.shape))

    for k in range(K):
        pv = (post[:, k] * np.power((X-mu_h[k])*X2, 2).sum(axis=1)).sum(axis=0) / ((np.matrix(post[:, k]) * X2).sum(axis=1)).sum(axis=0)
        if pv >= min_variance:
            var_h[k] = pv
        else:
            var_h[k] = min_variance
    return GaussianMixture(mu_h, var_h, p)

In [24]:
def run(X: np.ndarray, mixture: GaussianMixture,
        post: np.ndarray) -> Tuple[GaussianMixture, np.ndarray, float]:
    """
    Returns:
        GaussianMixture: the new gaussian mixture
        np.ndarray: (n, K) array holding the soft counts
            for all components for all examples
        float: log-likelihood of the current assignment
    """
    cost = None
    i = 0
    while True:
        global prev_cost
        if i > 0:
            prev_cost = cost

        post, cost = estep(X, mixture)

        mixture = mstep(X, post, mixture)
        if i > 0:
            if cost - prev_cost <= 1e-6 * np.abs(cost):
                break
        i += 1

    return mixture, post, cost

In [25]:
def fill_matrix(X: np.ndarray, mixture: GaussianMixture) -> np.ndarray:
    
    """
    Fills an incomplete matrix according to a mixture model

    Returns
        np.ndarray: a (n, d) array with completed data
    """
    
    output = np.copy(X)
    non_null = (X != 0).astype(int)
    rated = np.sum(non_null, axis=1)
    norm = np.sum((X[:, None, :] - non_null[:, None, :] * mixture.mu) ** 2, axis=2)
    norm2 = -norm / (mixture.var[None, :] * 2)
    f = norm2 - (rated[:, None] / 2.0) * (np.log(2 * np.pi) + np.log(mixture.var[None, :]))
    f = f + np.log(mixture.p[None, :] + 1e-16)
    f_max = np.max(f, axis=1, keepdims=True)
    log_post = f - logsumexp(f, axis=1, keepdims=True)
    post = np.exp(log_post)
    a, b = np.where(X == 0)
    for i in list(zip(a, b)):
        output[i] = (np.sum(post[i[0]] * mixture.mu[:, i[1]]))

    return output

In [31]:
def evaluate():
    gold_X = np.loadtxt('netflix_complete.txt') # Original Review data
    X = np.loadtxt('netflix_incomplete.txt')
    for i in range(1, 2):
        seed = i
        K = 12
        mixture, _ = init(X, K, seed)
        post, cost = estep(X, mixture)
        mmixture = mstep(X, post, mixture)
        finalmixture, finalpost, finalcost = run(X, mmixture, post)
        final_X = fill_matrix(X, finalmixture)
        return final_X, (rmse(gold_X, final_X))
