In [None]:
import numpy as np

In [None]:
class GMM(object):
    def __init__(self, C=3, seed=None):
        self.parameters = {}
        self.hyperparameters = {"C":C, "seed": seed,}
        self.is_fit = False
        self.elbo = None

    def _initialize_params(self,X):
        N,d = X.shape
        C = self.hyperparameters;["C"]      # C is the cluster numbers, default is 3

        rr = np.random.rand(C)

        self.parameters = {
            "pi": rr/rr.sum(),    # cluster prior, the initial posibility of example belongs to each cluster, init with the same 
            "Q": np.zeros((N,C)),  # variational distribution, means percentage of example belongs to each cluster
            "mu": np.random.uniform(-5,10, c*d).reshape(C,d),   # cluster means, C examples and d dimensions
            "sigma": np.array([np.eye(d) for _ in range(C)]),   # covariance matrix, c x d x d
        }

        self.elbo = None
        self.is_fit = False
    

    def likelihood_loewer_bound(self, X):
        N = X.shape[0]      # example number
        P = self.parameters
        C = self.hyperparameters["C"]
        pi, Q, mu, sigma = P["pi"], P["Q"], P["mu"], P["sigma"]

        eps = np.finfo(float).eps   # eps: non-negative minimum number, often used to generate bias(regular term)
        expec1, expec2 = 0.0, 0.0

        for i in range(N):
            x_i = X[i]

            for c in range(C):
                pi_k = pi[c]
                z_nk = Q[i,c]
                mu_k = mu[c,]
                sigma_k = sigma[c,:,:]

                log_pi_k = np.log(pi_k + eps)
                log_p_x_i = log_gaussian_pdf(x_i,mu_k,sigma_k)
                prob = z_nk * (log_p_x_i + log_pi_k) 

                expec1 += prob      # likelihood lower bound
                expec2 += z_nk * np.log(z_nk + eps)   #  regular term do the same log operation
        loss = expec1 - expec2
        return loss

    
    def _E_step(self,X):
        P = self.parameters
        C = self.hyperparameters["C"]
        pi, Q, mu, sigma = P["pi"], P["Q"], P["mu"], P["sigma"]

        for i, x_i in enumerate(X):
            denom_vals = []
            for c in range(C):
                pi_c = pi[c]
                mu_c = mu[c,:]
                sigma_c = sigma[c,:,:]

                log_pi_c = np.log(pi_c)
                log_p_x_i = log_gaussian_pdf(x_i, mu_c, sigma_c)

                # log N(X_i | mu_c, Sigma_c) + log pi_c
                denom_vals.append(log_p_x_i + log_pi_c)     # molecular
            
            # log \sum_c exp{ log N(X_i | mu_c, Sigma_c) + log pi_c } 
            log_denom = logsumexp(denom_vals)
            q_i = np.exp([num - log_denom for num in denom_vals])
            np.testing.assert_allclose(np.sum(q_i), 1, err_msg="{}".format(np.sum(q_i)))

            Q[i, :] = q_i       # record each example probability that produced by each components


    def _M_step(self,X):
        N, d = X.shape
        P = self.parameters
        C = self.hyperparameters["C"]
        pi, Q, mu, sigma = P["pi"], P["Q"], P["mu"], P["sigma"]

        denoms = np.sum(Q, axis = 0)  #  denoms represents Kth cluster's posterior prob, aka the num of points of Kth cluster

        #update
        pi = denoms/N  # ak or pi
        
        nums_mu = [np.dot(Q[:,c], X) for c in range(C)]
        for ix, (num,den) in enumerate(zip(nums_mu, denoms)):
            mu[ix, :] = num/den if den > 0 else np.zeros_like(num)

        for c in range(C):
            mu_c = mu[c,:]
            n_c = denoms[c]

            outer = np.zeros((d,d))
            for i in range(N):
                wic = Q[i,c]
                xi = X[i,:]
                outer += wic * np.outer(xi-mu_c, xi-mu_c)  # caculate outer multiply
            
            outer = outer/n_c if n_c>0 else outer
            sigma[c,:,:] = outer

        np.testing.assert_allclose(np.sum(pi), 1, err_msg="{}".format(np.sum(pi)))  # test whether sum of the prob pi is one



    def fit(self, X, max_iter=100, tol=1e-3, verbose=False):
        prev_vlb = -np.inf
        self._initialize_params(X)

        for _iter in range(max_iter):
            try:
                self._E_step(X)
                self._M_step(X)
                vlb = self.likelihood_loewer_bound(X)

                if verbose:
                    print(f"{_iter + 1}. Lower bound: {vlb}")
                
                converged = _iter>0 and np.abs(vlb - prev_vlb)<=tol
                if np.isnan(vlb) or converged:
                    break

                prev_vlb = vlb

            except np.linalg.LinAlgError:
                print("Singular matrix: components collapsed")
                return -1

        self.elbo = vlb
        self.is_fit = True
        return 0


    
    def predict(self,X, soft_labels=True):
        assert self.is_fit

        P = self.parameters
        C = self.hyperparameters["C"]
        mu, sigma = P["mu"], P["sigma"]

        y = []
        for x_i in X:
            cprobs = [log_gaussian_pdf(x_i, mu[c,:], sigma[c,:,:]) for c in range(C)]

            if not soft_labels:
                y.append(np.argmax(cprobs))
            else:
                y.argmax(cprobs)

        return np.array(y)






$loss = \sum_{i=1}^{K}\log_{}{(\sum_{k=1}^{K}\pi_{k}N(x_{i}|\mu_{k},\sigma_{k}))}$    

lack of log regular term


molecular : $log N(X_i | mu_c, Sigma_c) + \log_{}{pi_c}$

denominator : $log \sum_c exp{( \log_{}{N(X_i | mu_c, Sigma_c) + \log_{}{pi_c}}) } $

$Q = \frac_{molucular}{denominator}#



$N_k = \sum_{i=1}^{N}Q(i,k)$

$pi_k = N_k / N$

$\sum_{k}{} = \frac{1}{N_k}\sum_{i=1}^{N}Q(i,k)(x_i-\mu_k)(x_i-\mu_k)^{T}$

In [None]:
def log_gaussian_pdf(x_i,mu,sigma):
    "compute log N(x_i | mu,sigma)"
    n = len(mu)     # C
    a = n*np.log(2*np.pi)
    _, b = np.linalg.slogdet(sigma)   # det(sigma)^(2)

    y = np.linalg.solve(sigma, x_i - mu)   # solve a matrix equation: sigma * x = xi - mu, then x = (xi-mu) * sigma^(-1) 
    c = np.dot(x_i - mu, y)    #exp balabala
    return -0.5 * (a+b+c)
    

def logsumexp(log_probs, axis=None):
    _max = np.max(log_probs)
    ds = log_probs - _max
    exp_sum = np.exp(ds).sum(axis = axis)
    return _max+np.log(exp_sum)

$N_{j}(x;\mu_{j}, \sigma_{j}) = \frac{1}{\sqrt{(2\pi)}^{m}\left | \sigma_{j} \right |  } exp[-\frac{1}{2}(x-\mu_{j})^{T}\sigma_{j}^{-1}(x-\mu_{j})]  $

doing log on N(x|mu,sigma)