# Gaussian Mixture Model (GMM)


## dGMM

We will derive and implement the EM algorithm for the **diagonal** Gaussian mixture model (dGMM), where all covariance matrices are constrained to be diagonal.

In [1]:
import numpy as np
import math
from numpy import linalg as LA

In [24]:
def EM_dGMM(X, K, maxiter=500, tol=10**-5):
    X = np.array(X)
    n, d = len(X), len(X[0])
    # initialization of pi, mu, r, S
    pi = np.zeros(K)
    assignment = np.random.randint(K, size=n)
    for a in assignment:
        pi[a] += 1
    pi = pi / n
    mu = np.random.rand(K, d)
    r = np.zeros((n, K))
    S = np.ones((K, d))     # each S_k is the diagonal vector
    l = np.zeros(maxiter)

    for iter in range(maxiter):
        for i in range(n):
            for k in range(K):
                # compute responsibility
                ex = (-1/2) * sum([(X[i][j] - mu[k][j])**2 * (1/S[k][j]) for j in range(d)])
                r[i][k] = pi[k] * np.prod(S[k])**(-1/2) * math.exp(ex)
        r_i = np.sum(r, axis=1)
        r = [[r[i][k] / r_i[i] for k in range(K)] for i in range(n)]
        # negative log-likelihood
        l[iter] = -sum([math.log(r_i[i]) for i in range(n)])
        if iter > 0 and abs(l[iter] - l[iter-1]) <= tol * abs(l[iter-1]):
            break
        r_k = np.sum(r, axis=0)
        pi = r_k / n
        for k in range(K):
            S[k] = np.array([sum([r[i][k] * (X[i][j]-mu[k][j])**2 for i in range(n)]) for j in range(d)])
            mu[k] = np.sum([r[i][k] * X[i] for i in range(n)], axis=0) / r_k[k]
            S[k] = S[k] / r_k[k]
    return [pi, mu, [s * np.identity(d) for s in S], l]

If we look at the run-time of the algorithm, for each iteration:

1. Commpute responsibility (update $r_{ij}$) takes $O(d)$, and every update has $nK$ iterations - the total run-time is $O(ndK)$.
2. Normalize $r$ takes $O(nK)$, and compute negative log-likelihood takes $O(n)$ - the total run-time is $O(nK)$.
3. Finding $r_{.k}$ takes $O(n)$, updating $\pi_k$ takes $O(1)$, updating $\mu_k$ takes $O(n)$, and updating $S_k$ takes $O(nd)$ - the total run-time is then $O(ndK)$ since it iterates from 1 to $K$.

The initialization takes $O(Kd)+O(nK)\subseteq O(ndK)$.
Hence, the running time of the algorithm is $O(\mathtt{maxiter}\cdot ndK)$.


## cGMM

Now we fit $d$ GMMs (each with $K$ components) to each feature dimension $X_{i,j}$, separately. This is a component wise GMM (cGMM).

In [25]:
def EM_cGMM(X, K, maxiter=500, tol=10**-5):
    n, d = len(X), len(X[0])
    # initialization of pi, mu, r, S
    pi = np.zeros((K, d))
    for _ in X:
        pi[np.random.randint(K)] += np.ones(d)
    pi /= n
    mu = np.random.rand(K, d)
    r = np.zeros((d, n, K))
    S = np.ones((K, d))     # each S_k is the diagonal vector
    l = np.zeros(maxiter)

    for iter in range(maxiter):
        for i in range(n):
            for k in range(K):
                for j in range(d):
                    # compute responsibility
                    ex = (-1/2) * (X[i][j] - mu[k][j])**2 / S[k][j]
                    ex = math.exp(ex)
                    r[j][i][k] = pi[k][j] * S[k][j]**(-1/2) * ex
        r_i = np.sum(r, axis=2)
        r /= r_i[:,:,None]
        # negative log-likelihood
        l[iter] = -np.sum(np.log(r_i))
        if iter > 0 and abs(l[iter] - l[iter-1]) <= tol * abs(l[iter]):
            break
        r_k = np.sum(r, axis=1)
        for k in range(K):
            for j in range(d):
                pi[k][j] = r_k[j][k] / n
                mu[k][j] = np.sum(X[:,j] * r[j,:,k]) / r_k[j][k]
                S[k][j] = np.sum(np.power(X[:,j], 2) * r[j,:,k]) / r_k[j][k] - mu[k][j]**2
    return [pi, mu, [np.diag(s) for s in S], l]

