# The Gaussian Mixture model

 Unlike methods like K-Means, which assign each point to a single cluster, GMM gives a probability for each point to belong to different clusters, making it more flexible for complex datasets where clusters may overlap or have different shapes.

## Task 4, Initializing the GMM

* X is a numpy.ndarray of shape (n, d) containing the data set
- k is a positive integer containing the number of cluster
* You are not allowed to use any loops
- Returns: pi, m, S, or None, None, None on failure
pi is a numpy.ndarray of shape (k,) containing the priors for each cluster, initialized evenly
m is a numpy.ndarray of shape (k, d) containing the centroid means for each cluster, initialized with K-means
S is a numpy.ndarray of shape (k, d, d) containing the covariance matrices for each cluster, initialized as identity matrices

In [4]:
from sklearn.covariance import log_likelihood

#!/usr/bin/env python3
"""
Initializes variables for a Gaussian mixture model.
"""


import numpy as np
kmeans = __import__('1-kmeans').kmeans


def initialize(X, k):
    """
    Initializes variables for a Gaussian Mixture Mode
    """
    if not isinstance(X, np.ndarray) or len(X.shape) != 2:
        return None, None, None
    if not isinstance(k, int) or k < 1:
        return None, None, None
    
    n, d = X.shape
    
    phi = np.ones(k) / k
    
    m, _ = kmeans(X, k)
    
    S = np.tile(np.identity(d), (k, 1)).reshape(k, d, d)
    
    return phi, m, S


In [5]:
# main func

if __name__ == '__main__':
    np.random.seed(11)
    a = np.random.multivariate_normal([30, 40], [[75, 5], [5, 75]], size=10000)
    b = np.random.multivariate_normal([5, 25], [[16, 10], [10, 16]], size=750)
    c = np.random.multivariate_normal([60, 30], [[16, 0], [0, 16]], size=750)
    d = np.random.multivariate_normal([20, 70], [[35, 10], [10, 35]], size=1000)
    X = np.concatenate((a, b, c, d), axis=0)
    np.random.shuffle(X)
    pi, m, S = initialize(X, 4)
    print(pi)
    print(m)
    print(S)

[0.25 0.25 0.25 0.25]
[[54.73711515 31.81393242]
 [16.84012557 31.20248225]
 [21.43215816 65.50449077]
 [32.3301925  41.80664127]]
[[[1. 0.]
  [0. 1.]]

 [[1. 0.]
  [0. 1.]]

 [[1. 0.]
  [0. 1.]]

 [[1. 0.]
  [0. 1.]]]


## PDF

Calculates the probability density function of a Gaussian distribution

* X is a numpy.ndarray of shape (n, d) containing the data points whose PDF should be evaluated
- m is a numpy.ndarray of shape (d,) containing the mean of the distribution
* You are not allowed to use any loops
- You are not allowed to use the function numpy.diag or the method numpy.ndarray.diagonal
* Returns: P, or None on failure
P is a numpy.ndarray of shape (n,) containing the PDF values for each data point
- All values in P should have a minimum value of 1e-300

In [6]:
#!/usr/bin/env python3
"""PDF function """

import numpy as np


def pdf(X, m, S):
    """
    Probability Density Function of gaussian distributions
    """

    if not isinstance(X, np.ndarray) or len(X.shape) != 2:
        return None
    if not isinstance(m, np.ndarray) or len(m.shape) != 1:
        return None
    if not isinstance(S, np.ndarray) or len(S.shape) != 2:
        return None
    if X.shape[1] != m.shape[0] or X.shape[1] != S.shape[0]:
        return None
    if S.shape[0] != S.shape[1]:
        return None

    # formula
    # p(x∣ μ,Σ) = (1 √(2π)d|Σ|)exp(−1/2(x−μ)T Σ−1(x−μ))
    n, d = X.shape
    mean = m
    x_m = X - mean

    # Determinant of the covariance matrix (d x d)
    det_S = np.linalg.det(S)

    # Since Σ is Hermitian, it has an eigendecomposition
    inv_S = np.linalg.inv(S)

    # Formula Section one: (1 √(2π)d|Σ|)
    part_1_dem = np.sqrt(det_S) * ((2 * np.pi) ** (d/2))

    # Formula Section two_upper_1: −1/2(x−μ)T
    part_2 = np.matmul(x_m, inv_S)

    # Formula Section two_upper_2: Σ−1(x−μ) used diagonal to fix alloc err
    part_2_1 = np.sum(x_m * part_2, axis=1)

    # Formula Section two exp(−1/2(x−μ)T Σ−1(x−μ))
    part_2_2 = np.exp(part_2_1 / -2)

    # pdf = part_1 * part_2_2:
    pdf = part_2_2 / part_1_dem
    P = np.where(pdf < 1e-300, 1e-300, pdf)
    return P

In [7]:
# main func