For each iteration in the algorithm:

1. Computing the responsibility $r$ takes $O(ndK)$
2. Normalizing $r$ takes $O(ndK)$ since it iterates through $r$, and finding negative log-likelhood takes $O(nd)$ - total runtime is $O(ndK)$.
3. Finding $r_k$ takes $O(ndK)$, updating $\pi$ takes $O(dK)$, updating $\mu$ takes $O(ndK)$, and updating $S$ takes $O(ndK)$ - total runtime is $O(ndK)$.

Since the initialization also takes $O(ndK)$, the total runtime of the algorithm is then within $O(\mathtt{maxiter}\cdot ndK)$.

## Training and Testing

We apply (the adapted) dGMM algorithm [MNIST](http://yann.lecun.com/exdb/mnist/) dataset. For each of the 10 classes (digits), we can use its (and only its) training images to estimate its (class-conditional) distribution by fitting a GMM (with $K=5$, roughly corresponding to 5 styles of writing this digit). This gives us the density estimate $p(\mathbf x | y)$ where $\mathbf x$ is an image (of some digit) and $y$ is the class (digit). We can now classify the test set using the Bayes classifier:
$$
\begin{align}
\hat y(\mathbf x) = \arg\max_{c = 0, \ldots, 9} ~~ \underbrace{\mathrm{Pr}(Y = c) \cdot p(X = \mathbf x | Y = c)}_{\propto ~\mathrm{Pr}(Y=c | X=\mathbf x)},
\end{align}
$$
where the probabilities $\mathrm{Pr}(Y = c)$ can be estimated using the training set, e.g. the proportion of the $c$-th class in the training set, and the **density** $p(X = \mathbf x | Y = c)$ is estimated using GMM for each class $c$ separately.

In [26]:
from torchvision import datasets
import torchvision.transforms as transforms
from sklearn.decomposition import PCA

training_data = datasets.MNIST(
    root="./data/",
    train=True,
    transform=transforms.Compose([
        transforms.ToTensor()
    ])
)

testing_data = datasets.MNIST(
    root="./data/",
    train=False,
    transform=transforms.Compose([
        transforms.ToTensor()
    ])
)

# set up
pca = PCA(n_components=50)
train_data = np.reshape(training_data.data.numpy() * (1/255), (-1, 28**2))
test_data = np.reshape(testing_data.data.numpy() * (1/255), (-1, 28**2))
pca.fit(train_data)
train_data = pca.transform(train_data)
test_data = pca.transform(test_data)
train_digit = training_data.targets.numpy()
test_digit = testing_data.targets.numpy()

train_input = [[] for _ in range(10)]
test_input = [[] for _ in range(10)]
for i in range(len(training_data)):
    train_input[train_digit[i]].append(train_data[i])
for i in range(len(testing_data)):
    test_input[test_digit[i]].append(test_data[i])

In [28]:
num_digits = 10
K = 5

# GMM training
model = [EM_dGMM(train_input[c][:5000], K) for c in range(num_digits)]
# GMM testing
err = 0
for c in range(num_digits):
    for X in test_input[c][:1000]:
        result = []
        for digit in range(num_digits):
            m = model[digit]
            pi, mu, S = m[0], m[1], m[2]
            pr_yc = len(train_input[digit]) / 60000
            r = 0
            for k in range(K):
                ex = (-1/2) * ((X - mu[k]).T @ LA.inv(S[k]) @ (X - mu[k]))
                r += pi[k] * LA.det(S[k])**(-1/2) * math.exp(ex)
            result.append(pr_yc * r)
        err += 1 if c != np.argmax(result) else 0
    # print(f"digit {c}: err {err}")
err = err / 10000
print(err)

0.0786


The error rate for $K=5$ on the test set is 0.0786, which is very small. Due to time concern, I only tested on $K=5$.