if __name__ == '__main__':
    np.random.seed(0)
    m = np.array([12, 30, 10])
    S = np.array([[36, -30, 15], [-30, 100, -20], [15, -20, 25]])
    X = np.random.multivariate_normal(m, S, 10000)
    P = pdf(X, m, S)
    print(P)

[3.47450910e-05 2.53649178e-06 1.80348301e-04 ... 1.24604061e-04
 1.86345129e-04 2.59397003e-05]


# PDF expectation step

* Calculates the expectation step in the EM algorithm for a GMM
- X is a numpy.ndarray of shape (n, d) containing the data set
* pi is a numpy.ndarray of shape (k,) containing the priors for each cluster
- m is a numpy.ndarray of shape (k, d) containing the centroid means for each cluster
* S is a numpy.ndarray of shape (k, d, d) containing the covariance matrices for each cluster
* 1 loop is allowed at most
- Returns: g, l, or None, None on failure

* g is a numpy.ndarray of shape (k, n) containing the posterior probabilities for each data point in each cluster
l is the total log likelihood

In [8]:
#!/usr/bin/env python3
"""
Calculates the expectation step in the EM algorithm for a GMM
"""

import numpy as np
pdf = __import__('5-pdf').pdf



def expectation(X, pi, m, S):
    """
    calculates the expectation step for a Gaussian mixture model
    """
    if not isinstance(X, np.ndarray) or len(X.shape) != 2:
        return None, None
    if not isinstance(m, np.ndarray) or len(m.shape) != 2:
        return None, None
    if not isinstance(pi, np.ndarray) or len(pi.shape) != 1:
        return None, None
    if not isinstance(S, np.ndarray) or len(S.shape) != 3:
        return None, None
    if not np.isclose([np.sum(pi)], [1])[0]:
        return None, None

    n, d = X.shape
    k = pi.shape[0]
    if d != m.shape[1] or d != S.shape[1] or d != S.shape[2]:
        return None, None
    if k != m.shape[0] or k != S.shape[0]:
        return None, None
    
    center_mean = m
    covariance_mat = S
    gauss = np.zeros((k, n))
    
    for i in range(k):
        likelihood = pdf(X, center_mean[i], covariance_mat[i])
        prior = pi[i]
        gauss[i] = likelihood * prior
    g = gauss / np.sum(gauss, axis=0)
    
    log_likelihood = np.sum(np.log(np.sum(gauss, axis=0)))
    
    return g, log_likelihood
    

In [9]:
# main func

if __name__ == '__main__':
    np.random.seed(11)
    a = np.random.multivariate_normal([30, 40], [[75, 5], [5, 75]], size=10000)
    b = np.random.multivariate_normal([5, 25], [[16, 10], [10, 16]], size=750)
    c = np.random.multivariate_normal([60, 30], [[16, 0], [0, 16]], size=750)
    d = np.random.multivariate_normal([20, 70], [[35, 10], [10, 35]], size=1000)
    X = np.concatenate((a, b, c, d), axis=0)
    np.random.shuffle(X)
    pi, m, S = initialize(X, 4)
    g, l = expectation(X, pi, m, S)
    print(g)
    print(np.sum(g, axis=0))
    print(l)

[[1.98542668e-055 1.00000000e+000 1.56526421e-185 ... 1.00000000e+000
  3.70567311e-236 1.91892348e-012]
 [6.97883333e-085 2.28658376e-279 9.28518983e-065 ... 8.12227631e-287
  1.53690661e-032 3.17417182e-181]
 [9.79811365e-234 2.28658376e-279 2.35073465e-095 ... 1.65904890e-298
  9.62514613e-068 5.67072057e-183]
 [1.00000000e+000 7.21133039e-186 1.00000000e+000 ... 2.42138447e-125
  1.00000000e+000 1.00000000e+000]]
[1. 1. 1. ... 1. 1. 1.]
-652797.7866541843


# Maximiliation step in the EM algorithm for a GMM

* X is a numpy.ndarray of shape (n, d) containing the data set
- g is a numpy.ndarray of shape (k, n) containing the posterior probabilities for each data point in each cluster
* 1 loop maximum
- Returns: pi, m, S, or None, None, None on failure
pi is a numpy.ndarray of shape (k,) containing the updated priors for each cluster
m is a numpy.ndarray of shape (k, d) containing the updated centroid means for each cluster
S is a numpy.ndarray of shape (k, d, d) containing the updated covariance matrices for each
cluster

In [10]:
#!/usr/bin/env python3
"""
calculates the maximization step in the EM algorithm for a Gaussian mixture
model
"""

import numpy as np


def maximization(X, g):
    """
    calculates the maximization step in the EM algorithm for a Gaussian mixture
    model
    """
    if not isinstance(X, np.ndarray) or len(X.shape) != 2:
        return None, None, None
    
    n, d = X.shape
    
    if not isinstance(g, np.ndarray) or len(g.shape) != 2:
        return None, None, None
    
    k, n1 = g.shape
    
    if n != n1 or np.abs(np.sum(g, axis=0) - 1).max() > 1e-10:
        return None, None, None
    
    S = np.zeros((k, d, d))
    
    sun_g = np.sum(g, axis=1)
    
    pi = sun_g / n
    m = np.dot(g, X) / sun_g[:, np.newaxis]
    
    for i in range(k):
        diff = X - m[i]
        weighted_diff = (g[i, :, np.newaxis] * diff).T
        S[i] = np.dot(weighted_diff, diff) / sun_g[i]
    
    return pi, m, S


In [11]:
# main func


if __name__ == '__main__':
    np.random.seed(11)
    a = np.random.multivariate_normal([30, 40], [[75, 5], [5, 75]], size=10000)
    b = np.random.multivariate_normal([5, 25], [[16, 10], [10, 16]], size=750)
    c = np.random.multivariate_normal([60, 30], [[16, 0], [0, 16]], size=750)
    d = np.random.multivariate_normal([20, 70], [[35, 10], [10, 35]], size=1000)
    X = np.concatenate((a, b, c, d), axis=0)
    np.random.shuffle(X)
    pi, m, S = initialize(X, 4)
    g, _ = expectation(X, pi, m, S)
    pi, m, S = maximization(X, g)
    print(pi)
    print(m)
    print(S)

[0.10104901 0.24748822 0.1193333  0.53212947]
[[54.7440558  31.80888393]
 [16.84099873 31.20560148]
 [21.42588061 65.51441875]
 [32.33208369 41.80830251]]
[[[64.05063663 -2.13941814]
  [-2.13941814 41.90354928]]

 [[72.72404579  9.96322554]
  [ 9.96322554 53.05035303]]

 [[46.20933259  1.08979413]
  [ 1.08979413 66.9841323 ]]

 [[35.04054823 -0.94790014]
  [-0.94790014 45.14948772]]]


# Task 9, Bayesian information Criterion

* X is a numpy.ndarray of shape (n, d) containing the data set
- kmin is a positive integer containing the minimum number of clusters to check for (inclusive)
* kmax is a positive integer containing the maximum number of clusters to check for (inclusive)

If kmax is None, kmax should be set to the maximum number of clusters possible
- iterations is a positive integer containing the maximum number of iterations for the EM algorithm
* tol is a non-negative float containing the tolerance for the EM algorithm
- verbose is a boolean that determines if the EM algorithm should print information to the standard output
* Returns: best_k, best_result, l, b, or None, None, None, None on failure

best_k is the best value for k based on its BIC
best_result is tuple containing pi, m, S
pi is a numpy.ndarray of shape (k,) containing the cluster priors for the best number of clusters
m is a numpy.ndarray of shape (k, d) containing the centroid means for the best number of clusters
S is a numpy.ndarray of shape (k, d, d) containing the covariance matrices for the best number of clusters
l is a numpy.ndarray of shape (kmax - kmin + 1) containing the log likelihood for each cluster size tested
b is a numpy.ndarray of shape (kmax - kmin + 1) containing the BIC value for each cluster size tested
Use: BIC = p * ln(n) - 2 * l
p is the number of parameters required for the model
n is the number of data points used to create the model
l is the log likelihood of the model

In [None]:
#!/usr/bin/env python3
"""
finds the best number of clusters for GMM using the Bayesian information
Criterion
"""

import numpy as np
expectation_maximization = __import__('8-EM').expectation_maximization


def BIC(X, kmin=1, kmax=None, iterations=1000, tol=1e-5, verbose=False):
    """
    finds the best number of clusters for GMM using the Bayesian information
    Criterion
    """
    try:
        if kmax == 1:
            return None, None, None, None
        n, d = X.shape
        if kmax is None:
            kmax = n
            if kmax >= kmin:
                return  None, None, None, None
        
        k_hist = list(range(kmin, kmax + 1))
        results_hist = []
        lh_hist = []
        bic_hist = []
        
        for k in range(kmin, kmax + 1):
            pi, m, S, g, lh = expectation_maximization(X, k, iterations, tol,
                                                       verbose)
        
            if pi is None or m is None or S is None or g is None or lh is None:
                return None, None, None, None
        
            num_parameters = k + k * d + k * d * (d + 1) // 2 - 1
            bic = num_parameters * np.log(n) - 2 * lh
        
            lh_hist.append(lh)
            results_hist.append((pi, m, S))
            bic_hist.append(bic)
    
            min_bic_index = np.argmin(bic_hist)
            best_k = k_hist[min_bic_index]
            best_result = results_hist[min_bic_index]
    
            return best_k, best_result, np.array(lh_hist), np.array(bic_hist)
        
    except Exception:
        return None, None, None, None